Skip to main content

pylon_runtime/
datastore.rs

1//! Implements the platform-agnostic [`DataStore`] trait for [`Runtime`].
2//!
3//! This bridges the concrete SQLite-backed Runtime to the abstract trait
4//! used by the router crate, enabling the same routing logic to run on
5//! self-hosted servers and Cloudflare Workers alike.
6
7use pylon_http::{DataError, DataStore};
8
9use crate::Runtime;
10
11// ---------------------------------------------------------------------------
12// In-flight mutation schedule buffering
13// ---------------------------------------------------------------------------
14
15/// A scheduled function call captured during a mutation handler. Held in
16/// the per-mutation pending list until the surrounding transaction
17/// commits — at which point we drain and enqueue. On rollback the list
18/// is dropped without enqueuing, so a failed mutation can't leave behind
19/// scheduled side-effects (the docs claim this; before this buffer the
20/// claim was false).
21#[derive(Debug, Clone)]
22pub(crate) struct PendingSchedule {
23    pub fn_name: String,
24    pub args: serde_json::Value,
25    pub delay_ms: Option<u64>,
26    pub run_at: Option<u64>,
27}
28
29thread_local! {
30    /// Set by the mutation entry point (top-level + nested) for the
31    /// duration of a TS handler call. The schedule hook checks this
32    /// thread-local: when `Some`, scheduling buffers into the inner
33    /// `Vec`; when `None`, the hook enqueues immediately (the
34    /// historical, non-mutation behavior). The Bun stdio loop is
35    /// single-threaded per call (the runner holds `io_lock` for the
36    /// whole call duration), so a thread-local is the right scoping
37    /// primitive — no cross-thread leakage.
38    pub(crate) static MUTATION_SCHEDULE_BUFFER: std::cell::RefCell<Option<std::rc::Rc<std::cell::RefCell<Vec<PendingSchedule>>>>>
39        = const { std::cell::RefCell::new(None) };
40}
41
42/// RAII guard that pushes a schedule buffer onto the thread-local for
43/// the duration of a mutation handler call, then restores the previous
44/// value (which is almost always `None`, but supports nested mutation
45/// handlers stacking buffers correctly).
46pub(crate) struct ScheduleBufferGuard {
47    previous: Option<std::rc::Rc<std::cell::RefCell<Vec<PendingSchedule>>>>,
48    current: std::rc::Rc<std::cell::RefCell<Vec<PendingSchedule>>>,
49}
50
51impl ScheduleBufferGuard {
52    pub(crate) fn enter() -> Self {
53        let current = std::rc::Rc::new(std::cell::RefCell::new(Vec::new()));
54        let previous = MUTATION_SCHEDULE_BUFFER.with(|cell| {
55            let mut slot = cell.borrow_mut();
56            let old = slot.take();
57            *slot = Some(current.clone());
58            old
59        });
60        Self { previous, current }
61    }
62
63    /// Drain the buffer captured during this guard's lifetime. Caller
64    /// flushes after COMMIT succeeds; on rollback the buffer is just
65    /// dropped along with the guard.
66    pub(crate) fn take(&self) -> Vec<PendingSchedule> {
67        std::mem::take(&mut *self.current.borrow_mut())
68    }
69}
70
71impl Drop for ScheduleBufferGuard {
72    fn drop(&mut self) {
73        MUTATION_SCHEDULE_BUFFER.with(|cell| {
74            *cell.borrow_mut() = self.previous.take();
75        });
76    }
77}
78
79// ---------------------------------------------------------------------------
80// In-flight mutation depth marker (deadlock guard)
81// ---------------------------------------------------------------------------
82
thread_local! {
    /// Number of mutation-handler transaction frames currently open on this
    /// thread. Changed only by [`MutationDepthGuard`] and read through
    /// [`in_mutation_tx`].
    ///
    /// Rationale (from the original design notes): each mutation holds a
    /// non-reentrant `std::sync::Mutex` around the backend connection, so a
    /// handler that starts another mutation on the same thread would block
    /// forever re-acquiring a lock it already owns. Checking this counter lets
    /// the nested-call path fail fast (e.g. with `NESTED_MUTATION`) instead of
    /// deadlocking.
    ///
    /// It is a counter rather than a `bool` so that savepoint-based nesting
    /// could later reuse the outer transaction without changing call sites.
    static MUTATION_DEPTH: std::cell::Cell<u32> = const { std::cell::Cell::new(0) };
}

/// RAII marker meaning "a mutation transaction is open on this thread".
///
/// [`MutationDepthGuard::enter`] increments the thread's depth counter, and
/// dropping the guard decrements it again. Drop also runs during unwinding,
/// so a panicking handler cannot leave the thread marked as mid-mutation.
///
/// Bind the guard to a named variable for the whole handler call
/// (`let _guard = MutationDepthGuard::enter();`). A bare
/// `MutationDepthGuard::enter();` drops it at once, and the nesting check
/// would never fire. `#[must_use]` turns that mistake into a compiler warning.
#[must_use = "the depth marker is released as soon as the guard is dropped; bind it to a named variable"]
pub(crate) struct MutationDepthGuard;

impl MutationDepthGuard {
    /// Mark one more mutation frame as open on the current thread.
    pub(crate) fn enter() -> Self {
        // Saturating: an overflow can never wrap back to 0 and make
        // `in_mutation_tx` report `false` while frames are still open.
        MUTATION_DEPTH.with(|d| d.set(d.get().saturating_add(1)));
        Self
    }
}

impl Drop for MutationDepthGuard {
    fn drop(&mut self) {
        // Saturating: an unbalanced drop can't underflow. Underflow would
        // panic in debug builds (fatal while already unwinding) or wrap to
        // u32::MAX in release builds.
        MUTATION_DEPTH.with(|d| d.set(d.get().saturating_sub(1)));
    }
}

/// Returns `true` iff at least one [`MutationDepthGuard`] is alive on this
/// thread, i.e. the caller is running inside a mutation handler's
/// transaction.
pub(crate) fn in_mutation_tx() -> bool {
    MUTATION_DEPTH.with(|d| d.get() > 0)
}
121
122// ---------------------------------------------------------------------------
123// PG /api/transact CRDT-aware impl
124// ---------------------------------------------------------------------------
125
126impl Runtime {
127    /// PG `/api/transact` implementation that runs each op through
128    /// the typed `tx_*` helpers (so FTS shadow rows + the new id-
129    /// reject + per-row locking all apply) AND adds CRDT projection
130    /// + sidecar maintenance for `crdt: true` entities. Without this,
131    /// batched admin writes desync from the CRDT layer.
132    pub(crate) fn pg_transact_with_crdt(
133        &self,
134        pg: &crate::PgBackend,
135        ops: &[serde_json::Value],
136    ) -> Result<(bool, Vec<serde_json::Value>), DataError> {
137        use pylon_storage::pg_tx_store::{tx_delete, tx_insert, tx_update};
138
139        // Pre-validate every op shape — a malformed payload should
140        // never open a tx and immediately roll back.
141        enum Op<'a> {
142            Insert {
143                entity: &'a str,
144                data: &'a serde_json::Value,
145            },
146            Update {
147                entity: &'a str,
148                id: &'a str,
149                data: &'a serde_json::Value,
150            },
151            Delete {
152                entity: &'a str,
153                id: &'a str,
154            },
155        }
156        let mut typed: Vec<Op<'_>> = Vec::with_capacity(ops.len());
157        for op in ops {
158            let op_type = op.get("op").and_then(|v| v.as_str()).unwrap_or("");
159            let entity = op
160                .get("entity")
161                .and_then(|v| v.as_str())
162                .ok_or_else(|| DataError {
163                    code: "TX_INVALID_OP".into(),
164                    message: "Each transact op must have an \"entity\" field".into(),
165                })?;
166            match op_type {
167                "insert" => {
168                    let data = op.get("data").ok_or_else(|| DataError {
169                        code: "TX_INVALID_OP".into(),
170                        message: "insert op requires \"data\"".into(),
171                    })?;
172                    typed.push(Op::Insert { entity, data });
173                }
174                "update" => {
175                    let id = op
176                        .get("id")
177                        .and_then(|v| v.as_str())
178                        .ok_or_else(|| DataError {
179                            code: "TX_INVALID_OP".into(),
180                            message: "update op requires \"id\"".into(),
181                        })?;
182                    let data = op.get("data").ok_or_else(|| DataError {
183                        code: "TX_INVALID_OP".into(),
184                        message: "update op requires \"data\"".into(),
185                    })?;
186                    typed.push(Op::Update { entity, id, data });
187                }
188                "delete" => {
189                    let id = op
190                        .get("id")
191                        .and_then(|v| v.as_str())
192                        .ok_or_else(|| DataError {
193                            code: "TX_INVALID_OP".into(),
194                            message: "delete op requires \"id\"".into(),
195                        })?;
196                    typed.push(Op::Delete { entity, id });
197                }
198                other => {
199                    return Err(DataError {
200                        code: "TX_INVALID_OP".into(),
201                        message: format!("unknown op \"{other}\""),
202                    });
203                }
204            }
205        }
206
207        // Track which CRDT rows we touched so we can refresh their
208        // cache entries after commit (or evict on rollback).
209        let mut crdt_touched: Vec<(String, String)> = Vec::new();
210
211        let manifest = self.manifest.clone();
212        let result = pg.store.with_transaction_raw(|tx| -> Result<Vec<serde_json::Value>, DataError> {
213            let mut json_results: Vec<serde_json::Value> = Vec::with_capacity(typed.len());
214            for op in &typed {
215                let result = match op {
216                    Op::Insert { entity, data } => {
217                        let ent = manifest.entities.iter().find(|e| e.name == *entity);
218                        let id = if ent.map(|e| e.crdt).unwrap_or(false) {
219                            let crdt_fields = self.crdt_fields_for(ent.unwrap()).map_err(|e| {
220                                DataError { code: e.code, message: e.message }
221                            })?;
222                            let id = crate::generate_id();
223                            pg.crdt
224                                .apply_patch(tx, entity, &id, &crdt_fields, data)
225                                .map_err(|e| DataError {
226                                    code: "CRDT_APPLY_FAILED".into(),
227                                    message: format!("crdt write {entity}/{id}: {e}"),
228                                })?;
229                            let mut row = (*data).clone();
230                            if let Some(obj) = row.as_object_mut() {
231                                obj.insert("id".into(), serde_json::Value::String(id.clone()));
232                            }
233                            tx_insert(tx, &manifest, entity, &row)?;
234                            crdt_touched.push((entity.to_string(), id.clone()));
235                            id
236                        } else {
237                            tx_insert(tx, &manifest, entity, data)?
238                        };
239                        serde_json::json!({ "op": "insert", "id": id })
240                    }
241                    Op::Update { entity, id, data } => {
242                        let ent = manifest.entities.iter().find(|e| e.name == *entity);
243                        let updated = if ent.map(|e| e.crdt).unwrap_or(false) {
244                            let crdt_fields = self.crdt_fields_for(ent.unwrap()).map_err(|e| {
245                                DataError { code: e.code, message: e.message }
246                            })?;
247                            pg.crdt
248                                .apply_patch(tx, entity, id, &crdt_fields, data)
249                                .map_err(|e| DataError {
250                                    code: "CRDT_APPLY_FAILED".into(),
251                                    message: format!("crdt update {entity}/{id}: {e}"),
252                                })?;
253                            let updated = tx_update(tx, &manifest, entity, id, data)?;
254                            if !updated {
255                                return Err(DataError {
256                                    code: "ENTITY_NOT_FOUND".into(),
257                                    message: format!(
258                                        "Update on {entity}/{id} found no row — refusing to commit \
259                                         a CRDT snapshot that would orphan."
260                                    ),
261                                });
262                            }
263                            crdt_touched.push((entity.to_string(), id.to_string()));
264                            updated
265                        } else {
266                            tx_update(tx, &manifest, entity, id, data)?
267                        };
268                        serde_json::json!({ "op": "update", "id": id, "updated": updated })
269                    }
270                    Op::Delete { entity, id } => {
271                        let ent = manifest.entities.iter().find(|e| e.name == *entity);
272                        let deleted = if ent.map(|e| e.crdt).unwrap_or(false) {
273                            tx.execute(
274                                "DELETE FROM _pylon_crdt_snapshots WHERE entity = $1 AND row_id = $2",
275                                &[entity, id],
276                            )
277                            .map_err(|e| DataError {
278                                code: "CRDT_SIDECAR_DELETE_FAILED".into(),
279                                message: format!(
280                                    "delete pg crdt snapshot {entity}/{id}: {e}"
281                                ),
282                            })?;
283                            let deleted = tx_delete(tx, &manifest, entity, id)?;
284                            crdt_touched.push((entity.to_string(), id.to_string()));
285                            deleted
286                        } else {
287                            tx_delete(tx, &manifest, entity, id)?
288                        };
289                        serde_json::json!({ "op": "delete", "id": id, "deleted": deleted })
290                    }
291                };
292                json_results.push(result);
293            }
294            // Refresh cache for CRDT rows we touched.
295            for (entity, id) in &crdt_touched {
296                pg.crdt.cache_after_commit(tx, entity, id);
297            }
298            Ok(json_results)
299        });
300
301        match result {
302            Ok(json_results) => Ok((true, json_results)),
303            Err(e) => {
304                for (entity, id) in &crdt_touched {
305                    pg.crdt.evict(entity, id);
306                }
307                Err(e)
308            }
309        }
310    }
311}
312
313// ---------------------------------------------------------------------------
314// DataStore → Runtime bridge
315// ---------------------------------------------------------------------------
316
317impl DataStore for Runtime {
318    fn manifest(&self) -> &pylon_kernel::AppManifest {
319        Runtime::manifest(self)
320    }
321
322    fn insert(&self, entity: &str, data: &serde_json::Value) -> Result<String, DataError> {
323        Runtime::insert(self, entity, data).map_err(into_data_error)
324    }
325
326    fn get_by_id(&self, entity: &str, id: &str) -> Result<Option<serde_json::Value>, DataError> {
327        Runtime::get_by_id(self, entity, id).map_err(into_data_error)
328    }
329
330    fn list(&self, entity: &str) -> Result<Vec<serde_json::Value>, DataError> {
331        Runtime::list(self, entity).map_err(into_data_error)
332    }
333
334    fn list_after(
335        &self,
336        entity: &str,
337        after: Option<&str>,
338        limit: usize,
339    ) -> Result<Vec<serde_json::Value>, DataError> {
340        Runtime::list_after(self, entity, after, limit).map_err(into_data_error)
341    }
342
343    fn update(&self, entity: &str, id: &str, data: &serde_json::Value) -> Result<bool, DataError> {
344        Runtime::update(self, entity, id, data).map_err(into_data_error)
345    }
346
347    fn delete(&self, entity: &str, id: &str) -> Result<bool, DataError> {
348        Runtime::delete(self, entity, id).map_err(into_data_error)
349    }
350
351    fn lookup(
352        &self,
353        entity: &str,
354        field: &str,
355        value: &str,
356    ) -> Result<Option<serde_json::Value>, DataError> {
357        Runtime::lookup(self, entity, field, value).map_err(into_data_error)
358    }
359
360    fn link(
361        &self,
362        entity: &str,
363        id: &str,
364        relation: &str,
365        target_id: &str,
366    ) -> Result<bool, DataError> {
367        Runtime::link(self, entity, id, relation, target_id).map_err(into_data_error)
368    }
369
370    fn unlink(&self, entity: &str, id: &str, relation: &str) -> Result<bool, DataError> {
371        Runtime::unlink(self, entity, id, relation).map_err(into_data_error)
372    }
373
374    fn query_filtered(
375        &self,
376        entity: &str,
377        filter: &serde_json::Value,
378    ) -> Result<Vec<serde_json::Value>, DataError> {
379        Runtime::query_filtered(self, entity, filter).map_err(into_data_error)
380    }
381
382    fn query_graph(&self, query: &serde_json::Value) -> Result<serde_json::Value, DataError> {
383        Runtime::query_graph(self, query).map_err(into_data_error)
384    }
385
386    fn aggregate(
387        &self,
388        entity: &str,
389        spec: &serde_json::Value,
390    ) -> Result<serde_json::Value, DataError> {
391        Runtime::aggregate(self, entity, spec).map_err(into_data_error)
392    }
393
394    fn transact(
395        &self,
396        ops: &[serde_json::Value],
397    ) -> Result<(bool, Vec<serde_json::Value>), DataError> {
398        // Postgres mode: delegate to the runtime-layer wrapper that
399        // adds CRDT projection + sidecar maintenance for crdt:true
400        // entities. The storage layer's transact (PostgresDataStore::
401        // transact) only knows about FTS — codex flagged that
402        // /api/transact would silently desync CRDT state.
403        if let Some(pg) = self.pg_backend() {
404            return self.pg_transact_with_crdt(pg, ops);
405        }
406        let conn = self.lock_conn_pub().map_err(into_data_error)?;
407        let _ = conn.execute("BEGIN", []);
408        let mut results: Vec<serde_json::Value> = Vec::new();
409        let mut rollback = false;
410
411        for op in ops {
412            let op_type = op.get("op").and_then(|v| v.as_str()).unwrap_or("");
413            let entity = op.get("entity").and_then(|v| v.as_str()).unwrap_or("");
414
415            match op_type {
416                "insert" => {
417                    let data = op.get("data").cloned().unwrap_or(serde_json::json!({}));
418                    match self.insert_with_conn(&conn, entity, &data) {
419                        Ok(id) => {
420                            results.push(serde_json::json!({"op": "insert", "id": id}));
421                        }
422                        Err(e) => {
423                            results.push(serde_json::json!({"op": "insert", "error": e.message}));
424                            rollback = true;
425                            break;
426                        }
427                    }
428                }
429                "update" => {
430                    let id = op.get("id").and_then(|v| v.as_str()).unwrap_or("");
431                    let data = op.get("data").cloned().unwrap_or(serde_json::json!({}));
432                    match self.update_with_conn(&conn, entity, id, &data) {
433                        Ok(_) => {
434                            results.push(serde_json::json!({"op": "update", "id": id}));
435                        }
436                        Err(e) => {
437                            results.push(serde_json::json!({"op": "update", "error": e.message}));
438                            rollback = true;
439                            break;
440                        }
441                    }
442                }
443                "delete" => {
444                    let id = op.get("id").and_then(|v| v.as_str()).unwrap_or("");
445                    match self.delete_with_conn(&conn, entity, id) {
446                        Ok(_) => {
447                            results.push(serde_json::json!({"op": "delete", "id": id}));
448                        }
449                        Err(e) => {
450                            results.push(serde_json::json!({"op": "delete", "error": e.message}));
451                            rollback = true;
452                            break;
453                        }
454                    }
455                }
456                _ => {
457                    results.push(serde_json::json!({"op": op_type, "error": "unknown operation"}));
458                }
459            }
460        }
461
462        if rollback {
463            let _ = conn.execute("ROLLBACK", []);
464        } else {
465            let _ = conn.execute("COMMIT", []);
466        }
467
468        Ok((!rollback, results))
469    }
470
471    /// Bridge the typed `SearchQuery` / `SearchResult` shapes to the
472    /// trait's JSON-in / JSON-out contract. The router passes a JSON
473    /// body; we deserialize, look up the entity's `SearchConfig`, run
474    /// the planner, and re-serialize. Serialization round-tripping
475    /// lets this method live on the DataStore trait without forcing
476    /// pylon-http to depend on pylon-storage.
477    fn search(
478        &self,
479        entity: &str,
480        query: &serde_json::Value,
481    ) -> Result<serde_json::Value, DataError> {
482        let ent = self
483            .manifest()
484            .entities
485            .iter()
486            .find(|e| e.name == entity)
487            .ok_or_else(|| DataError {
488                code: "ENTITY_NOT_FOUND".into(),
489                message: format!("Unknown entity: {entity}"),
490            })?;
491        let cfg = ent.search.as_ref().ok_or_else(|| DataError {
492            code: "SEARCH_NOT_CONFIGURED".into(),
493            message: format!("Entity {entity} has no `search:` config"),
494        })?;
495        let parsed: pylon_storage::search::SearchQuery = serde_json::from_value(query.clone())
496            .map_err(|e| DataError {
497                code: "INVALID_QUERY".into(),
498                message: format!("search query body: {e}"),
499            })?;
500
501        // Postgres: dispatch to the PG-native FTS path (`tsvector` +
502        // GIN index, maintained transactionally alongside CRUD by
503        // PgTxStore). Same `SearchResult` shape as the SQLite path.
504        if self.is_postgres() {
505            let pg = self.pg_data_store().ok_or_else(|| DataError {
506                code: "PG_DATASTORE_MISSING".into(),
507                message: "is_postgres=true but pg_data_store() returned None".into(),
508            })?;
509            let result = pg.run_search(entity, cfg, &parsed).map_err(|e| DataError {
510                code: e.code,
511                message: e.message,
512            })?;
513            return serde_json::to_value(&result).map_err(|e| DataError {
514                code: "SEARCH_SERIALIZE_FAILED".into(),
515                message: e.to_string(),
516            });
517        }
518
519        let conn = self.lock_conn_pub().map_err(into_data_error)?;
520        let result =
521            pylon_storage::search_query::run_search(&conn, entity, cfg, &parsed).map_err(|e| {
522                DataError {
523                    code: e.code,
524                    message: e.message,
525                }
526            })?;
527        serde_json::to_value(&result).map_err(|e| DataError {
528            code: "SEARCH_SERIALIZE_FAILED".into(),
529            message: e.to_string(),
530        })
531    }
532
533    /// Return the binary CRDT snapshot for a row. `Ok(None)` for any
534    /// entity with `crdt: false` (the LWW opt-out) — the router uses
535    /// that to decide whether to ship a binary update over WebSocket
536    /// after the write.
537    fn crdt_snapshot(&self, entity: &str, row_id: &str) -> Result<Option<Vec<u8>>, DataError> {
538        // Postgres: read from the PG `_pylon_crdt_snapshots` sidecar
539        // via the same `PgLoroStore` that maintenance writes go
540        // through. Same Ok(None) early-exit for non-CRDT entities so
541        // the router skips the binary broadcast.
542        if self.is_postgres() {
543            let ent = self
544                .manifest()
545                .entities
546                .iter()
547                .find(|e| e.name == entity)
548                .ok_or_else(|| DataError {
549                    code: "ENTITY_NOT_FOUND".into(),
550                    message: format!("Unknown entity: {entity}"),
551                })?;
552            if !ent.crdt {
553                return Ok(None);
554            }
555            let pg_backend = match self.pg_backend() {
556                Some(pg) => pg,
557                None => return Ok(None),
558            };
559            // Single-read; with_client is fine here. PgLoroStore's
560            // hydrate-on-miss + per-row Mutex ensures consistent
561            // bytes even under concurrent applies on other threads.
562            let snap = pg_backend.store.with_client(|client| -> Result<Vec<u8>, DataError> {
563                pg_backend
564                    .crdt
565                    .snapshot(client, entity, row_id)
566                    .map_err(|e| DataError {
567                        code: "CRDT_SNAPSHOT_FAILED".into(),
568                        message: format!("snapshot {entity}/{row_id}: {e}"),
569                    })
570            })?;
571            return Ok(Some(snap));
572        }
573        let ent = self
574            .manifest()
575            .entities
576            .iter()
577            .find(|e| e.name == entity)
578            .ok_or_else(|| DataError {
579                code: "ENTITY_NOT_FOUND".into(),
580                message: format!("Unknown entity: {entity}"),
581            })?;
582        if !ent.crdt {
583            return Ok(None);
584        }
585        let conn = self.lock_conn_pub().map_err(into_data_error)?;
586        let snap = self
587            .crdt_store()
588            .snapshot(&conn, entity, row_id)
589            .map_err(|e| DataError {
590                code: "CRDT_SNAPSHOT_FAILED".into(),
591                message: format!("snapshot {entity}/{row_id}: {e}"),
592            })?;
593        Ok(Some(snap))
594    }
595
596    /// Client-pushed Loro update. Imports into the row's LoroDoc,
597    /// re-projects the doc state into the materialized SQLite columns
598    /// (so subsequent reads see the merged content), and returns the
599    /// fresh full-row snapshot for the router to broadcast to other
600    /// clients.
601    ///
602    /// Wrapped in a single SQLite transaction — same crash-safety
603    /// shape as `Runtime::insert/update`. Either the LoroStore +
604    /// SQLite columns both update or neither does.
605    fn crdt_apply_update(
606        &self,
607        entity: &str,
608        row_id: &str,
609        update: &[u8],
610    ) -> Result<Vec<u8>, DataError> {
611        // Postgres: import the binary update into the row's PG-side
612        // LoroDoc, persist the new snapshot in `_pylon_crdt_snapshots`,
613        // re-project to the materialized PG row's columns, and return
614        // the fresh full snapshot for the router to broadcast. Same
615        // shape as the SQLite path below.
616        if self.is_postgres() {
617            let ent = self
618                .manifest()
619                .entities
620                .iter()
621                .find(|e| e.name == entity)
622                .ok_or_else(|| DataError {
623                    code: "ENTITY_NOT_FOUND".into(),
624                    message: format!("Unknown entity: {entity}"),
625                })?
626                .clone();
627            if !ent.crdt {
628                return Err(DataError {
629                    code: "NOT_SUPPORTED".into(),
630                    message: format!(
631                        "CRDT update sent for entity \"{entity}\" which has crdt: false"
632                    ),
633                });
634            }
635            let pg_backend = self.pg_backend().ok_or_else(|| DataError {
636                code: "PG_BACKEND_MISSING".into(),
637                message: "is_postgres=true but pg_backend() returned None".into(),
638            })?;
639            let crdt_fields = self.crdt_fields_for(&ent).map_err(into_data_error)?;
640
641            // One transaction: apply the peer's update into the
642            // LoroDoc + persist the new snapshot + reproject into the
643            // materialized PG row + read back the fresh snapshot for
644            // broadcast. Pre-fix this was three separate autocommits
645            // — a failure between them desynced the layers, and the
646            // broadcast snapshot might not reflect what actually
647            // landed on disk.
648            let result = pg_backend.store.with_transaction_raw(|tx| -> Result<Vec<u8>, DataError> {
649                let projected = pg_backend
650                    .crdt
651                    .apply_remote_update(tx, entity, row_id, &crdt_fields, update)
652                    .map_err(|e| {
653                        // Distinguish decode errors (malformed client
654                        // bytes — caller's fault, 400) from apply
655                        // errors (schema mismatch, also caller's
656                        // fault but a different shape). The CRDT
657                        // route maps CRDT_DECODE_FAILED → 400, so
658                        // unmapped errors land as 500 — codex
659                        // flagged the asymmetry vs the SQLite path.
660                        let code = match &e {
661                            crate::loro_store::LoroStoreError::Decode(_) => "CRDT_DECODE_FAILED",
662                            _ => "CRDT_APPLY_FAILED",
663                        };
664                        DataError {
665                            code: code.into(),
666                            message: format!("crdt apply update {entity}/{row_id}: {e}"),
667                        }
668                    })?;
669                let updated = pylon_storage::pg_tx_store::tx_update(
670                    tx,
671                    self.manifest(),
672                    entity,
673                    row_id,
674                    &projected,
675                )?;
676                if !updated {
677                    // Same orphan guard as Runtime::update — refuse
678                    // to commit a snapshot for a row that doesn't
679                    // exist. Peer pushed an update for a row this
680                    // replica's never seen.
681                    return Err(DataError {
682                        code: "ENTITY_NOT_FOUND".into(),
683                        message: format!(
684                            "Peer-pushed CRDT update targets {entity}/{row_id} which has \
685                             no materialized row — refusing to commit an orphan snapshot."
686                        ),
687                    });
688                }
689                // Read the snapshot back from the tx, bypassing the
690                // cache — a prior `crdt_snapshot()` call could have
691                // populated the cache with bytes that predate this
692                // peer update, and broadcasting them would silently
693                // omit the just-applied change. Codex flagged this.
694                let snap = crate::pg_loro_store::PgLoroStore::read_snapshot_via_conn(tx, entity, row_id)
695                    .map_err(|e| DataError {
696                        code: "CRDT_SNAPSHOT_FAILED".into(),
697                        message: format!(
698                            "post-update snapshot {entity}/{row_id}: {e}"
699                        ),
700                    })?;
701                // Refresh the cache so the next reader on this
702                // process skips the round-trip.
703                pg_backend.crdt.cache_after_commit(tx, entity, row_id);
704                Ok(snap)
705            });
706            if result.is_err() {
707                // Same cache-coherency hygiene as Runtime::insert /
708                // update — the in-memory doc absorbed the peer's
709                // update before the tx rolled back, so evict to
710                // force re-hydration from the persisted snapshot.
711                pg_backend.crdt.evict(entity, row_id);
712            }
713            return result;
714        }
715        // Find the entity so we can build the projection field list +
716        // confirm CRDT mode is on. Cheap manifest scan; counts are tiny.
717        let ent = self
718            .manifest()
719            .entities
720            .iter()
721            .find(|e| e.name == entity)
722            .ok_or_else(|| DataError {
723                code: "ENTITY_NOT_FOUND".into(),
724                message: format!("Unknown entity: {entity}"),
725            })?
726            .clone();
727        if !ent.crdt {
728            return Err(DataError {
729                code: "NOT_SUPPORTED".into(),
730                message: format!("Entity {entity} has crdt: false; client push requires CRDT mode"),
731            });
732        }
733        let crdt_fields = self.crdt_fields_for(&ent).map_err(into_data_error)?;
734
735        let conn = self.lock_conn_pub().map_err(into_data_error)?;
736        crate::with_write_tx(&conn, || -> Result<Vec<u8>, crate::RuntimeError> {
737            // Apply the update to the LoroDoc + persist the new snapshot
738            // to the sidecar. Returns the projected JSON shape for the
739            // post-merge state.
740            let projected = self
741                .crdt_store()
742                .apply_remote_update(&conn, entity, row_id, &crdt_fields, update)
743                .map_err(|e| crate::RuntimeError {
744                    code: "CRDT_APPLY_FAILED".into(),
745                    message: format!("apply_remote_update {entity}/{row_id}: {e}"),
746                })?;
747
748            // Re-project into the materialized SQLite row so SELECT
749            // queries see the merged content. Build SET clauses from
750            // the projection — every CRDT-managed field gets rewritten.
751            let projection = projected.as_object().ok_or_else(|| crate::RuntimeError {
752                code: "CRDT_PROJECTION_INVALID".into(),
753                message: "projected row was not a JSON object".into(),
754            })?;
755
756            let mut set_clauses = Vec::with_capacity(projection.len());
757            let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
758            let mut idx = 1;
759            for (key, val) in projection {
760                if key == "id" {
761                    continue;
762                }
763                set_clauses.push(format!("{} = ?{idx}", crate::quote_ident(key.as_str())));
764                values.push(crate::json_to_sql(val));
765                idx += 1;
766            }
767            if set_clauses.is_empty() {
768                // No projected fields — happens when the doc has no
769                // top-level keys yet (fresh row from a peer subscribing
770                // before any writes). Skip the UPDATE; row may not exist
771                // in SQLite. Subsequent inserts will materialize it.
772            } else {
773                values.push(Box::new(row_id.to_string()));
774                let sql = format!(
775                    "UPDATE {} SET {} WHERE \"id\" = ?{idx}",
776                    crate::quote_ident(entity),
777                    set_clauses.join(", ")
778                );
779                let params: Vec<&dyn rusqlite::types::ToSql> =
780                    values.iter().map(|v| v.as_ref()).collect();
781                conn.execute(&sql, params.as_slice())
782                    .map_err(|e| crate::RuntimeError {
783                        code: "UPDATE_FAILED".into(),
784                        message: format!("post-merge UPDATE {entity}/{row_id}: {e}"),
785                    })?;
786            }
787
788            // Return the new snapshot for the router to broadcast.
789            let snap = self
790                .crdt_store()
791                .snapshot(&conn, entity, row_id)
792                .map_err(|e| crate::RuntimeError {
793                    code: "CRDT_SNAPSHOT_FAILED".into(),
794                    message: format!("post-merge snapshot {entity}/{row_id}: {e}"),
795                })?;
796            Ok(snap)
797        })
798        .map_err(into_data_error)
799    }
800}
801
802fn into_data_error(e: crate::RuntimeError) -> DataError {
803    DataError {
804        code: e.code,
805        message: e.message,
806    }
807}
808
809// ---------------------------------------------------------------------------
810// ChangeNotifier for WsHub + SseHub
811// ---------------------------------------------------------------------------
812
813use crate::sse::SseHub;
814use crate::ws::WsHub;
815use std::sync::Arc;
816
/// Bridges WebSocket + SSE hubs to the router's
/// [`pylon_router::ChangeNotifier`] trait.
///
/// Change and presence notifications go to both hubs. Binary CRDT
/// frames go to the WebSocket hub only, because SSE is a text transport.
pub struct WsSseNotifier {
    /// WebSocket hub: receives change events, presence updates, and
    /// binary CRDT frames (the latter only for subscribed clients).
    pub ws: Arc<WsHub>,
    /// Server-Sent Events hub: receives change events and presence
    /// messages; never receives binary CRDT frames.
    pub sse: Arc<SseHub>,
}
822
823impl pylon_router::ChangeNotifier for WsSseNotifier {
824    fn notify(&self, event: &pylon_sync::ChangeEvent) {
825        self.ws.broadcast(event);
826        self.sse.broadcast(event);
827    }
828
829    fn notify_presence(&self, json: &str) {
830        self.ws.broadcast_presence(json);
831        self.sse.broadcast_message(json);
832    }
833
834    /// Encode a CRDT broadcast frame (1-byte type + length-prefixed
835    /// entity + length-prefixed row_id + Loro snapshot bytes) and ship
836    /// it to clients SUBSCRIBED to this row. SSE is text-only so it
837    /// gets skipped — clients on the SSE transport stay on the JSON
838    /// change-event path until a future SSE-friendly encoding (base64
839    /// or hex-encoded chunks) lands.
840    ///
841    /// Filtering by subscription instead of broadcasting to every WS
842    /// client matters once more than a handful of rows are in flight:
843    /// a 50-channel app with 100 connected users would otherwise fan
844    /// 100x for every keystroke in a single channel. Now each binary
845    /// frame goes only to the (typically small) set of tabs that asked
846    /// to mirror that specific row.
847    ///
848    /// If no clients are subscribed (empty list) the frame is dropped
849    /// silently — the JSON change event from `notify` already told
850    /// every connected client a write happened, so non-subscribed
851    /// clients can re-fetch via the regular query path if they care.
852    ///
853    /// Authz: the policy check happens at SUBSCRIBE TIME (in
854    /// `start_ws_server`'s SnapshotFetcher closure) — clients on the
855    /// subscriber list have already passed `check_entity_read` for
856    /// the row at that moment. We don't re-check on every broadcast
857    /// because the broadcast hot path runs from the write thread
858    /// without per-client auth context. A consequence: if a client is
859    /// already subscribed and their permissions change mid-session
860    /// (e.g. they're removed from a private channel), they'll keep
861    /// receiving CRDT frames for that row until they disconnect.
862    /// Future work: index subscribers by auth context so the broadcast
863    /// can re-check, or invalidate subscriptions on policy changes.
864    ///
865    /// Frame-encode failure (entity / row_id over the 16-bit length
866    /// header) gets logged and dropped — the row's regular JSON change
867    /// event already shipped via `notify`, so clients still see the
868    /// write happened, they just don't get the binary CRDT delta.
869    fn notify_crdt(&self, entity: &str, row_id: &str, snapshot: &[u8]) {
870        let subscribers = self.ws.subscriptions().subscribers(entity, row_id);
871        if subscribers.is_empty() {
872            return;
873        }
874        match pylon_router::encode_crdt_frame(
875            pylon_router::CRDT_FRAME_SNAPSHOT,
876            entity,
877            row_id,
878            snapshot,
879        ) {
880            Ok(frame) => self.ws.broadcast_binary_to(&subscribers, frame),
881            Err(e) => {
882                tracing::warn!("[crdt] dropping binary frame for {entity}/{row_id}: {e}");
883            }
884        }
885    }
886}
887
888/// Serialize a value to JSON, falling back to `{}` on failure.
889fn to_json<T: serde::Serialize>(val: T) -> serde_json::Value {
890    serde_json::to_value(val).unwrap_or(serde_json::json!({}))
891}
892
893/// Serialize a value to JSON, falling back to `[]` on failure.
894fn to_json_array<T: serde::Serialize>(val: T) -> serde_json::Value {
895    serde_json::to_value(val).unwrap_or(serde_json::json!([]))
896}
897
898// ---------------------------------------------------------------------------
899// Adapter: RoomManager → RoomOps
900// ---------------------------------------------------------------------------
901
902use crate::rooms::RoomManager;
903
904impl pylon_router::RoomOps for RoomManager {
905    fn join(
906        &self,
907        room: &str,
908        user_id: &str,
909        data: Option<serde_json::Value>,
910    ) -> Result<(serde_json::Value, serde_json::Value), DataError> {
911        RoomManager::join(self, room, user_id, data)
912            .map(|(snapshot, join_event)| (to_json(&snapshot), to_json(&join_event)))
913            .map_err(|e| DataError {
914                code: e.code,
915                message: e.message,
916            })
917    }
918
919    fn leave(&self, room: &str, user_id: &str) -> Option<serde_json::Value> {
920        RoomManager::leave(self, room, user_id).map(|event| to_json(&event))
921    }
922
923    fn set_presence(
924        &self,
925        room: &str,
926        user_id: &str,
927        data: serde_json::Value,
928    ) -> Option<serde_json::Value> {
929        RoomManager::set_presence(self, room, user_id, data).map(|event| to_json(&event))
930    }
931
932    fn broadcast(
933        &self,
934        room: &str,
935        sender: Option<&str>,
936        topic: &str,
937        data: serde_json::Value,
938    ) -> Option<serde_json::Value> {
939        RoomManager::broadcast(self, room, sender, topic, data).map(|event| to_json(&event))
940    }
941
942    fn list_rooms(&self) -> Vec<String> {
943        RoomManager::list_rooms(self)
944    }
945
946    fn room_size(&self, name: &str) -> usize {
947        RoomManager::room_size(self, name)
948    }
949
950    fn members(&self, name: &str) -> Vec<serde_json::Value> {
951        RoomManager::members(self, name)
952            .into_iter()
953            .map(|p| to_json(p))
954            .collect()
955    }
956}
957
958// ---------------------------------------------------------------------------
// Adapters: PluginRegistry → PluginHookOps, CachePlugin → CacheOps (newtype wrappers for orphan rule)
960// ---------------------------------------------------------------------------
961
962use pylon_plugin::builtin::cache::CachePlugin;
963
/// Adapter that routes router-level CRUD hook calls into the PluginRegistry.
///
/// The router holds a `&dyn PluginHookOps`. This newtype wraps the
/// runtime's `Arc<PluginRegistry>` so that registered plugins (audit_log,
/// validation, webhooks, timestamps, slugify, versioning, search) run on
/// every POST/PATCH/DELETE under `/api/entities/*`. A newtype is needed
/// because both the trait and the registry type are foreign to this crate
/// (orphan rule).
///
/// Without this wiring, plugins only saw the `on_request` hook and never
/// got a chance to observe or reject data-plane writes. The pentest review
/// noted this as a quiet correctness hole.
pub struct PluginHooksAdapter(pub Arc<pylon_plugin::PluginRegistry>);
974
975impl pylon_router::PluginHookOps for PluginHooksAdapter {
976    fn before_insert(
977        &self,
978        entity: &str,
979        data: &mut serde_json::Value,
980        auth: &pylon_auth::AuthContext,
981    ) -> Result<(), (u16, String, String)> {
982        self.0
983            .run_before_insert(entity, data, auth)
984            .map_err(|e| (e.status, e.code, e.message))
985    }
986    fn after_insert(
987        &self,
988        entity: &str,
989        id: &str,
990        data: &serde_json::Value,
991        auth: &pylon_auth::AuthContext,
992    ) {
993        self.0.run_after_insert(entity, id, data, auth);
994    }
995    fn before_update(
996        &self,
997        entity: &str,
998        id: &str,
999        data: &mut serde_json::Value,
1000        auth: &pylon_auth::AuthContext,
1001    ) -> Result<(), (u16, String, String)> {
1002        self.0
1003            .run_before_update(entity, id, data, auth)
1004            .map_err(|e| (e.status, e.code, e.message))
1005    }
1006    fn after_update(
1007        &self,
1008        entity: &str,
1009        id: &str,
1010        data: &serde_json::Value,
1011        auth: &pylon_auth::AuthContext,
1012    ) {
1013        self.0.run_after_update(entity, id, data, auth);
1014    }
1015    fn before_delete(
1016        &self,
1017        entity: &str,
1018        id: &str,
1019        auth: &pylon_auth::AuthContext,
1020    ) -> Result<(), (u16, String, String)> {
1021        self.0
1022            .run_before_delete(entity, id, auth)
1023            .map_err(|e| (e.status, e.code, e.message))
1024    }
1025    fn after_delete(&self, entity: &str, id: &str, auth: &pylon_auth::AuthContext) {
1026        self.0.run_after_delete(entity, id, auth);
1027    }
1028}
1029
/// Newtype over the shared [`CachePlugin`] so it can implement the foreign
/// [`pylon_router::CacheOps`] trait (orphan rule). Every method forwards to
/// the matching function in `crate::cache_handlers`.
pub struct CacheAdapter(pub Arc<CachePlugin>);
1031
1032impl pylon_router::CacheOps for CacheAdapter {
1033    fn handle_command(&self, body: &str) -> (u16, String) {
1034        crate::cache_handlers::handle_cache_command(&self.0, body)
1035    }
1036
1037    fn handle_get(&self, key: &str) -> (u16, String) {
1038        crate::cache_handlers::handle_cache_get(&self.0, key)
1039    }
1040
1041    fn handle_delete(&self, key: &str) -> (u16, String) {
1042        crate::cache_handlers::handle_cache_delete(&self.0, key)
1043    }
1044}
1045
1046// ---------------------------------------------------------------------------
1047// Adapter: PubSubBroker → PubSubOps (newtype wrapper for orphan rule)
1048// ---------------------------------------------------------------------------
1049
1050use crate::pubsub::PubSubBroker;
1051
/// Newtype over the shared [`PubSubBroker`] so it can implement the foreign
/// [`pylon_router::PubSubOps`] trait (orphan rule). Every method forwards to
/// a `handle_pubsub_*` function in `crate::cache_handlers`.
pub struct PubSubAdapter(pub Arc<PubSubBroker>);
1053
1054impl pylon_router::PubSubOps for PubSubAdapter {
1055    fn handle_publish(&self, body: &str) -> (u16, String) {
1056        crate::cache_handlers::handle_pubsub_publish(&self.0, body)
1057    }
1058
1059    fn handle_channels(&self) -> (u16, String) {
1060        crate::cache_handlers::handle_pubsub_channels(&self.0)
1061    }
1062
1063    fn handle_history(&self, channel: &str, url: &str) -> (u16, String) {
1064        crate::cache_handlers::handle_pubsub_history(&self.0, channel, url)
1065    }
1066}
1067
1068// ---------------------------------------------------------------------------
1069// Adapter: JobQueue → JobOps
1070// ---------------------------------------------------------------------------
1071
1072use crate::jobs::{JobQueue, Priority};
1073
1074impl pylon_router::JobOps for JobQueue {
1075    fn enqueue(
1076        &self,
1077        name: &str,
1078        payload: serde_json::Value,
1079        priority: &str,
1080        delay_secs: u64,
1081        max_retries: u32,
1082        queue: &str,
1083    ) -> String {
1084        let pri = Priority::from_str_loose(priority);
1085        JobQueue::enqueue_with_options(self, name, payload, pri, delay_secs, max_retries, queue)
1086    }
1087
1088    fn stats(&self) -> serde_json::Value {
1089        to_json(JobQueue::stats(self))
1090    }
1091
1092    fn dead_letters(&self) -> serde_json::Value {
1093        to_json_array(JobQueue::dead_letters(self))
1094    }
1095
1096    fn retry_dead(&self, id: &str) -> bool {
1097        JobQueue::retry_dead(self, id)
1098    }
1099
1100    fn list_jobs(
1101        &self,
1102        status: Option<&str>,
1103        queue: Option<&str>,
1104        limit: usize,
1105    ) -> serde_json::Value {
1106        to_json_array(JobQueue::list_jobs(self, status, queue, limit))
1107    }
1108
1109    fn get_job(&self, id: &str) -> Option<serde_json::Value> {
1110        JobQueue::get_job(self, id).map(|j| to_json(j))
1111    }
1112}
1113
1114// ---------------------------------------------------------------------------
1115// Adapter: Scheduler → SchedulerOps
1116// ---------------------------------------------------------------------------
1117
1118use crate::scheduler::Scheduler;
1119
1120impl pylon_router::SchedulerOps for Scheduler {
1121    fn list_tasks(&self) -> serde_json::Value {
1122        to_json_array(Scheduler::list_tasks(self))
1123    }
1124
1125    fn trigger(&self, name: &str) -> bool {
1126        Scheduler::trigger(self, name)
1127    }
1128}
1129
1130// ---------------------------------------------------------------------------
1131// Adapter: WorkflowEngine → WorkflowOps
1132// ---------------------------------------------------------------------------
1133
1134use crate::workflows::WorkflowEngine;
1135
1136impl pylon_router::WorkflowOps for WorkflowEngine {
1137    fn definitions(&self) -> serde_json::Value {
1138        to_json_array(WorkflowEngine::definitions(self))
1139    }
1140
1141    fn start(&self, name: &str, input: serde_json::Value) -> Result<String, String> {
1142        WorkflowEngine::start(self, name, input)
1143    }
1144
1145    fn list(&self, status_filter: Option<&str>) -> serde_json::Value {
1146        // Convert string filter to WorkflowStatus for the engine.
1147        let filter = status_filter.and_then(|s| match s {
1148            "pending" => Some(crate::workflows::WorkflowStatus::Pending),
1149            "running" => Some(crate::workflows::WorkflowStatus::Running),
1150            "sleeping" => Some(crate::workflows::WorkflowStatus::Sleeping),
1151            "waiting" => Some(crate::workflows::WorkflowStatus::WaitingForEvent),
1152            "completed" => Some(crate::workflows::WorkflowStatus::Completed),
1153            "failed" => Some(crate::workflows::WorkflowStatus::Failed),
1154            "cancelled" => Some(crate::workflows::WorkflowStatus::Cancelled),
1155            _ => None,
1156        });
1157        to_json_array(WorkflowEngine::list(self, filter.as_ref()))
1158    }
1159
1160    fn get(&self, id: &str) -> Option<serde_json::Value> {
1161        WorkflowEngine::get(self, id).map(|inst| to_json(inst))
1162    }
1163
1164    fn advance(&self, id: &str) -> Result<String, String> {
1165        WorkflowEngine::advance(self, id).map(|status| format!("{:?}", status))
1166    }
1167
1168    fn send_event(&self, id: &str, event: &str, data: serde_json::Value) -> Result<(), String> {
1169        WorkflowEngine::send_event(self, id, event, data)
1170    }
1171
1172    fn cancel(&self, id: &str) -> Result<(), String> {
1173        WorkflowEngine::cancel(self, id)
1174    }
1175}
1176
1177// ---------------------------------------------------------------------------
1178// Adapter: FileStorage trait → FileOps
1179// ---------------------------------------------------------------------------
1180
1181use pylon_storage::files::{FileStorage, LocalFileStorage, Stack0FileStorage};
1182
/// Adapter that exposes a [`FileStorage`] backend through the router's
/// [`pylon_router::FileOps`] trait.
///
/// Use `FileOpsAdapter::from_env()` to choose the backend from environment
/// variables.
pub struct FileOpsAdapter {
    /// Backend that `get_file` reads from (local directory or Stack0).
    pub storage: Arc<dyn FileStorage>,
}
1187
1188impl FileOpsAdapter {
1189    /// Create from environment variables.
1190    ///
1191    /// Selects backend via `PYLON_FILES_PROVIDER`:
1192    /// - `local` (default) — files saved under `PYLON_FILES_DIR` and served
1193    ///   via `PYLON_FILES_URL_PREFIX`.
1194    /// - `stack0` — uploads go to Stack0's CDN. Requires `PYLON_STACK0_API_KEY`.
1195    pub fn from_env() -> Self {
1196        let provider = std::env::var("PYLON_FILES_PROVIDER").unwrap_or_else(|_| "local".into());
1197        match provider.as_str() {
1198            "stack0" => match Stack0FileStorage::from_env() {
1199                Some(s) => Self {
1200                    storage: Arc::new(s),
1201                },
1202                None => {
1203                    tracing::warn!(
1204                        "PYLON_FILES_PROVIDER=stack0 but PYLON_STACK0_API_KEY is not set; falling back to local storage"
1205                    );
1206                    Self::local_from_env()
1207                }
1208            },
1209            _ => Self::local_from_env(),
1210        }
1211    }
1212
1213    fn local_from_env() -> Self {
1214        let dir = std::env::var("PYLON_FILES_DIR").unwrap_or_else(|_| "uploads".into());
1215        let url_prefix =
1216            std::env::var("PYLON_FILES_URL_PREFIX").unwrap_or_else(|_| "/api/files".into());
1217        Self {
1218            storage: Arc::new(LocalFileStorage::new(&dir, &url_prefix)),
1219        }
1220    }
1221}
1222
1223impl pylon_router::FileOps for FileOpsAdapter {
1224    fn upload(&self, _body: &str) -> (u16, String) {
1225        // The self-hosted server short-circuits /api/files/upload BEFORE the
1226        // request body is lossily coerced to a String, so binary uploads are
1227        // handled there. This fallback exists for non-self-hosted adapters
1228        // (e.g., Workers) and for defense in depth; it rejects string bodies
1229        // that wouldn't carry binary data correctly.
1230        (
1231            400,
1232            pylon_router::json_error(
1233                "UPLOAD_NEEDS_BINARY",
1234                "File uploads must use multipart/form-data or raw binary with X-Filename; this platform does not support string-body uploads",
1235            ),
1236        )
1237    }
1238
1239    fn get_file(&self, id: &str) -> (u16, String) {
1240        match self.storage.get(id) {
1241            Ok(content) => (200, String::from_utf8_lossy(&content).into_owned()),
1242            Err(e) if e.code == "NOT_FOUND" => {
1243                (404, pylon_router::json_error("FILE_NOT_FOUND", &e.message))
1244            }
1245            Err(e) => (400, pylon_router::json_error(&e.code, &e.message)),
1246        }
1247    }
1248}
1249
1250/// Backwards-compatible alias; old code refers to this name.
1251pub type LocalFileOps = FileOpsAdapter;
1252
1253impl LocalFileOps {
1254    /// Default instance backed by the local `uploads/` directory.
1255    pub fn new_default() -> Self {
1256        Self::from_env()
1257    }
1258}
1259
1260// ---------------------------------------------------------------------------
1261// Adapter: EmailTransport → EmailSender
1262// ---------------------------------------------------------------------------
1263
1264use pylon_auth::email::{ConsoleTransport, EmailTransport, HttpEmailTransport};
1265
/// Picks an email backend based on environment variables.
///
/// Uses `HttpEmailTransport` when `HttpEmailTransport::from_env()` finds a
/// configuration, and falls back to `ConsoleTransport` (prints to stderr)
/// otherwise.
pub struct EmailAdapter {
    /// The selected backend; `send` calls are forwarded to it unchanged.
    transport: Box<dyn EmailTransport>,
}
1271
1272impl EmailAdapter {
1273    pub fn from_env() -> Self {
1274        if let Some(http) = HttpEmailTransport::from_env() {
1275            Self {
1276                transport: Box::new(http),
1277            }
1278        } else {
1279            Self {
1280                transport: Box::new(ConsoleTransport),
1281            }
1282        }
1283    }
1284}
1285
1286impl pylon_router::EmailSender for EmailAdapter {
1287    fn send(&self, to: &str, subject: &str, body: &str) -> Result<(), String> {
1288        self.transport
1289            .send(to, subject, body)
1290            .map_err(|e| e.message)
1291    }
1292}
1293
1294// ---------------------------------------------------------------------------
1295// Adapter: OpenAPI generator
1296// ---------------------------------------------------------------------------
1297
/// Router-facing OpenAPI generator that renders the spec for a borrowed
/// app manifest via `crate::openapi::generate_openapi`.
pub struct RuntimeOpenApiGenerator<'a> {
    /// Manifest the OpenAPI document is generated from.
    pub manifest: &'a pylon_kernel::AppManifest,
}
1301
1302impl<'a> pylon_router::OpenApiGenerator for RuntimeOpenApiGenerator<'a> {
1303    fn generate(&self, base_url: &str) -> String {
1304        let spec = crate::openapi::generate_openapi(self.manifest, base_url);
1305        serde_json::to_string(&spec).unwrap_or_else(|_| "{}".into())
1306    }
1307}
1308
1309// ---------------------------------------------------------------------------
1310// Adapter: DynShardRegistry → ShardOps
1311// ---------------------------------------------------------------------------
1312
/// Wraps any `Arc<dyn DynShardRegistry>` so the router can dispatch shard
/// routes without knowing the concrete SimState type.
pub struct ShardOpsAdapter {
    /// Type-erased registry that all `ShardOps` calls delegate to.
    pub registry: Arc<dyn pylon_realtime::DynShardRegistry>,
}
1318
1319impl pylon_router::ShardOps for ShardOpsAdapter {
1320    fn get_shard(&self, id: &str) -> Option<Arc<dyn pylon_realtime::DynShard>> {
1321        self.registry.get(id)
1322    }
1323
1324    fn list_shards(&self) -> Vec<String> {
1325        self.registry.ids()
1326    }
1327
1328    fn shard_count(&self) -> usize {
1329        self.registry.len()
1330    }
1331}
1332
#[cfg(test)]
mod find_runtime_tests {
    use super::*;

    /// Environment variable both tests override.
    const ENV_KEY: &str = "PYLON_FUNCTIONS_RUNTIME";

    /// Serializes every test in this module that touches `ENV_KEY`.
    ///
    /// Environment variables are process-global, and the test harness runs
    /// tests on parallel threads by default. Without this lock, one test can
    /// overwrite or remove the variable between the other test's `set_var`
    /// and its `find_functions_runtime()` call, which makes the assertions
    /// flaky.
    static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Acquire `ENV_LOCK`, recovering from poisoning so one failed test
    /// doesn't cascade into failures in the others.
    fn lock_env() -> std::sync::MutexGuard<'static, ()> {
        ENV_LOCK
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Sets `ENV_KEY` on creation and removes it on drop, so a panic inside
    /// `find_functions_runtime()` cannot leak the override into later tests.
    struct EnvOverride;

    impl EnvOverride {
        fn set(value: &str) -> Self {
            std::env::set_var(ENV_KEY, value);
            EnvOverride
        }
    }

    impl Drop for EnvOverride {
        fn drop(&mut self) {
            std::env::remove_var(ENV_KEY);
        }
    }

    #[test]
    fn env_override_takes_precedence() {
        let _env_lock = lock_env();

        let dir = std::env::temp_dir().join(format!("pylon_rt_{}", std::process::id()));
        let _ = std::fs::create_dir_all(&dir);
        let path = dir.join("custom_runtime.ts");
        std::fs::write(&path, "// test").unwrap();

        let found = {
            let _override = EnvOverride::set(path.to_str().unwrap());
            find_functions_runtime()
        };

        // Clean up before asserting so a failure doesn't leave the temp dir behind.
        let _ = std::fs::remove_dir_all(&dir);
        assert_eq!(found.as_deref(), path.to_str());
    }

    #[test]
    fn returns_none_when_env_path_missing() {
        const MISSING: &str = "/tmp/definitely-does-not-exist-42.ts";
        let _env_lock = lock_env();

        // May still find something in CWD (dev path), so we only assert the env
        // path isn't what gets returned.
        let found = {
            let _override = EnvOverride::set(MISSING);
            find_functions_runtime()
        };
        assert_ne!(found.as_deref(), Some(MISSING));
    }
}
1369
1370// ---------------------------------------------------------------------------
1371// TxStore — DataStore backed by a held transaction connection
1372// ---------------------------------------------------------------------------
1373
/// A `DataStore` that executes against a pre-held SQLite connection
/// for the duration of a single mutation handler.
///
/// Writes go through the caller's already-open connection, and the
/// matching change events are buffered in `pending` instead of being
/// published. The caller then either drains them with `take_pending`
/// after COMMIT or drops them on rollback.
///
/// # Safety contract
///
/// `rusqlite::Connection` is `Send` but not `Sync` (it uses `RefCell`s
/// internally for statement caching). The `DataStore` trait requires
/// `Send + Sync`, but `&'a Connection` is neither. The `pending` field is
/// a `std::cell::RefCell`, which is not `Sync` either.
///
/// We hand-implement both via `unsafe impl` because:
///
/// 1. **Construction.** `TxStore::new` is only ever called by
///    `FnOpsImpl::call` for mutations, after acquiring the runtime's
///    write lock. The `&Connection` originates from a `MutexGuard`
///    that the constructing thread holds.
///
/// 2. **Lifetime.** The `'a` lifetime ties the `TxStore` to that guard.
///    The compiler enforces that the `TxStore` cannot outlive the held
///    lock; it must be dropped before the guard is.
///
/// 3. **Single-threaded use.** `FnRunner::call()` runs the handler
///    synchronously on the calling thread and never spawns threads
///    holding a reference to the `TxStore`. The `Send + Sync` bounds
///    on the `DataStore` trait are satisfied vacuously: no thread
///    other than the caller ever sees this `TxStore`.
///
/// 4. **No interior aliasing.** All `&Connection` calls go through
///    `Runtime::*_with_conn` methods, which take `&Connection` and never
///    keep the reference alive across an `await` point (this is sync
///    code with no awaits).
///
/// NOTE(review): points 1 and 3 are conventions, not compiler-checked
/// guarantees. Any future change that hands a `TxStore` (or `&TxStore`)
/// to another thread, such as a thread pool, a spawned task, or a
/// cross-thread callback, would race on both the connection and the
/// `pending` buffer.
///
/// Future work: refactor `Runtime`'s `write_conn` to be
/// `Arc<Mutex<Connection>>` so TxStore can hold an `Arc<Mutex<...>>`,
/// eliminating the unsafe impl entirely.
pub struct TxStore<'a> {
    /// Runtime whose `*_with_conn` methods perform the actual reads/writes.
    runtime: &'a Runtime,
    /// Connection the mutation's transaction runs on; borrowed from the
    /// caller's write-lock guard (see the safety contract above).
    conn: &'a rusqlite::Connection,
    /// Pending change events to broadcast after the outer transaction
    /// commits. Buffered here rather than pushed to ChangeLog + notifier
    /// immediately so a rollback doesn't emit events for writes that
    /// didn't actually land.
    pending: std::cell::RefCell<Vec<pylon_sync::ChangeEvent>>,
}
1417
1418impl<'a> TxStore<'a> {
1419    pub fn new(runtime: &'a Runtime, conn: &'a rusqlite::Connection) -> Self {
1420        Self {
1421            runtime,
1422            conn,
1423            pending: std::cell::RefCell::new(Vec::new()),
1424        }
1425    }
1426
1427    /// Drain the pending-events buffer. Called after COMMIT succeeds;
1428    /// the caller is responsible for appending each event to the
1429    /// ChangeLog and broadcasting via the notifier. On rollback the
1430    /// caller just drops the buffer without calling this.
1431    pub fn take_pending(&self) -> Vec<pylon_sync::ChangeEvent> {
1432        std::mem::take(&mut *self.pending.borrow_mut())
1433    }
1434
1435    fn record(
1436        &self,
1437        entity: &str,
1438        row_id: &str,
1439        kind: pylon_sync::ChangeKind,
1440        data: Option<&serde_json::Value>,
1441    ) {
1442        self.pending.borrow_mut().push(pylon_sync::ChangeEvent {
1443            seq: 0, // assigned by ChangeLog::append after commit
1444            entity: entity.to_string(),
1445            row_id: row_id.to_string(),
1446            kind,
1447            data: data.cloned(),
1448            timestamp: String::new(),
1449        });
1450    }
1451}
1452
// SAFETY: see the contract on TxStore above. In short, a TxStore is only
// constructed and used on the thread that holds the runtime's write-lock
// guard, and by convention (not compiler enforcement) it is never moved to
// or shared with another thread. Under that convention, neither the
// non-`Sync` `&rusqlite::Connection` nor the `RefCell` event buffer is ever
// accessed concurrently. These impls exist only to satisfy the
// `Send + Sync` bound on `DataStore`.
unsafe impl<'a> Sync for TxStore<'a> {}
unsafe impl<'a> Send for TxStore<'a> {}
1456
1457impl<'a> DataStore for TxStore<'a> {
1458    fn manifest(&self) -> &pylon_kernel::AppManifest {
1459        self.runtime.manifest()
1460    }
1461
1462    fn insert(&self, entity: &str, data: &serde_json::Value) -> Result<String, DataError> {
1463        let id = self
1464            .runtime
1465            .insert_with_conn(self.conn, entity, data)
1466            .map_err(into_data_error)?;
1467        // Buffer the event. If the outer mutation rolls back, the buffer
1468        // is dropped instead of flushed, so sync subscribers never see a
1469        // row that doesn't exist.
1470        self.record(entity, &id, pylon_sync::ChangeKind::Insert, Some(data));
1471        Ok(id)
1472    }
1473
1474    fn get_by_id(&self, entity: &str, id: &str) -> Result<Option<serde_json::Value>, DataError> {
1475        self.runtime
1476            .get_by_id_with_conn(self.conn, entity, id)
1477            .map_err(into_data_error)
1478    }
1479
1480    fn list(&self, entity: &str) -> Result<Vec<serde_json::Value>, DataError> {
1481        self.runtime
1482            .list_with_conn(self.conn, entity)
1483            .map_err(into_data_error)
1484    }
1485
1486    fn list_after(
1487        &self,
1488        entity: &str,
1489        after: Option<&str>,
1490        limit: usize,
1491    ) -> Result<Vec<serde_json::Value>, DataError> {
1492        self.runtime
1493            .list_after_with_conn(self.conn, entity, after, limit)
1494            .map_err(into_data_error)
1495    }
1496
1497    fn update(&self, entity: &str, id: &str, data: &serde_json::Value) -> Result<bool, DataError> {
1498        let updated = self
1499            .runtime
1500            .update_with_conn(self.conn, entity, id, data)
1501            .map_err(into_data_error)?;
1502        if updated {
1503            self.record(entity, id, pylon_sync::ChangeKind::Update, Some(data));
1504        }
1505        Ok(updated)
1506    }
1507
1508    fn delete(&self, entity: &str, id: &str) -> Result<bool, DataError> {
1509        let deleted = self
1510            .runtime
1511            .delete_with_conn(self.conn, entity, id)
1512            .map_err(into_data_error)?;
1513        if deleted {
1514            self.record(entity, id, pylon_sync::ChangeKind::Delete, None);
1515        }
1516        Ok(deleted)
1517    }
1518
1519    fn lookup(
1520        &self,
1521        entity: &str,
1522        field: &str,
1523        value: &str,
1524    ) -> Result<Option<serde_json::Value>, DataError> {
1525        self.runtime
1526            .lookup_with_conn(self.conn, entity, field, value)
1527            .map_err(into_data_error)
1528    }
1529
1530    fn link(
1531        &self,
1532        entity: &str,
1533        id: &str,
1534        relation: &str,
1535        target_id: &str,
1536    ) -> Result<bool, DataError> {
1537        self.runtime
1538            .link_with_conn(self.conn, entity, id, relation, target_id)
1539            .map_err(into_data_error)
1540    }
1541
1542    fn unlink(&self, entity: &str, id: &str, relation: &str) -> Result<bool, DataError> {
1543        self.runtime
1544            .unlink_with_conn(self.conn, entity, id, relation)
1545            .map_err(into_data_error)
1546    }
1547
1548    fn query_filtered(
1549        &self,
1550        entity: &str,
1551        filter: &serde_json::Value,
1552    ) -> Result<Vec<serde_json::Value>, DataError> {
1553        self.runtime
1554            .query_filtered_with_conn(self.conn, entity, filter)
1555            .map_err(into_data_error)
1556    }
1557
1558    fn query_graph(&self, query: &serde_json::Value) -> Result<serde_json::Value, DataError> {
1559        self.runtime
1560            .query_graph_with_conn(self.conn, query)
1561            .map_err(into_data_error)
1562    }
1563
1564    fn aggregate(
1565        &self,
1566        entity: &str,
1567        spec: &serde_json::Value,
1568    ) -> Result<serde_json::Value, DataError> {
1569        // Aggregation inside a transaction uses the same runtime method.
1570        // The lookups do their own read-lock, which is fine since aggregate
1571        // is read-only.
1572        Runtime::aggregate(self.runtime, entity, spec).map_err(into_data_error)
1573    }
1574
1575    fn transact(
1576        &self,
1577        _ops: &[serde_json::Value],
1578    ) -> Result<(bool, Vec<serde_json::Value>), DataError> {
1579        // Nested transactions aren't supported from within a mutation handler.
1580        // The mutation handler IS the transaction.
1581        Err(DataError {
1582            code: "NESTED_TRANSACTION".into(),
1583            message: "ctx.db.transact() is not allowed inside a mutation handler (the handler itself is transactional)".into(),
1584        })
1585    }
1586
1587    fn search(
1588        &self,
1589        entity: &str,
1590        query: &serde_json::Value,
1591    ) -> Result<serde_json::Value, DataError> {
1592        // Search reads against the FTS shadow are read-only; route
1593        // through the runtime's main `search` impl which already
1594        // validates the entity + branches on backend. The held write
1595        // connection is fine for reads (SQLite serializes anyway).
1596        <Runtime as DataStore>::search(self.runtime, entity, query)
1597    }
1598}
1599
1600// ---------------------------------------------------------------------------
1601// PG-transaction buffering wrapper
1602// ---------------------------------------------------------------------------
1603
/// `DataStore` wrapper used by the Postgres mutation path. The Postgres
/// `PgTxStore` owns the transaction; this wrapper layers the same
/// "buffer change events, flush after COMMIT" guarantee that SQLite's
/// `TxStore` provides directly. The underlying `inner` ref lives only
/// for the duration of the transaction closure passed to
/// `with_transaction_crdt` — the lifetime tracks through.
struct PgBufferedTxStore<'a> {
    /// Transactional store that every call is forwarded to.
    inner: &'a dyn DataStore,
    /// Change events for writes that succeeded inside the transaction.
    /// They are drained by `take_pending` at the end of the transaction
    /// closure and published by the caller only once the transaction
    /// result is `Ok`.
    pending: std::sync::Mutex<Vec<pylon_sync::ChangeEvent>>,
}
1614
1615impl<'a> PgBufferedTxStore<'a> {
1616    fn new(inner: &'a dyn DataStore) -> Self {
1617        Self {
1618            inner,
1619            pending: std::sync::Mutex::new(Vec::new()),
1620        }
1621    }
1622
1623    fn record(
1624        &self,
1625        entity: &str,
1626        row_id: &str,
1627        kind: pylon_sync::ChangeKind,
1628        data: Option<&serde_json::Value>,
1629    ) {
1630        if let Ok(mut p) = self.pending.lock() {
1631            p.push(pylon_sync::ChangeEvent {
1632                seq: 0,
1633                entity: entity.to_string(),
1634                row_id: row_id.to_string(),
1635                kind,
1636                data: data.cloned(),
1637                timestamp: String::new(),
1638            });
1639        }
1640    }
1641
1642    fn take_pending(self) -> Vec<pylon_sync::ChangeEvent> {
1643        self.pending.into_inner().unwrap_or_default()
1644    }
1645}
1646
1647impl<'a> DataStore for PgBufferedTxStore<'a> {
1648    fn manifest(&self) -> &pylon_kernel::AppManifest {
1649        self.inner.manifest()
1650    }
1651
1652    fn insert(&self, entity: &str, data: &serde_json::Value) -> Result<String, DataError> {
1653        let id = self.inner.insert(entity, data)?;
1654        self.record(entity, &id, pylon_sync::ChangeKind::Insert, Some(data));
1655        Ok(id)
1656    }
1657
1658    fn get_by_id(&self, entity: &str, id: &str) -> Result<Option<serde_json::Value>, DataError> {
1659        self.inner.get_by_id(entity, id)
1660    }
1661
1662    fn list(&self, entity: &str) -> Result<Vec<serde_json::Value>, DataError> {
1663        self.inner.list(entity)
1664    }
1665
1666    fn list_after(
1667        &self,
1668        entity: &str,
1669        after: Option<&str>,
1670        limit: usize,
1671    ) -> Result<Vec<serde_json::Value>, DataError> {
1672        self.inner.list_after(entity, after, limit)
1673    }
1674
1675    fn update(&self, entity: &str, id: &str, data: &serde_json::Value) -> Result<bool, DataError> {
1676        let updated = self.inner.update(entity, id, data)?;
1677        if updated {
1678            self.record(entity, id, pylon_sync::ChangeKind::Update, Some(data));
1679        }
1680        Ok(updated)
1681    }
1682
1683    fn delete(&self, entity: &str, id: &str) -> Result<bool, DataError> {
1684        let deleted = self.inner.delete(entity, id)?;
1685        if deleted {
1686            self.record(entity, id, pylon_sync::ChangeKind::Delete, None);
1687        }
1688        Ok(deleted)
1689    }
1690
1691    fn lookup(
1692        &self,
1693        entity: &str,
1694        field: &str,
1695        value: &str,
1696    ) -> Result<Option<serde_json::Value>, DataError> {
1697        self.inner.lookup(entity, field, value)
1698    }
1699
1700    fn link(
1701        &self,
1702        entity: &str,
1703        id: &str,
1704        relation: &str,
1705        target_id: &str,
1706    ) -> Result<bool, DataError> {
1707        let linked = self.inner.link(entity, id, relation, target_id)?;
1708        if linked {
1709            // `link` is implemented as a typed `update` under the hood —
1710            // record an Update so subscribers see the FK-set the same way
1711            // they'd see any other column change.
1712            let data = serde_json::json!({ relation: target_id });
1713            self.record(entity, id, pylon_sync::ChangeKind::Update, Some(&data));
1714        }
1715        Ok(linked)
1716    }
1717
1718    fn unlink(&self, entity: &str, id: &str, relation: &str) -> Result<bool, DataError> {
1719        let unlinked = self.inner.unlink(entity, id, relation)?;
1720        if unlinked {
1721            let data = serde_json::json!({ relation: serde_json::Value::Null });
1722            self.record(entity, id, pylon_sync::ChangeKind::Update, Some(&data));
1723        }
1724        Ok(unlinked)
1725    }
1726
1727    fn query_filtered(
1728        &self,
1729        entity: &str,
1730        filter: &serde_json::Value,
1731    ) -> Result<Vec<serde_json::Value>, DataError> {
1732        self.inner.query_filtered(entity, filter)
1733    }
1734
1735    fn query_graph(&self, query: &serde_json::Value) -> Result<serde_json::Value, DataError> {
1736        self.inner.query_graph(query)
1737    }
1738
1739    fn aggregate(
1740        &self,
1741        entity: &str,
1742        spec: &serde_json::Value,
1743    ) -> Result<serde_json::Value, DataError> {
1744        self.inner.aggregate(entity, spec)
1745    }
1746
1747    fn transact(
1748        &self,
1749        ops: &[serde_json::Value],
1750    ) -> Result<(bool, Vec<serde_json::Value>), DataError> {
1751        // Forward to the inner PgTxStore, which already returns
1752        // NESTED_TRANSACTION. Keeping the forward (instead of erroring
1753        // here) means the wrapper stays a faithful pass-through and any
1754        // future change to the inner policy applies uniformly.
1755        self.inner.transact(ops)
1756    }
1757
1758    fn search(
1759        &self,
1760        entity: &str,
1761        query: &serde_json::Value,
1762    ) -> Result<serde_json::Value, DataError> {
1763        // Forward to inner — `PgTxStore::search` (default impl)
1764        // currently returns NOT_SUPPORTED for in-tx search, which is
1765        // the right answer: PG search uses a separate connection
1766        // pool today and would deadlock on the in-handler tx if we
1767        // tried to fan out from the same client.
1768        self.inner.search(entity, query)
1769    }
1770}
1771
1772// ---------------------------------------------------------------------------
1773// Adapter: FnRunner → FnOps
1774// ---------------------------------------------------------------------------
1775
1776use pylon_functions::protocol::{AuthInfo as FnAuth, FnType};
1777use pylon_functions::registry::{FnDef, FnRegistry};
1778use pylon_functions::runner::{FnCallError, FnRunner};
1779use pylon_functions::trace::FnTrace;
1780
/// Adapter that implements [`pylon_router::FnOps`] by delegating to a
/// [`FnRunner`].
///
/// Holds an `Arc<Runtime>` so function handlers get a [`DataStore`] to
/// operate against.
pub struct FnOpsImpl {
    /// Runner that executes function handlers (started against the Bun
    /// runtime script by `try_spawn_functions`).
    pub runner: Arc<FnRunner>,
    /// Function definitions loaded from the runner's handshake; `call`
    /// looks up each function here by name.
    pub registry: Arc<FnRegistry>,
    /// Backing runtime. Non-mutation calls run directly against it, and
    /// mutations take their transaction (SQLite write connection or
    /// Postgres backend) from it.
    pub runtime: Arc<Runtime>,
    /// Per-function rate limiter, keyed on `"<fn_name>::<identity>"`.
    /// Limits are uniform; per-fn overrides can be added later via FnDef
    /// metadata once the TS define API surfaces them.
    pub fn_rate_limiter: Arc<crate::rate_limit::RateLimiter>,
    /// Sync change log for broadcasting `ctx.db.insert/update/delete` ops
    /// that happen inside a function handler. Without this, mutations via
    /// functions silently bypass sync — WS subscribers see nothing until
    /// they manually refetch. Flushed post-COMMIT so rollbacks don't emit
    /// phantom events.
    pub change_log: Arc<pylon_sync::ChangeLog>,
    /// Where to broadcast change events after a function mutation commits.
    pub notifier: Arc<dyn pylon_router::ChangeNotifier>,
    /// Job queue for flushing schedules buffered during a mutation
    /// handler. The schedule hook owns its own clone for the
    /// non-mutation enqueue path; this clone is what the
    /// post-COMMIT flush uses.
    pub job_queue: Arc<crate::jobs::JobQueue>,
}
1807
1808impl FnOpsImpl {
1809    /// Drain a per-mutation schedule buffer and enqueue each entry on
1810    /// the job queue. Called only after COMMIT succeeds — a rolled-back
1811    /// handler's buffer is dropped without flushing.
1812    fn flush_pending_schedules(&self, pending: Vec<PendingSchedule>) {
1813        for sched in pending {
1814            let delay_secs = match (sched.delay_ms, sched.run_at) {
1815                (Some(ms), _) => ms / 1000,
1816                (None, Some(ts)) => {
1817                    let now = std::time::SystemTime::now()
1818                        .duration_since(std::time::UNIX_EPOCH)
1819                        .unwrap_or_default()
1820                        .as_millis() as u64;
1821                    if ts > now { (ts - now) / 1000 } else { 0 }
1822                }
1823                _ => 0,
1824            };
1825            if let Err(e) = self.job_queue.try_enqueue_with_options(
1826                &sched.fn_name,
1827                sched.args,
1828                crate::jobs::Priority::Normal,
1829                delay_secs,
1830                3,
1831                "functions",
1832            ) {
1833                // Schedule was already acked OK to the TS handler — the
1834                // mutation has committed. Best we can do now is log
1835                // loudly so an operator notices the dropped enqueue.
1836                tracing::warn!(
1837                    "[functions] post-COMMIT enqueue failed for \"{}\": {e}",
1838                    sched.fn_name
1839                );
1840            }
1841        }
1842    }
1843}
1844
impl pylon_router::FnOps for FnOpsImpl {
    /// Look up a registered function definition by name.
    fn get_fn(&self, name: &str) -> Option<FnDef> {
        self.registry.get(name)
    }

    /// All registered function definitions.
    fn list_fns(&self) -> Vec<FnDef> {
        self.registry.list()
    }

    /// Invoke the registered function `fn_name` through the runner.
    ///
    /// Mutations run inside a database transaction: on Postgres via
    /// `with_transaction_crdt`, on SQLite via an explicit `BEGIN`/`COMMIT`
    /// on the held write connection. Their change events and `runAfter`
    /// schedules are buffered and published only after a successful
    /// commit. Every other function type runs directly against the shared
    /// `Runtime`.
    ///
    /// # Errors
    /// - `FN_NOT_FOUND` if `fn_name` is not registered.
    /// - `PG_BACKEND_MISSING`, the write-connection lock error,
    ///   `BEGIN_FAILED` or `COMMIT_FAILED` on the transactional paths.
    /// - Any error from the handler itself. For mutations, the transaction
    ///   is rolled back first.
    fn call(
        &self,
        fn_name: &str,
        args: serde_json::Value,
        auth: FnAuth,
        on_stream: Option<Box<dyn FnMut(&str) + Send>>,
        request: Option<pylon_functions::protocol::RequestInfo>,
    ) -> Result<(serde_json::Value, FnTrace), FnCallError> {
        let def = self.registry.get(fn_name).ok_or_else(|| FnCallError {
            code: "FN_NOT_FOUND".into(),
            message: format!("Function \"{fn_name}\" is not registered"),
        })?;

        match def.fn_type {
            FnType::Mutation => {
                // Postgres backend: route through the PG store's
                // `with_transaction_crdt` so the TS handler runs against a
                // held PG transaction. Reads see the handler's own pending
                // writes, and an error rolls everything back atomically.
                // Change events are buffered in a `PgBufferedTxStore`
                // wrapper and flushed only after COMMIT, mirroring the
                // SQLite `TxStore` behavior.
                if self.runtime.is_postgres() {
                    let pg_backend = self.runtime.pg_backend().ok_or_else(|| FnCallError {
                        code: "PG_BACKEND_MISSING".into(),
                        message:
                            "Postgres backend reported is_postgres=true but pg_backend() returned None"
                                .into(),
                    })?;

                    // The closure must own its own state, so these values
                    // are moved into it. `request`, `auth`, `args` and the
                    // boxed `on_stream` callback are also captured by move;
                    // they are not needed again outside the closure.
                    let runner = self.runner.clone();
                    let fn_type = def.fn_type;
                    let fn_name_owned = fn_name.to_string();

                    // Push the schedule buffer onto the thread-local for
                    // the duration of the handler. It is drained after
                    // COMMIT succeeds; on rollback the buffer is dropped.
                    let sched_guard = ScheduleBufferGuard::enter();
                    // Mark "we're in a mutation tx" so the nested-call
                    // hook rejects recursive mutation calls instead of
                    // deadlocking on the connection mutex.
                    let _depth_guard = MutationDepthGuard::enter();

                    // Install the CRDT hook so PgTxStore projects
                    // `ctx.db.X` writes on `crdt: true` entities through
                    // PgLoroStore and persists the snapshot in the same
                    // tx. Without it, TS mutation writes on CRDT entities
                    // would desync from the sidecar.
                    let crdt_hook: std::sync::Arc<
                        dyn pylon_storage::pg_tx_store::PgCrdtHook,
                    > = std::sync::Arc::new(crate::pg_loro_store::PgCrdtHookImpl {
                        crdt: std::sync::Arc::clone(&pg_backend.crdt),
                        manifest: std::sync::Arc::new(self.runtime.manifest().clone()),
                    });

                    let pg = &pg_backend.store;
                    // The buffered events leave the closure alongside the
                    // handler's result, so they only surface if the
                    // transaction as a whole returns Ok.
                    let tx_result: Result<
                        (serde_json::Value, FnTrace, Vec<pylon_sync::ChangeEvent>),
                        FnCallError,
                    > = pg.with_transaction_crdt(crdt_hook, move |inner_store: &dyn DataStore| {
                        let buffered = PgBufferedTxStore::new(inner_store);
                        let (value, trace) = runner.call(
                            &buffered,
                            &fn_name_owned,
                            fn_type,
                            args,
                            auth,
                            on_stream,
                            request,
                        )?;
                        Ok((value, trace, buffered.take_pending()))
                    });

                    return match tx_result {
                        Ok((value, trace, pending)) => {
                            // Mirror the SQLite path: append to the change
                            // log first (so /api/sync/pull tail callers see
                            // it), then notify WS/SSE subscribers.
                            // NOTE(review): `..ev` keeps the empty
                            // `timestamp` set when the event was buffered —
                            // confirm subscribers don't rely on it.
                            for ev in pending {
                                let seq = self.change_log.append(
                                    &ev.entity,
                                    &ev.row_id,
                                    ev.kind.clone(),
                                    ev.data.clone(),
                                );
                                let event = pylon_sync::ChangeEvent { seq, ..ev };
                                self.notifier.notify(&event);
                            }
                            // Flush scheduled jobs after the commit lands.
                            // On rollback the `Err(e)` arm below skips this
                            // and the buffer is dropped.
                            self.flush_pending_schedules(sched_guard.take());
                            drop(sched_guard);
                            Ok((value, trace))
                        }
                        Err(e) => {
                            drop(sched_guard);
                            Err(e)
                        }
                    };
                }
                // Hold the write connection for the entire handler duration.
                // This keeps the BEGIN/COMMIT span free of interleaving from
                // other writers (who would otherwise become part of the
                // transaction because SQLite tracks it on the connection).
                //
                // Inside the handler, every `ctx.db` row call routes through
                // TxStore, which uses this same held connection — so no
                // re-locking, no deadlock, no interleaving.
                let conn_guard = self.runtime.lock_conn_pub().map_err(|e| FnCallError {
                    code: e.code,
                    message: e.message,
                })?;

                if let Err(e) = conn_guard.execute("BEGIN", []) {
                    return Err(FnCallError {
                        code: "BEGIN_FAILED".into(),
                        message: format!("Failed to start transaction: {e}"),
                    });
                }

                // Same schedule buffering as the PG path — `runAfter`
                // calls inside this handler defer until COMMIT succeeds.
                let sched_guard = ScheduleBufferGuard::enter();
                // Same nested-mutation deadlock guard as the PG path.
                let _depth_guard = MutationDepthGuard::enter();

                let tx_store = TxStore::new(&self.runtime, &conn_guard);
                let result = self.runner.call(
                    &tx_store,
                    fn_name,
                    def.fn_type,
                    args,
                    auth,
                    on_stream,
                    request,
                );

                // Surface commit/rollback errors. A swallowed COMMIT failure
                // is the worst possible outcome: the caller sees success but
                // the data isn't durable. A swallowed ROLLBACK failure leaves
                // the connection in an unknown txn state for the next caller.
                let result = match result {
                    Ok(value) => match conn_guard.execute("COMMIT", []) {
                        Ok(_) => {
                            // Flush buffered change events NOW — after the
                            // commit durably lands but before we return
                            // success. Ordering matters: append to the log
                            // first (so /api/sync/pull callers that race
                            // with this broadcast see the row in the tail),
                            // then notify WS/SSE subscribers. `seq` on each
                            // pending event starts at 0; append assigns
                            // the real seq.
                            // NOTE(review): this flush, and the schedule
                            // flush below, run while `conn_guard` still
                            // holds the write connection. Appending under
                            // the lock keeps seq order aligned with commit
                            // order. However, if the job queue persists
                            // through this same connection, the enqueue
                            // could block — confirm.
                            for ev in tx_store.take_pending() {
                                let seq = self.change_log.append(
                                    &ev.entity,
                                    &ev.row_id,
                                    ev.kind.clone(),
                                    ev.data.clone(),
                                );
                                let event = pylon_sync::ChangeEvent { seq, ..ev };
                                self.notifier.notify(&event);
                            }
                            // Same flush as the PG path: durable commit,
                            // then flush schedules. Drop the guard
                            // explicitly so the thread-local clears
                            // before the result returns.
                            self.flush_pending_schedules(sched_guard.take());
                            drop(sched_guard);
                            Ok(value)
                        }
                        Err(commit_err) => {
                            // Best-effort cleanup. If ROLLBACK also fails, the
                            // connection is in a bad state; at minimum the
                            // operator sees both failures in the log.
                            if let Err(rollback_err) = conn_guard.execute("ROLLBACK", []) {
                                tracing::warn!(
                                    "[functions] ROLLBACK after COMMIT failure also failed: {rollback_err}"
                                );
                            }
                            Err(FnCallError {
                                code: "COMMIT_FAILED".into(),
                                message: format!(
                                    "Function \"{fn_name}\" succeeded but COMMIT failed: {commit_err}"
                                ),
                            })
                        }
                    },
                    Err(handler_err) => {
                        if let Err(rollback_err) = conn_guard.execute("ROLLBACK", []) {
                            // Don't shadow the handler error — log the
                            // rollback failure separately.
                            tracing::warn!(
                                "[functions] ROLLBACK after handler error failed: {rollback_err}"
                            );
                        }
                        Err(handler_err)
                    }
                };
                // conn_guard drops at the end of this arm, releasing the lock.
                result
            }
            // Queries, actions, etc.: no transaction and no buffering. The
            // handler runs directly against the shared runtime.
            _ => self.runner.call(
                &*self.runtime,
                fn_name,
                def.fn_type,
                args,
                auth,
                on_stream,
                request,
            ),
        }
    }

    /// Recent call traces, delegated to the runner's trace log with `limit`.
    fn recent_traces(&self, limit: usize) -> Vec<FnTrace> {
        self.runner.trace_log.recent(limit)
    }

    /// Check the shared limiter under the key `"<fn_name>::<identity>"`.
    /// The `Err(u64)` payload is passed through unchanged from
    /// `RateLimiter::check` (presumably a retry-after hint — confirm).
    fn check_rate_limit(&self, fn_name: &str, identity: &str) -> Result<(), u64> {
        let key = format!("{fn_name}::{identity}");
        self.fn_rate_limiter.check(&key)
    }
}
2080
/// Resolve the path to the TypeScript function runtime script.
///
/// Searches in order and returns the first path that exists:
/// 1. `$PYLON_FUNCTIONS_RUNTIME`, if set and the file exists.
/// 2. The current working directory, then each of its ancestors
///    (Node.js-style resolution), checking in turn:
///    - `node_modules/@pylonsync/functions/src/runtime.ts`
///    - `node_modules/@pylonsync/functions/dist/runtime.js`
///    - `node_modules/@pylon/functions/src/runtime.ts` (legacy package name)
///    - `node_modules/@pylon/functions/dist/runtime.js` (legacy package name)
///    - `packages/functions/src/runtime.ts` (dev monorepo)
/// 3. `$HOME/.pylon/runtime.ts` (user install; `HOME` falls back to `.`).
///
/// A matching candidate whose path is not valid UTF-8 is skipped rather
/// than ending the search, because the result is returned as a `String`.
///
/// Returns `None` if none exist.
pub fn find_functions_runtime() -> Option<String> {
    if let Ok(env_path) = std::env::var("PYLON_FUNCTIONS_RUNTIME") {
        if std::path::Path::new(&env_path).exists() {
            return Some(env_path);
        }
    }

    // Walk parent directories like Node.js resolution does, so running
    // `pylon dev` from an example sub-directory still finds the
    // hoisted workspace package at the repo root. Without this, bun/npm
    // workspace users see "TypeScript function runtime is not configured"
    // and think the server is broken when it's just a CWD issue.
    const RELATIVE_CANDIDATES: [&str; 5] = [
        "node_modules/@pylonsync/functions/src/runtime.ts",
        "node_modules/@pylonsync/functions/dist/runtime.js",
        // Legacy name — keeps upgrades from the pre-rename era working.
        "node_modules/@pylon/functions/src/runtime.ts",
        "node_modules/@pylon/functions/dist/runtime.js",
        // Monorepo dev: source tree at the workspace root.
        "packages/functions/src/runtime.ts",
    ];

    let cwd = std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));
    // `ancestors()` yields `cwd` itself first, then each successive parent.
    for dir in cwd.ancestors() {
        for rel in &RELATIVE_CANDIDATES {
            let candidate = dir.join(rel);
            if !candidate.exists() {
                continue;
            }
            // Skip a hit that isn't valid UTF-8, rather than returning
            // `None`, so later candidates still get a chance.
            if let Some(path) = candidate.to_str() {
                return Some(path.to_string());
            }
        }
    }

    // Final fallback: user-wide install under ~/.pylon.
    let home = std::env::var("HOME").unwrap_or_else(|_| ".".into());
    let user_path = format!("{home}/.pylon/runtime.ts");
    if std::path::Path::new(&user_path).exists() {
        return Some(user_path);
    }
    None
}

// `try_spawn_functions`: spawn the Bun function runtime if a `functions/`
// directory (or `$PYLON_FUNCTIONS_DIR`) exists.
//
// Returns `Some(Arc<FnOpsImpl>)` on success. It returns `None` when there is
// no functions directory, no runtime script is found, or Bun fails to
// start. The latter two failures are logged via `tracing::warn!`; returning
// `None` keeps the server running without TypeScript functions.
2139pub fn try_spawn_functions(
2140    runtime: Arc<Runtime>,
2141    job_queue: Arc<crate::jobs::JobQueue>,
2142    fn_rate_limiter: Arc<crate::rate_limit::RateLimiter>,
2143    change_log: Arc<pylon_sync::ChangeLog>,
2144    notifier: Arc<dyn pylon_router::ChangeNotifier>,
2145) -> Option<Arc<FnOpsImpl>> {
2146    let fn_dir = std::env::var("PYLON_FUNCTIONS_DIR").unwrap_or_else(|_| "functions".into());
2147    if !std::path::Path::new(&fn_dir).exists() {
2148        return None;
2149    }
2150
2151    let runtime_script = match find_functions_runtime() {
2152        Some(p) => p,
2153        None => {
2154            tracing::warn!(
2155                "[functions] No TypeScript runtime script found. TypeScript functions will be unavailable."
2156            );
2157            tracing::warn!(
2158                "[functions] Tried: $PYLON_FUNCTIONS_RUNTIME, node_modules/@pylon/functions/src/runtime.ts, ~/.pylon/runtime.ts, packages/functions/src/runtime.ts"
2159            );
2160            return None;
2161        }
2162    };
2163
2164    let runner = Arc::new(FnRunner::new(1000));
2165
2166    // start() now performs the handshake itself and returns the function
2167    // definitions, so there's no separate handshake step. On any failure the
2168    // child has already been killed.
2169    let defs = match runner.start("bun", &["run", &runtime_script, &fn_dir]) {
2170        Ok(defs) => defs,
2171        Err(e) => {
2172            tracing::warn!("[functions] Failed to start Bun runtime: {e}");
2173            tracing::warn!(
2174                "[functions] Install Bun from https://bun.sh — TypeScript functions will be unavailable."
2175            );
2176            return None;
2177        }
2178    };
2179
2180    // Hold a separate handle on the job queue for registering function
2181    // job handlers below, since the schedule-hook closure consumes its
2182    // own copy.
2183    let job_queue_for_handlers = Arc::clone(&job_queue);
2184
2185    // Wire scheduler requests from functions into the job queue. Use the
2186    // Result-returning variant so a persist failure surfaces as a TS-side
2187    // SCHEDULE_FAILED error instead of `{scheduled:true, id:""}`.
2188    //
2189    // Transaction-bound semantics: when this hook is invoked from inside
2190    // a mutation handler, the per-call `MUTATION_SCHEDULE_BUFFER`
2191    // thread-local is set. Schedules buffer there and drain post-COMMIT
2192    // (so a rolled-back mutation can't leave behind scheduled work).
2193    // Outside a mutation (queries, actions, top-level non-mutation
2194    // jobs), the buffer is `None` and we enqueue immediately — matching
2195    // the historical contract for those code paths.
2196    runner.set_schedule_hook(Box::new(move |fn_name, args, delay_ms, run_at| {
2197        // Check the thread-local first. If we're inside a mutation, the
2198        // buffer is `Some` and we defer.
2199        let buffered = MUTATION_SCHEDULE_BUFFER.with(|cell| {
2200            let slot = cell.borrow();
2201            slot.as_ref().map(|b| {
2202                b.borrow_mut().push(PendingSchedule {
2203                    fn_name: fn_name.to_string(),
2204                    args: args.clone(),
2205                    delay_ms,
2206                    run_at,
2207                });
2208            }).is_some()
2209        });
2210        if buffered {
2211            // No real job-id yet — the actual enqueue happens after
2212            // COMMIT. Returning a synthetic id keeps the TS contract
2213            // (`{scheduled:true,id:string}`) intact; a mutation that
2214            // rolls back will discard the buffer and the id was never
2215            // observable to anyone outside the handler anyway.
2216            return Ok(format!("pending:{fn_name}"));
2217        }
2218
2219        let delay_secs = match (delay_ms, run_at) {
2220            (Some(ms), _) => ms / 1000,
2221            (None, Some(ts)) => {
2222                let now = std::time::SystemTime::now()
2223                    .duration_since(std::time::UNIX_EPOCH)
2224                    .unwrap_or_default()
2225                    .as_millis() as u64;
2226                if ts > now {
2227                    (ts - now) / 1000
2228                } else {
2229                    0
2230                }
2231            }
2232            _ => 0,
2233        };
2234        job_queue.try_enqueue_with_options(
2235            fn_name,
2236            args,
2237            crate::jobs::Priority::Normal,
2238            delay_secs,
2239            3,
2240            "functions",
2241        )
2242    }));
2243
2244    let registry = Arc::new(FnRegistry::new());
2245    let count = defs.len();
2246    registry.replace_all(defs);
2247    tracing::warn!("[functions] Loaded {count} function(s) from {fn_dir}");
2248
2249    let ops = Arc::new(FnOpsImpl {
2250        runner,
2251        registry,
2252        runtime,
2253        fn_rate_limiter,
2254        change_log,
2255        notifier,
2256        job_queue: Arc::clone(&job_queue_for_handlers),
2257    });
2258
2259    install_nested_call_hook(&ops);
2260    register_function_job_handlers(&ops, &job_queue_for_handlers);
2261    spawn_runtime_supervisor(Arc::clone(&ops));
2262    Some(ops)
2263}
2264
2265/// Bridge scheduled function calls (via `ctx.scheduler.runAfter` or
2266/// `runAt`) to the function runner. Without this, the schedule hook
2267/// enqueues a job whose `name` is the function name — but no handler
2268/// is registered for it, so the worker fails with "No handler
2269/// registered" and the scheduled callback never runs.
2270///
2271/// Registers one handler per loaded function. Each handler invokes
2272/// `FnOpsImpl::call` with a system auth context (no user_id, not
2273/// admin) so the called function runs with the same privileges a
2274/// trusted-server-side caller would have. The args come from the
2275/// job payload, which the schedule hook copies verbatim from the
2276/// `runAfter(ms, fn, args)` invocation.
2277fn register_function_job_handlers(ops: &Arc<FnOpsImpl>, job_queue: &Arc<crate::jobs::JobQueue>) {
2278    use pylon_router::FnOps as _;
2279
2280    let fn_names: Vec<String> = ops.registry.list().into_iter().map(|d| d.name).collect();
2281
2282    for name in fn_names {
2283        let weak = Arc::downgrade(ops);
2284        let fn_name = name.clone();
2285        job_queue.register(
2286            &name,
2287            Arc::new(move |job: &crate::jobs::Job| {
2288                let ops = match weak.upgrade() {
2289                    Some(o) => o,
2290                    None => {
2291                        return crate::jobs::JobResult::Failure(
2292                            "RUNTIME_GONE: function ops dropped".into(),
2293                        )
2294                    }
2295                };
2296                let auth = FnAuth {
2297                    user_id: None,
2298                    is_admin: false,
2299                    tenant_id: None,
2300                };
2301                match ops.call(&fn_name, job.payload.clone(), auth, None, None) {
2302                    Ok(_) => crate::jobs::JobResult::Success,
2303                    Err(e) => crate::jobs::JobResult::Retry(format!("{}: {}", e.code, e.message)),
2304                }
2305            }),
2306        );
2307    }
2308}
2309
/// Route nested `RunFn` calls (action → query/mutation) through a
/// transactional wrapper so nested mutations get their own BEGIN/COMMIT.
///
/// The installed hook dispatches on `fn_type`:
///
/// - `FnType::Mutation`: rejected with `NESTED_MUTATION` if the current
///   thread is already inside a mutation transaction (`in_mutation_tx()`).
///   Otherwise the handler runs in its own transaction — via the Postgres
///   store's `with_transaction_crdt` when `runtime.is_postgres()`, or via an
///   explicit `BEGIN` / `COMMIT` / `ROLLBACK` on the locked write connection
///   (`lock_conn_pub`) otherwise. Only after a successful commit are the
///   buffered change events appended to the change log and broadcast, and the
///   buffered `ctx.scheduler` calls flushed; on failure both are discarded.
/// - Any other type (queries, actions): re-entered directly against the
///   runtime store with no transaction wrapper.
///
/// Nested calls never receive HTTP request metadata (`None, None` is passed
/// to `call_inner`). Errors surface to the TS side as `(code, message)`.
///
/// Uses a `Weak<FnOpsImpl>` to avoid keeping the ops struct alive forever
/// through a cycle (hook stored on FnRunner ← held by FnOpsImpl). When the
/// ops struct is dropped the hook becomes a no-op error.
fn install_nested_call_hook(ops: &Arc<FnOpsImpl>) {
    use pylon_functions::protocol::{AuthInfo, FnType};

    let weak = Arc::downgrade(ops);
    ops.runner.set_nested_call_hook(Box::new(
        move |fn_name: &str,
              fn_type: FnType,
              args: serde_json::Value,
              auth: AuthInfo|
              -> Result<serde_json::Value, (String, String)> {
            // Upgrade per call; a failed upgrade means the ops struct is gone.
            let ops = match weak.upgrade() {
                Some(o) => o,
                None => {
                    return Err((
                        "RUNTIME_GONE".into(),
                        "pylon runtime is shutting down".into(),
                    ))
                }
            };

            match fn_type {
                FnType::Mutation => {
                    // Reject nested mutations: both backends acquire a
                    // single (non-reentrant) connection mutex per
                    // mutation, so a TS handler that calls runMutation
                    // from inside another mutation would block forever.
                    // Surface a clear NESTED_MUTATION error instead of
                    // hanging — callers should restructure to call the
                    // shared logic as a function (not a separate
                    // mutation) or call from an action.
                    if in_mutation_tx() {
                        return Err((
                            "NESTED_MUTATION".into(),
                            format!(
                                "ctx.runMutation(\"{fn_name}\") is not allowed from inside \
                                 another mutation handler — the mutation handler IS the \
                                 transaction, and the connection mutex is non-reentrant. \
                                 Restructure the shared logic into a regular function (not \
                                 a registered mutation), or call from an action handler."
                            ),
                        ));
                    }

                    // Postgres backend: route through the PG with_transaction
                    // closure, mirroring the top-level mutation path. Without
                    // this, action -> ctx.runMutation(...) errors with
                    // NOT_SQLITE_BACKEND on PG even though the top-level path
                    // works fine.
                    if ops.runtime.is_postgres() {
                        let pg_backend = ops.runtime.pg_backend().ok_or_else(|| {
                            (
                                "PG_BACKEND_MISSING".into(),
                                "Postgres backend reported is_postgres=true but pg_backend() returned None".into(),
                            )
                        })?;
                        let pg = &pg_backend.store;
                        let runner = ops.runner.clone();
                        let fn_name_owned = fn_name.to_string();
                        // Buffer ctx.scheduler calls made by the handler so
                        // they are enqueued only after the transaction commits
                        // (see MUTATION_SCHEDULE_BUFFER).
                        let sched_guard = ScheduleBufferGuard::enter();
                        // Presumably what `in_mutation_tx()` reads: marks this
                        // thread as inside a mutation for the rest of the call,
                        // so a further nested runMutation hits the check above.
                        let _depth_guard = MutationDepthGuard::enter();
                        // Same CRDT hook as the top-level mutation
                        // path so action -> runMutation on a crdt:true
                        // entity also maintains the sidecar.
                        // NOTE(review): clones the whole manifest on every
                        // nested PG mutation — consider caching an Arc if this
                        // shows up in profiles.
                        let crdt_hook: std::sync::Arc<
                            dyn pylon_storage::pg_tx_store::PgCrdtHook,
                        > = std::sync::Arc::new(crate::pg_loro_store::PgCrdtHookImpl {
                            crdt: std::sync::Arc::clone(&pg_backend.crdt),
                            manifest: std::sync::Arc::new(ops.runtime.manifest().clone()),
                        });
                        // The closure returns the handler's value plus the
                        // change events captured by the buffered store, so they
                        // can be published only once the transaction commits.
                        let tx_result: Result<
                            (serde_json::Value, Vec<pylon_sync::ChangeEvent>),
                            FnCallError,
                        > = pg.with_transaction_crdt(crdt_hook, move |inner_store: &dyn DataStore| {
                            let buffered = PgBufferedTxStore::new(inner_store);
                            let (value, _trace) = runner.call_inner(
                                &buffered,
                                &fn_name_owned,
                                fn_type,
                                args,
                                auth,
                                None,
                                None,
                            )?;
                            Ok((value, buffered.take_pending()))
                        });
                        return match tx_result {
                            Ok((value, pending)) => {
                                // Committed: assign change-log sequence numbers
                                // and broadcast to sync subscribers.
                                for ev in pending {
                                    let seq = ops.change_log.append(
                                        &ev.entity,
                                        &ev.row_id,
                                        ev.kind.clone(),
                                        ev.data.clone(),
                                    );
                                    let event = pylon_sync::ChangeEvent { seq, ..ev };
                                    ops.notifier.notify(&event);
                                }
                                ops.flush_pending_schedules(sched_guard.take());
                                drop(sched_guard);
                                Ok(value)
                            }
                            Err(e) => {
                                // Failed: buffered schedules are dropped
                                // without being enqueued.
                                drop(sched_guard);
                                Err((e.code, e.message))
                            }
                        };
                    }

                    // Wrap the nested mutation in its own write-conn + BEGIN
                    // + COMMIT, matching the top-level mutation contract.
                    let conn_guard = ops
                        .runtime
                        .lock_conn_pub()
                        .map_err(|e| (e.code, e.message))?;
                    if let Err(e) = conn_guard.execute("BEGIN", []) {
                        return Err(("BEGIN_FAILED".into(), e.to_string()));
                    }
                    // Declared after `conn_guard`, so both guards drop before
                    // the connection lock is released.
                    let sched_guard = ScheduleBufferGuard::enter();
                    let _depth_guard = MutationDepthGuard::enter();
                    let tx_store = TxStore::new(&ops.runtime, &conn_guard);
                    // Re-enter protocol without acquiring io_lock — we're
                    // already inside the outer call_inner which holds it.
                    // Nested calls never get HTTP request metadata — that's
                    // only meaningful for the top-level webhook invocation.
                    let result = ops
                        .runner
                        .call_inner(&tx_store, fn_name, fn_type, args, auth, None, None);
                    match result {
                        Ok((value, _trace)) => {
                            if let Err(e) = conn_guard.execute("COMMIT", []) {
                                // Best-effort rollback; the COMMIT error is
                                // what gets reported.
                                let _ = conn_guard.execute("ROLLBACK", []);
                                return Err(("COMMIT_FAILED".into(), e.to_string()));
                            }
                            // Flush change events after COMMIT so nested
                            // mutations (action → runMutation(...)) broadcast
                            // the same way top-level mutations do. Without
                            // this, every write an action emits is invisible
                            // to sync subscribers until the NEXT top-level
                            // mutation lands — streaming UIs stay empty.
                            for ev in tx_store.take_pending() {
                                let seq = ops.change_log.append(
                                    &ev.entity,
                                    &ev.row_id,
                                    ev.kind.clone(),
                                    ev.data.clone(),
                                );
                                let event = pylon_sync::ChangeEvent { seq, ..ev };
                                ops.notifier.notify(&event);
                            }
                            ops.flush_pending_schedules(sched_guard.take());
                            drop(sched_guard);
                            Ok(value)
                        }
                        Err(e) => {
                            // Best-effort rollback; buffered change events and
                            // schedules are discarded with their guards.
                            let _ = conn_guard.execute("ROLLBACK", []);
                            drop(sched_guard);
                            Err((e.code, e.message))
                        }
                    }
                }
                _ => {
                    // Queries + actions: no transaction wrap needed. Just
                    // re-enter protocol via the same store (runtime).
                    // Nested: no HTTP request propagated (see above).
                    let result = ops.runner.call_inner(
                        &*ops.runtime,
                        fn_name,
                        fn_type,
                        args,
                        auth,
                        None,
                        None,
                    );
                    result.map(|(v, _)| v).map_err(|e| (e.code, e.message))
                }
            }
        },
    ));
}
2495
2496/// Background watchdog that restarts the Bun runtime if it dies (crashed,
2497/// killed by the call timeout path, OOM, etc.). Exponential backoff: 1s, 2s,
2498/// 4s, ... capped at 30s. Resets to 1s after a successful respawn.
2499///
2500/// We don't try to "give up" — if Bun keeps crashing the supervisor keeps
2501/// trying with the capped delay. The operator sees repeated WARN logs and
2502/// can investigate. Better than silently leaving functions disabled forever.
2503fn spawn_runtime_supervisor(ops: Arc<FnOpsImpl>) {
2504    use std::time::Duration;
2505
2506    std::thread::Builder::new()
2507        .name("pylon-fn-supervisor".into())
2508        .spawn(move || {
2509            let mut backoff = Duration::from_secs(1);
2510            let max_backoff = Duration::from_secs(30);
2511            loop {
2512                std::thread::sleep(Duration::from_secs(2));
2513                if ops.runner.is_alive() {
2514                    backoff = Duration::from_secs(1);
2515                    continue;
2516                }
2517                tracing::warn!(
2518                    "[functions] Bun runtime is not alive — respawning after {:?}",
2519                    backoff
2520                );
2521                std::thread::sleep(backoff);
2522                match ops.runner.respawn() {
2523                    Ok(defs) => {
2524                        let count = defs.len();
2525                        // Replace, not merge — deleted functions must stop
2526                        // being callable. register_all() alone leaves stale
2527                        // entries from the previous process generation.
2528                        ops.registry.replace_all(defs);
2529                        tracing::warn!("[functions] Respawned Bun runtime ({count} fn(s))");
2530                        backoff = Duration::from_secs(1);
2531                    }
2532                    Err(e) => {
2533                        tracing::warn!("[functions] Respawn failed: {e}");
2534                        // Persistent Bun-runtime failures are the kind of
2535                        // operator signal that belongs in error telemetry
2536                        // too. Include enough context to triage repeated
2537                        // events: current backoff (so operators can see
2538                        // how long failures have been compounding) and the
2539                        // component name.
2540                        let backoff_str = format!("{}", backoff.as_secs());
2541                        pylon_observability::report_error(&pylon_observability::ErrorEvent {
2542                            level: pylon_observability::ErrorLevel::Error,
2543                            code: "FN_RESPAWN_FAILED",
2544                            message: &e,
2545                            context: &[
2546                                ("component", "bun-runtime-supervisor"),
2547                                ("backoff_secs", &backoff_str),
2548                            ],
2549                        });
2550                        backoff = (backoff * 2).min(max_backoff);
2551                    }
2552                }
2553            }
2554        })
2555        .expect("failed to spawn function runtime supervisor");
2556}