// pylon_runtime/datastore.rs

//! Implements the platform-agnostic [`DataStore`] trait for [`Runtime`].
//!
//! This bridges the concrete Runtime (SQLite-backed, or Postgres-backed when
//! a PG backend is configured) to the abstract trait used by the router
//! crate, enabling the same routing logic to run on self-hosted servers and
//! Cloudflare Workers alike.
6
7use pylon_http::{DataError, DataStore};
8
9use crate::Runtime;
10
// ---------------------------------------------------------------------------
// In-flight mutation schedule buffering
// ---------------------------------------------------------------------------
14
15/// A scheduled function call captured during a mutation handler. Held in
16/// the per-mutation pending list until the surrounding transaction
17/// commits — at which point we drain and enqueue. On rollback the list
18/// is dropped without enqueuing, so a failed mutation can't leave behind
19/// scheduled side-effects (the docs claim this; before this buffer the
20/// claim was false).
21#[derive(Debug, Clone)]
22pub(crate) struct PendingSchedule {
23    pub fn_name: String,
24    pub args: serde_json::Value,
25    pub delay_ms: Option<u64>,
26    pub run_at: Option<u64>,
27}
28
29thread_local! {
30    /// Set by the mutation entry point (top-level + nested) for the
31    /// duration of a TS handler call. The schedule hook checks this
32    /// thread-local: when `Some`, scheduling buffers into the inner
33    /// `Vec`; when `None`, the hook enqueues immediately (the
34    /// historical, non-mutation behavior). The Bun stdio loop is
35    /// single-threaded per call (the runner holds `io_lock` for the
36    /// whole call duration), so a thread-local is the right scoping
37    /// primitive — no cross-thread leakage.
38    pub(crate) static MUTATION_SCHEDULE_BUFFER: std::cell::RefCell<Option<std::rc::Rc<std::cell::RefCell<Vec<PendingSchedule>>>>>
39        = const { std::cell::RefCell::new(None) };
40}
41
42/// RAII guard that pushes a schedule buffer onto the thread-local for
43/// the duration of a mutation handler call, then restores the previous
44/// value (which is almost always `None`, but supports nested mutation
45/// handlers stacking buffers correctly).
46pub(crate) struct ScheduleBufferGuard {
47    previous: Option<std::rc::Rc<std::cell::RefCell<Vec<PendingSchedule>>>>,
48    current: std::rc::Rc<std::cell::RefCell<Vec<PendingSchedule>>>,
49}
50
51impl ScheduleBufferGuard {
52    pub(crate) fn enter() -> Self {
53        let current = std::rc::Rc::new(std::cell::RefCell::new(Vec::new()));
54        let previous = MUTATION_SCHEDULE_BUFFER.with(|cell| {
55            let mut slot = cell.borrow_mut();
56            let old = slot.take();
57            *slot = Some(current.clone());
58            old
59        });
60        Self { previous, current }
61    }
62
63    /// Drain the buffer captured during this guard's lifetime. Caller
64    /// flushes after COMMIT succeeds; on rollback the buffer is just
65    /// dropped along with the guard.
66    pub(crate) fn take(&self) -> Vec<PendingSchedule> {
67        std::mem::take(&mut *self.current.borrow_mut())
68    }
69}
70
71impl Drop for ScheduleBufferGuard {
72    fn drop(&mut self) {
73        MUTATION_SCHEDULE_BUFFER.with(|cell| {
74            *cell.borrow_mut() = self.previous.take();
75        });
76    }
77}
78
// ---------------------------------------------------------------------------
// In-flight mutation depth marker (deadlock guard)
// ---------------------------------------------------------------------------
82
thread_local! {
    /// Number of mutation-tx frames currently on this thread's stack.
    ///
    /// Both backends acquire a single connection mutex per mutation
    /// (SQLite's write_conn, PG's `LivePostgresAdapter`).
    /// `std::sync::Mutex` is NOT re-entrant — a TS handler that calls
    /// `runMutation` from inside another mutation would block forever
    /// trying to re-acquire the connection lock it already holds. The
    /// nested-call hook checks this counter and rejects the call with
    /// `NESTED_MUTATION` instead of hanging.
    ///
    /// A counter rather than a bool so that future savepoint-based
    /// nesting could switch to a tx-reuse path without changing call
    /// sites.
    static MUTATION_DEPTH: std::cell::Cell<u32> = const { std::cell::Cell::new(0) };
}

/// RAII marker: the depth counter is incremented when a mutation handler
/// is entered and decremented when the guard is dropped (including during
/// unwinding). The nested-call hook uses it to detect recursive mutations
/// and reject them with a clear error rather than deadlocking on the
/// non-reentrant connection mutex.
///
/// The guard must be bound to a named local; a discarded guard is dropped
/// at once and the mutation would run with the marker already cleared.
#[must_use = "dropping the guard immediately clears the in-mutation marker"]
pub(crate) struct MutationDepthGuard;

impl MutationDepthGuard {
    /// Marks this thread as being one level deeper inside a mutation tx.
    /// The increment saturates instead of overflowing.
    pub(crate) fn enter() -> Self {
        MUTATION_DEPTH.with(|depth| depth.set(depth.get().saturating_add(1)));
        Self
    }
}

impl Drop for MutationDepthGuard {
    /// Undoes the increment performed by [`MutationDepthGuard::enter`];
    /// saturates at zero.
    fn drop(&mut self) {
        MUTATION_DEPTH.with(|depth| depth.set(depth.get().saturating_sub(1)));
    }
}

/// True iff this thread is currently inside a mutation handler's tx,
/// i.e. at least one [`MutationDepthGuard`] is alive on it.
pub(crate) fn in_mutation_tx() -> bool {
    MUTATION_DEPTH.with(|depth| depth.get() != 0)
}
121
// ---------------------------------------------------------------------------
// PG /api/transact CRDT-aware impl
// ---------------------------------------------------------------------------
125
126impl Runtime {
127    /// PG `/api/transact` implementation that runs each op through
128    /// the typed `tx_*` helpers (so FTS shadow rows + the new id-
129    /// reject + per-row locking all apply) AND adds CRDT projection
130    /// + sidecar maintenance for `crdt: true` entities. Without this,
131    /// batched admin writes desync from the CRDT layer.
132    pub(crate) fn pg_transact_with_crdt(
133        &self,
134        pg: &crate::PgBackend,
135        ops: &[serde_json::Value],
136    ) -> Result<(bool, Vec<serde_json::Value>), DataError> {
137        use pylon_storage::pg_tx_store::{tx_delete, tx_insert, tx_update};
138
139        // Pre-validate every op shape — a malformed payload should
140        // never open a tx and immediately roll back.
141        enum Op<'a> {
142            Insert {
143                entity: &'a str,
144                data: &'a serde_json::Value,
145            },
146            Update {
147                entity: &'a str,
148                id: &'a str,
149                data: &'a serde_json::Value,
150            },
151            Delete {
152                entity: &'a str,
153                id: &'a str,
154            },
155        }
156        let mut typed: Vec<Op<'_>> = Vec::with_capacity(ops.len());
157        for op in ops {
158            let op_type = op.get("op").and_then(|v| v.as_str()).unwrap_or("");
159            let entity = op
160                .get("entity")
161                .and_then(|v| v.as_str())
162                .ok_or_else(|| DataError {
163                    code: "TX_INVALID_OP".into(),
164                    message: "Each transact op must have an \"entity\" field".into(),
165                })?;
166            match op_type {
167                "insert" => {
168                    let data = op.get("data").ok_or_else(|| DataError {
169                        code: "TX_INVALID_OP".into(),
170                        message: "insert op requires \"data\"".into(),
171                    })?;
172                    typed.push(Op::Insert { entity, data });
173                }
174                "update" => {
175                    let id = op
176                        .get("id")
177                        .and_then(|v| v.as_str())
178                        .ok_or_else(|| DataError {
179                            code: "TX_INVALID_OP".into(),
180                            message: "update op requires \"id\"".into(),
181                        })?;
182                    let data = op.get("data").ok_or_else(|| DataError {
183                        code: "TX_INVALID_OP".into(),
184                        message: "update op requires \"data\"".into(),
185                    })?;
186                    typed.push(Op::Update { entity, id, data });
187                }
188                "delete" => {
189                    let id = op
190                        .get("id")
191                        .and_then(|v| v.as_str())
192                        .ok_or_else(|| DataError {
193                            code: "TX_INVALID_OP".into(),
194                            message: "delete op requires \"id\"".into(),
195                        })?;
196                    typed.push(Op::Delete { entity, id });
197                }
198                other => {
199                    return Err(DataError {
200                        code: "TX_INVALID_OP".into(),
201                        message: format!("unknown op \"{other}\""),
202                    });
203                }
204            }
205        }
206
207        // Track which CRDT rows we touched so we can refresh their
208        // cache entries after commit (or evict on rollback).
209        let mut crdt_touched: Vec<(String, String)> = Vec::new();
210
211        let manifest = self.manifest.clone();
212        let result = pg.store.with_transaction_raw(|tx| -> Result<Vec<serde_json::Value>, DataError> {
213            let mut json_results: Vec<serde_json::Value> = Vec::with_capacity(typed.len());
214            for op in &typed {
215                let result = match op {
216                    Op::Insert { entity, data } => {
217                        let ent = manifest.entities.iter().find(|e| e.name == *entity);
218                        let id = if ent.map(|e| e.crdt).unwrap_or(false) {
219                            let crdt_fields = self.crdt_fields_for(ent.unwrap()).map_err(|e| {
220                                DataError { code: e.code, message: e.message }
221                            })?;
222                            let id = crate::generate_id();
223                            pg.crdt
224                                .apply_patch(tx, entity, &id, &crdt_fields, data)
225                                .map_err(|e| DataError {
226                                    code: "CRDT_APPLY_FAILED".into(),
227                                    message: format!("crdt write {entity}/{id}: {e}"),
228                                })?;
229                            let mut row = (*data).clone();
230                            if let Some(obj) = row.as_object_mut() {
231                                obj.insert("id".into(), serde_json::Value::String(id.clone()));
232                            }
233                            tx_insert(tx, &manifest, entity, &row)?;
234                            crdt_touched.push((entity.to_string(), id.clone()));
235                            id
236                        } else {
237                            tx_insert(tx, &manifest, entity, data)?
238                        };
239                        serde_json::json!({ "op": "insert", "id": id })
240                    }
241                    Op::Update { entity, id, data } => {
242                        let ent = manifest.entities.iter().find(|e| e.name == *entity);
243                        let updated = if ent.map(|e| e.crdt).unwrap_or(false) {
244                            let crdt_fields = self.crdt_fields_for(ent.unwrap()).map_err(|e| {
245                                DataError { code: e.code, message: e.message }
246                            })?;
247                            pg.crdt
248                                .apply_patch(tx, entity, id, &crdt_fields, data)
249                                .map_err(|e| DataError {
250                                    code: "CRDT_APPLY_FAILED".into(),
251                                    message: format!("crdt update {entity}/{id}: {e}"),
252                                })?;
253                            let updated = tx_update(tx, &manifest, entity, id, data)?;
254                            if !updated {
255                                return Err(DataError {
256                                    code: "ENTITY_NOT_FOUND".into(),
257                                    message: format!(
258                                        "Update on {entity}/{id} found no row — refusing to commit \
259                                         a CRDT snapshot that would orphan."
260                                    ),
261                                });
262                            }
263                            crdt_touched.push((entity.to_string(), id.to_string()));
264                            updated
265                        } else {
266                            tx_update(tx, &manifest, entity, id, data)?
267                        };
268                        serde_json::json!({ "op": "update", "id": id, "updated": updated })
269                    }
270                    Op::Delete { entity, id } => {
271                        let ent = manifest.entities.iter().find(|e| e.name == *entity);
272                        let deleted = if ent.map(|e| e.crdt).unwrap_or(false) {
273                            tx.execute(
274                                "DELETE FROM _pylon_crdt_snapshots WHERE entity = $1 AND row_id = $2",
275                                &[entity, id],
276                            )
277                            .map_err(|e| DataError {
278                                code: "CRDT_SIDECAR_DELETE_FAILED".into(),
279                                message: format!(
280                                    "delete pg crdt snapshot {entity}/{id}: {e}"
281                                ),
282                            })?;
283                            let deleted = tx_delete(tx, &manifest, entity, id)?;
284                            crdt_touched.push((entity.to_string(), id.to_string()));
285                            deleted
286                        } else {
287                            tx_delete(tx, &manifest, entity, id)?
288                        };
289                        serde_json::json!({ "op": "delete", "id": id, "deleted": deleted })
290                    }
291                };
292                json_results.push(result);
293            }
294            // Refresh cache for CRDT rows we touched.
295            for (entity, id) in &crdt_touched {
296                pg.crdt.cache_after_commit(tx, entity, id);
297            }
298            Ok(json_results)
299        });
300
301        match result {
302            Ok(json_results) => Ok((true, json_results)),
303            Err(e) => {
304                for (entity, id) in &crdt_touched {
305                    pg.crdt.evict(entity, id);
306                }
307                Err(e)
308            }
309        }
310    }
311}
312
// ---------------------------------------------------------------------------
// DataStore → Runtime bridge
// ---------------------------------------------------------------------------
316
317impl DataStore for Runtime {
318    fn manifest(&self) -> &pylon_kernel::AppManifest {
319        Runtime::manifest(self)
320    }
321
322    fn insert(&self, entity: &str, data: &serde_json::Value) -> Result<String, DataError> {
323        Runtime::insert(self, entity, data).map_err(into_data_error)
324    }
325
326    fn get_by_id(&self, entity: &str, id: &str) -> Result<Option<serde_json::Value>, DataError> {
327        Runtime::get_by_id(self, entity, id).map_err(into_data_error)
328    }
329
330    fn list(&self, entity: &str) -> Result<Vec<serde_json::Value>, DataError> {
331        Runtime::list(self, entity).map_err(into_data_error)
332    }
333
334    fn list_after(
335        &self,
336        entity: &str,
337        after: Option<&str>,
338        limit: usize,
339    ) -> Result<Vec<serde_json::Value>, DataError> {
340        Runtime::list_after(self, entity, after, limit).map_err(into_data_error)
341    }
342
343    fn update(&self, entity: &str, id: &str, data: &serde_json::Value) -> Result<bool, DataError> {
344        Runtime::update(self, entity, id, data).map_err(into_data_error)
345    }
346
347    fn delete(&self, entity: &str, id: &str) -> Result<bool, DataError> {
348        Runtime::delete(self, entity, id).map_err(into_data_error)
349    }
350
351    fn lookup(
352        &self,
353        entity: &str,
354        field: &str,
355        value: &str,
356    ) -> Result<Option<serde_json::Value>, DataError> {
357        Runtime::lookup(self, entity, field, value).map_err(into_data_error)
358    }
359
360    fn link(
361        &self,
362        entity: &str,
363        id: &str,
364        relation: &str,
365        target_id: &str,
366    ) -> Result<bool, DataError> {
367        Runtime::link(self, entity, id, relation, target_id).map_err(into_data_error)
368    }
369
370    fn unlink(&self, entity: &str, id: &str, relation: &str) -> Result<bool, DataError> {
371        Runtime::unlink(self, entity, id, relation).map_err(into_data_error)
372    }
373
374    fn query_filtered(
375        &self,
376        entity: &str,
377        filter: &serde_json::Value,
378    ) -> Result<Vec<serde_json::Value>, DataError> {
379        Runtime::query_filtered(self, entity, filter).map_err(into_data_error)
380    }
381
382    fn query_graph(&self, query: &serde_json::Value) -> Result<serde_json::Value, DataError> {
383        Runtime::query_graph(self, query).map_err(into_data_error)
384    }
385
386    fn aggregate(
387        &self,
388        entity: &str,
389        spec: &serde_json::Value,
390    ) -> Result<serde_json::Value, DataError> {
391        Runtime::aggregate(self, entity, spec).map_err(into_data_error)
392    }
393
394    fn transact(
395        &self,
396        ops: &[serde_json::Value],
397    ) -> Result<(bool, Vec<serde_json::Value>), DataError> {
398        // Postgres mode: delegate to the runtime-layer wrapper that
399        // adds CRDT projection + sidecar maintenance for crdt:true
400        // entities. The storage layer's transact (PostgresDataStore::
401        // transact) only knows about FTS — codex flagged that
402        // /api/transact would silently desync CRDT state.
403        if let Some(pg) = self.pg_backend() {
404            return self.pg_transact_with_crdt(pg, ops);
405        }
406        let conn = self.lock_conn_pub().map_err(into_data_error)?;
407        let _ = conn.execute("BEGIN", []);
408        let mut results: Vec<serde_json::Value> = Vec::new();
409        let mut rollback = false;
410
411        for op in ops {
412            let op_type = op.get("op").and_then(|v| v.as_str()).unwrap_or("");
413            let entity = op.get("entity").and_then(|v| v.as_str()).unwrap_or("");
414
415            match op_type {
416                "insert" => {
417                    let data = op.get("data").cloned().unwrap_or(serde_json::json!({}));
418                    match self.insert_with_conn(&conn, entity, &data) {
419                        Ok(id) => {
420                            results.push(serde_json::json!({"op": "insert", "id": id}));
421                        }
422                        Err(e) => {
423                            results.push(serde_json::json!({"op": "insert", "error": e.message}));
424                            rollback = true;
425                            break;
426                        }
427                    }
428                }
429                "update" => {
430                    let id = op.get("id").and_then(|v| v.as_str()).unwrap_or("");
431                    let data = op.get("data").cloned().unwrap_or(serde_json::json!({}));
432                    match self.update_with_conn(&conn, entity, id, &data) {
433                        Ok(_) => {
434                            results.push(serde_json::json!({"op": "update", "id": id}));
435                        }
436                        Err(e) => {
437                            results.push(serde_json::json!({"op": "update", "error": e.message}));
438                            rollback = true;
439                            break;
440                        }
441                    }
442                }
443                "delete" => {
444                    let id = op.get("id").and_then(|v| v.as_str()).unwrap_or("");
445                    match self.delete_with_conn(&conn, entity, id) {
446                        Ok(_) => {
447                            results.push(serde_json::json!({"op": "delete", "id": id}));
448                        }
449                        Err(e) => {
450                            results.push(serde_json::json!({"op": "delete", "error": e.message}));
451                            rollback = true;
452                            break;
453                        }
454                    }
455                }
456                _ => {
457                    results.push(serde_json::json!({"op": op_type, "error": "unknown operation"}));
458                }
459            }
460        }
461
462        if rollback {
463            let _ = conn.execute("ROLLBACK", []);
464        } else {
465            let _ = conn.execute("COMMIT", []);
466        }
467
468        Ok((!rollback, results))
469    }
470
471    /// Bridge the typed `SearchQuery` / `SearchResult` shapes to the
472    /// trait's JSON-in / JSON-out contract. The router passes a JSON
473    /// body; we deserialize, look up the entity's `SearchConfig`, run
474    /// the planner, and re-serialize. Serialization round-tripping
475    /// lets this method live on the DataStore trait without forcing
476    /// pylon-http to depend on pylon-storage.
477    fn search(
478        &self,
479        entity: &str,
480        query: &serde_json::Value,
481    ) -> Result<serde_json::Value, DataError> {
482        let ent = self
483            .manifest()
484            .entities
485            .iter()
486            .find(|e| e.name == entity)
487            .ok_or_else(|| DataError {
488                code: "ENTITY_NOT_FOUND".into(),
489                message: format!("Unknown entity: {entity}"),
490            })?;
491        let cfg = ent.search.as_ref().ok_or_else(|| DataError {
492            code: "SEARCH_NOT_CONFIGURED".into(),
493            message: format!("Entity {entity} has no `search:` config"),
494        })?;
495        let parsed: pylon_storage::search::SearchQuery = serde_json::from_value(query.clone())
496            .map_err(|e| DataError {
497                code: "INVALID_QUERY".into(),
498                message: format!("search query body: {e}"),
499            })?;
500
501        // Postgres: dispatch to the PG-native FTS path (`tsvector` +
502        // GIN index, maintained transactionally alongside CRUD by
503        // PgTxStore). Same `SearchResult` shape as the SQLite path.
504        if self.is_postgres() {
505            let pg = self.pg_data_store().ok_or_else(|| DataError {
506                code: "PG_DATASTORE_MISSING".into(),
507                message: "is_postgres=true but pg_data_store() returned None".into(),
508            })?;
509            let result = pg.run_search(entity, cfg, &parsed).map_err(|e| DataError {
510                code: e.code,
511                message: e.message,
512            })?;
513            return serde_json::to_value(&result).map_err(|e| DataError {
514                code: "SEARCH_SERIALIZE_FAILED".into(),
515                message: e.to_string(),
516            });
517        }
518
519        let conn = self.lock_conn_pub().map_err(into_data_error)?;
520        let result =
521            pylon_storage::search_query::run_search(&conn, entity, cfg, &parsed).map_err(|e| {
522                DataError {
523                    code: e.code,
524                    message: e.message,
525                }
526            })?;
527        serde_json::to_value(&result).map_err(|e| DataError {
528            code: "SEARCH_SERIALIZE_FAILED".into(),
529            message: e.to_string(),
530        })
531    }
532
533    /// Return the binary CRDT snapshot for a row. `Ok(None)` for any
534    /// entity with `crdt: false` (the LWW opt-out) — the router uses
535    /// that to decide whether to ship a binary update over WebSocket
536    /// after the write.
537    fn crdt_snapshot(&self, entity: &str, row_id: &str) -> Result<Option<Vec<u8>>, DataError> {
538        // Postgres: read from the PG `_pylon_crdt_snapshots` sidecar
539        // via the same `PgLoroStore` that maintenance writes go
540        // through. Same Ok(None) early-exit for non-CRDT entities so
541        // the router skips the binary broadcast.
542        if self.is_postgres() {
543            let ent = self
544                .manifest()
545                .entities
546                .iter()
547                .find(|e| e.name == entity)
548                .ok_or_else(|| DataError {
549                    code: "ENTITY_NOT_FOUND".into(),
550                    message: format!("Unknown entity: {entity}"),
551                })?;
552            if !ent.crdt {
553                return Ok(None);
554            }
555            let pg_backend = match self.pg_backend() {
556                Some(pg) => pg,
557                None => return Ok(None),
558            };
559            // Single-read; with_client is fine here. PgLoroStore's
560            // hydrate-on-miss + per-row Mutex ensures consistent
561            // bytes even under concurrent applies on other threads.
562            let snap = pg_backend
563                .store
564                .with_client(|client| -> Result<Vec<u8>, DataError> {
565                    pg_backend
566                        .crdt
567                        .snapshot(client, entity, row_id)
568                        .map_err(|e| DataError {
569                            code: "CRDT_SNAPSHOT_FAILED".into(),
570                            message: format!("snapshot {entity}/{row_id}: {e}"),
571                        })
572                })?;
573            return Ok(Some(snap));
574        }
575        let ent = self
576            .manifest()
577            .entities
578            .iter()
579            .find(|e| e.name == entity)
580            .ok_or_else(|| DataError {
581                code: "ENTITY_NOT_FOUND".into(),
582                message: format!("Unknown entity: {entity}"),
583            })?;
584        if !ent.crdt {
585            return Ok(None);
586        }
587        let conn = self.lock_conn_pub().map_err(into_data_error)?;
588        let snap = self
589            .crdt_store()
590            .snapshot(&conn, entity, row_id)
591            .map_err(|e| DataError {
592                code: "CRDT_SNAPSHOT_FAILED".into(),
593                message: format!("snapshot {entity}/{row_id}: {e}"),
594            })?;
595        Ok(Some(snap))
596    }
597
598    /// Client-pushed Loro update. Imports into the row's LoroDoc,
599    /// re-projects the doc state into the materialized SQLite columns
600    /// (so subsequent reads see the merged content), and returns the
601    /// fresh full-row snapshot for the router to broadcast to other
602    /// clients.
603    ///
604    /// Wrapped in a single SQLite transaction — same crash-safety
605    /// shape as `Runtime::insert/update`. Either the LoroStore +
606    /// SQLite columns both update or neither does.
607    fn crdt_apply_update(
608        &self,
609        entity: &str,
610        row_id: &str,
611        update: &[u8],
612    ) -> Result<Vec<u8>, DataError> {
613        // Postgres: import the binary update into the row's PG-side
614        // LoroDoc, persist the new snapshot in `_pylon_crdt_snapshots`,
615        // re-project to the materialized PG row's columns, and return
616        // the fresh full snapshot for the router to broadcast. Same
617        // shape as the SQLite path below.
618        if self.is_postgres() {
619            let ent = self
620                .manifest()
621                .entities
622                .iter()
623                .find(|e| e.name == entity)
624                .ok_or_else(|| DataError {
625                    code: "ENTITY_NOT_FOUND".into(),
626                    message: format!("Unknown entity: {entity}"),
627                })?
628                .clone();
629            if !ent.crdt {
630                return Err(DataError {
631                    code: "NOT_SUPPORTED".into(),
632                    message: format!(
633                        "CRDT update sent for entity \"{entity}\" which has crdt: false"
634                    ),
635                });
636            }
637            let pg_backend = self.pg_backend().ok_or_else(|| DataError {
638                code: "PG_BACKEND_MISSING".into(),
639                message: "is_postgres=true but pg_backend() returned None".into(),
640            })?;
641            let crdt_fields = self.crdt_fields_for(&ent).map_err(into_data_error)?;
642
643            // One transaction: apply the peer's update into the
644            // LoroDoc + persist the new snapshot + reproject into the
645            // materialized PG row + read back the fresh snapshot for
646            // broadcast. Pre-fix this was three separate autocommits
647            // — a failure between them desynced the layers, and the
648            // broadcast snapshot might not reflect what actually
649            // landed on disk.
650            let result =
651                pg_backend
652                    .store
653                    .with_transaction_raw(|tx| -> Result<Vec<u8>, DataError> {
654                        let projected = pg_backend
655                            .crdt
656                            .apply_remote_update(tx, entity, row_id, &crdt_fields, update)
657                            .map_err(|e| {
658                                // Distinguish decode errors (malformed client
659                                // bytes — caller's fault, 400) from apply
660                                // errors (schema mismatch, also caller's
661                                // fault but a different shape). The CRDT
662                                // route maps CRDT_DECODE_FAILED → 400, so
663                                // unmapped errors land as 500 — codex
664                                // flagged the asymmetry vs the SQLite path.
665                                let code = match &e {
666                                    crate::loro_store::LoroStoreError::Decode(_) => {
667                                        "CRDT_DECODE_FAILED"
668                                    }
669                                    _ => "CRDT_APPLY_FAILED",
670                                };
671                                DataError {
672                                    code: code.into(),
673                                    message: format!("crdt apply update {entity}/{row_id}: {e}"),
674                                }
675                            })?;
676                        let updated = pylon_storage::pg_tx_store::tx_update(
677                            tx,
678                            self.manifest(),
679                            entity,
680                            row_id,
681                            &projected,
682                        )?;
683                        if !updated {
684                            // Same orphan guard as Runtime::update — refuse
685                            // to commit a snapshot for a row that doesn't
686                            // exist. Peer pushed an update for a row this
687                            // replica's never seen.
688                            return Err(DataError {
689                                code: "ENTITY_NOT_FOUND".into(),
690                                message: format!(
691                                    "Peer-pushed CRDT update targets {entity}/{row_id} which has \
692                             no materialized row — refusing to commit an orphan snapshot."
693                                ),
694                            });
695                        }
696                        // Read the snapshot back from the tx, bypassing the
697                        // cache — a prior `crdt_snapshot()` call could have
698                        // populated the cache with bytes that predate this
699                        // peer update, and broadcasting them would silently
700                        // omit the just-applied change. Codex flagged this.
701                        let snap = crate::pg_loro_store::PgLoroStore::read_snapshot_via_conn(
702                            tx, entity, row_id,
703                        )
704                        .map_err(|e| DataError {
705                            code: "CRDT_SNAPSHOT_FAILED".into(),
706                            message: format!("post-update snapshot {entity}/{row_id}: {e}"),
707                        })?;
708                        // Refresh the cache so the next reader on this
709                        // process skips the round-trip.
710                        pg_backend.crdt.cache_after_commit(tx, entity, row_id);
711                        Ok(snap)
712                    });
713            if result.is_err() {
714                // Same cache-coherency hygiene as Runtime::insert /
715                // update — the in-memory doc absorbed the peer's
716                // update before the tx rolled back, so evict to
717                // force re-hydration from the persisted snapshot.
718                pg_backend.crdt.evict(entity, row_id);
719            }
720            return result;
721        }
722        // Find the entity so we can build the projection field list +
723        // confirm CRDT mode is on. Cheap manifest scan; counts are tiny.
724        let ent = self
725            .manifest()
726            .entities
727            .iter()
728            .find(|e| e.name == entity)
729            .ok_or_else(|| DataError {
730                code: "ENTITY_NOT_FOUND".into(),
731                message: format!("Unknown entity: {entity}"),
732            })?
733            .clone();
734        if !ent.crdt {
735            return Err(DataError {
736                code: "NOT_SUPPORTED".into(),
737                message: format!("Entity {entity} has crdt: false; client push requires CRDT mode"),
738            });
739        }
740        let crdt_fields = self.crdt_fields_for(&ent).map_err(into_data_error)?;
741
742        let conn = self.lock_conn_pub().map_err(into_data_error)?;
743        crate::with_write_tx(&conn, || -> Result<Vec<u8>, crate::RuntimeError> {
744            // Apply the update to the LoroDoc + persist the new snapshot
745            // to the sidecar. Returns the projected JSON shape for the
746            // post-merge state.
747            let projected = self
748                .crdt_store()
749                .apply_remote_update(&conn, entity, row_id, &crdt_fields, update)
750                .map_err(|e| crate::RuntimeError {
751                    code: "CRDT_APPLY_FAILED".into(),
752                    message: format!("apply_remote_update {entity}/{row_id}: {e}"),
753                })?;
754
755            // Re-project into the materialized SQLite row so SELECT
756            // queries see the merged content. Build SET clauses from
757            // the projection — every CRDT-managed field gets rewritten.
758            let projection = projected.as_object().ok_or_else(|| crate::RuntimeError {
759                code: "CRDT_PROJECTION_INVALID".into(),
760                message: "projected row was not a JSON object".into(),
761            })?;
762
763            let mut set_clauses = Vec::with_capacity(projection.len());
764            let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
765            let mut idx = 1;
766            for (key, val) in projection {
767                if key == "id" {
768                    continue;
769                }
770                set_clauses.push(format!("{} = ?{idx}", crate::quote_ident(key.as_str())));
771                values.push(crate::json_to_sql(val));
772                idx += 1;
773            }
774            if set_clauses.is_empty() {
775                // No projected fields — happens when the doc has no
776                // top-level keys yet (fresh row from a peer subscribing
777                // before any writes). Skip the UPDATE; row may not exist
778                // in SQLite. Subsequent inserts will materialize it.
779            } else {
780                values.push(Box::new(row_id.to_string()));
781                let sql = format!(
782                    "UPDATE {} SET {} WHERE \"id\" = ?{idx}",
783                    crate::quote_ident(entity),
784                    set_clauses.join(", ")
785                );
786                let params: Vec<&dyn rusqlite::types::ToSql> =
787                    values.iter().map(|v| v.as_ref()).collect();
788                conn.execute(&sql, params.as_slice())
789                    .map_err(|e| crate::RuntimeError {
790                        code: "UPDATE_FAILED".into(),
791                        message: format!("post-merge UPDATE {entity}/{row_id}: {e}"),
792                    })?;
793            }
794
795            // Return the new snapshot for the router to broadcast.
796            let snap = self
797                .crdt_store()
798                .snapshot(&conn, entity, row_id)
799                .map_err(|e| crate::RuntimeError {
800                    code: "CRDT_SNAPSHOT_FAILED".into(),
801                    message: format!("post-merge snapshot {entity}/{row_id}: {e}"),
802                })?;
803            Ok(snap)
804        })
805        .map_err(into_data_error)
806    }
807}
808
809fn into_data_error(e: crate::RuntimeError) -> DataError {
810    DataError {
811        code: e.code,
812        message: e.message,
813    }
814}
815
816// ---------------------------------------------------------------------------
817// ChangeNotifier for WsHub + SseHub
818// ---------------------------------------------------------------------------
819
820use crate::sse::SseHub;
821use crate::ws::WsHub;
822use std::sync::Arc;
823
/// Bridges WebSocket + SSE hubs to the router's [`ChangeNotifier`] trait.
///
/// Both hubs are shared handles (`Arc`), so the notifier can be built
/// from hubs that other parts of the server also hold.
pub struct WsSseNotifier {
    /// WebSocket hub: receives change events, presence messages and the
    /// binary CRDT frames sent to row subscribers.
    pub ws: Arc<WsHub>,
    /// SSE hub: receives change events and presence messages. Binary CRDT
    /// frames are never sent here.
    pub sse: Arc<SseHub>,
}
829
830impl pylon_router::ChangeNotifier for WsSseNotifier {
831    fn notify(&self, event: &pylon_sync::ChangeEvent) {
832        self.ws.broadcast(event);
833        self.sse.broadcast(event);
834    }
835
836    fn notify_presence(&self, json: &str) {
837        self.ws.broadcast_presence(json);
838        self.sse.broadcast_message(json);
839    }
840
841    /// Encode a CRDT broadcast frame (1-byte type + length-prefixed
842    /// entity + length-prefixed row_id + Loro snapshot bytes) and ship
843    /// it to clients SUBSCRIBED to this row. SSE is text-only so it
844    /// gets skipped — clients on the SSE transport stay on the JSON
845    /// change-event path until a future SSE-friendly encoding (base64
846    /// or hex-encoded chunks) lands.
847    ///
848    /// Filtering by subscription instead of broadcasting to every WS
849    /// client matters once more than a handful of rows are in flight:
850    /// a 50-channel app with 100 connected users would otherwise fan
851    /// 100x for every keystroke in a single channel. Now each binary
852    /// frame goes only to the (typically small) set of tabs that asked
853    /// to mirror that specific row.
854    ///
855    /// If no clients are subscribed (empty list) the frame is dropped
856    /// silently — the JSON change event from `notify` already told
857    /// every connected client a write happened, so non-subscribed
858    /// clients can re-fetch via the regular query path if they care.
859    ///
860    /// Authz: the policy check happens at SUBSCRIBE TIME (in
861    /// `start_ws_server`'s SnapshotFetcher closure) — clients on the
862    /// subscriber list have already passed `check_entity_read` for
863    /// the row at that moment. We don't re-check on every broadcast
864    /// because the broadcast hot path runs from the write thread
865    /// without per-client auth context. A consequence: if a client is
866    /// already subscribed and their permissions change mid-session
867    /// (e.g. they're removed from a private channel), they'll keep
868    /// receiving CRDT frames for that row until they disconnect.
869    /// Future work: index subscribers by auth context so the broadcast
870    /// can re-check, or invalidate subscriptions on policy changes.
871    ///
872    /// Frame-encode failure (entity / row_id over the 16-bit length
873    /// header) gets logged and dropped — the row's regular JSON change
874    /// event already shipped via `notify`, so clients still see the
875    /// write happened, they just don't get the binary CRDT delta.
876    fn notify_crdt(&self, entity: &str, row_id: &str, snapshot: &[u8]) {
877        let subscribers = self.ws.subscriptions().subscribers(entity, row_id);
878        if subscribers.is_empty() {
879            return;
880        }
881        match pylon_router::encode_crdt_frame(
882            pylon_router::CRDT_FRAME_SNAPSHOT,
883            entity,
884            row_id,
885            snapshot,
886        ) {
887            Ok(frame) => self.ws.broadcast_binary_to(&subscribers, frame),
888            Err(e) => {
889                tracing::warn!("[crdt] dropping binary frame for {entity}/{row_id}: {e}");
890            }
891        }
892    }
893}
894
895/// Serialize a value to JSON, falling back to `{}` on failure.
896fn to_json<T: serde::Serialize>(val: T) -> serde_json::Value {
897    serde_json::to_value(val).unwrap_or(serde_json::json!({}))
898}
899
900/// Serialize a value to JSON, falling back to `[]` on failure.
901fn to_json_array<T: serde::Serialize>(val: T) -> serde_json::Value {
902    serde_json::to_value(val).unwrap_or(serde_json::json!([]))
903}
904
905// ---------------------------------------------------------------------------
906// Adapter: RoomManager → RoomOps
907// ---------------------------------------------------------------------------
908
909use crate::rooms::RoomManager;
910
911impl pylon_router::RoomOps for RoomManager {
912    fn join(
913        &self,
914        room: &str,
915        user_id: &str,
916        data: Option<serde_json::Value>,
917    ) -> Result<(serde_json::Value, serde_json::Value), DataError> {
918        RoomManager::join(self, room, user_id, data)
919            .map(|(snapshot, join_event)| (to_json(&snapshot), to_json(&join_event)))
920            .map_err(|e| DataError {
921                code: e.code,
922                message: e.message,
923            })
924    }
925
926    fn leave(&self, room: &str, user_id: &str) -> Option<serde_json::Value> {
927        RoomManager::leave(self, room, user_id).map(|event| to_json(&event))
928    }
929
930    fn set_presence(
931        &self,
932        room: &str,
933        user_id: &str,
934        data: serde_json::Value,
935    ) -> Option<serde_json::Value> {
936        RoomManager::set_presence(self, room, user_id, data).map(|event| to_json(&event))
937    }
938
939    fn broadcast(
940        &self,
941        room: &str,
942        sender: Option<&str>,
943        topic: &str,
944        data: serde_json::Value,
945    ) -> Option<serde_json::Value> {
946        RoomManager::broadcast(self, room, sender, topic, data).map(|event| to_json(&event))
947    }
948
949    fn list_rooms(&self) -> Vec<String> {
950        RoomManager::list_rooms(self)
951    }
952
953    fn room_size(&self, name: &str) -> usize {
954        RoomManager::room_size(self, name)
955    }
956
957    fn members(&self, name: &str) -> Vec<serde_json::Value> {
958        RoomManager::members(self, name)
959            .into_iter()
960            .map(|p| to_json(p))
961            .collect()
962    }
963}
964
965// ---------------------------------------------------------------------------
// Adapters: PluginRegistry → PluginHookOps, CachePlugin → CacheOps (newtype wrappers for orphan rule)
967// ---------------------------------------------------------------------------
968
969use pylon_plugin::builtin::cache::CachePlugin;
970
/// Adapter that routes router-level CRUD hook calls into the PluginRegistry.
///
/// The router holds a `&dyn PluginHookOps`; this adapter wraps the runtime's
/// `Arc<PluginRegistry>` so registered plugins (audit_log, validation,
/// webhooks, timestamps, slugify, versioning, search) run on every
/// POST/PATCH/DELETE under `/api/entities/*`. Without this wiring, plugins
/// only saw the `on_request` hook and never got a chance to observe or
/// reject data-plane writes — a quiet correctness hole noted in the
/// pentest review.
///
/// The single tuple field is the shared registry handle; it is `pub` so
/// the adapter can be constructed directly as `PluginHooksAdapter(registry)`.
pub struct PluginHooksAdapter(pub Arc<pylon_plugin::PluginRegistry>);
981
982impl pylon_router::PluginHookOps for PluginHooksAdapter {
983    fn before_insert(
984        &self,
985        entity: &str,
986        data: &mut serde_json::Value,
987        auth: &pylon_auth::AuthContext,
988    ) -> Result<(), (u16, String, String)> {
989        self.0
990            .run_before_insert(entity, data, auth)
991            .map_err(|e| (e.status, e.code, e.message))
992    }
993    fn after_insert(
994        &self,
995        entity: &str,
996        id: &str,
997        data: &serde_json::Value,
998        auth: &pylon_auth::AuthContext,
999    ) {
1000        self.0.run_after_insert(entity, id, data, auth);
1001    }
1002    fn before_update(
1003        &self,
1004        entity: &str,
1005        id: &str,
1006        data: &mut serde_json::Value,
1007        auth: &pylon_auth::AuthContext,
1008    ) -> Result<(), (u16, String, String)> {
1009        self.0
1010            .run_before_update(entity, id, data, auth)
1011            .map_err(|e| (e.status, e.code, e.message))
1012    }
1013    fn after_update(
1014        &self,
1015        entity: &str,
1016        id: &str,
1017        data: &serde_json::Value,
1018        auth: &pylon_auth::AuthContext,
1019    ) {
1020        self.0.run_after_update(entity, id, data, auth);
1021    }
1022    fn before_delete(
1023        &self,
1024        entity: &str,
1025        id: &str,
1026        auth: &pylon_auth::AuthContext,
1027    ) -> Result<(), (u16, String, String)> {
1028        self.0
1029            .run_before_delete(entity, id, auth)
1030            .map_err(|e| (e.status, e.code, e.message))
1031    }
1032    fn after_delete(&self, entity: &str, id: &str, auth: &pylon_auth::AuthContext) {
1033        self.0.run_after_delete(entity, id, auth);
1034    }
1035}
1036
/// Newtype around a shared [`CachePlugin`] so the router's `CacheOps` trait
/// can be implemented for it in this crate.
pub struct CacheAdapter(pub Arc<CachePlugin>);
1038
1039impl pylon_router::CacheOps for CacheAdapter {
1040    fn handle_command(&self, body: &str) -> (u16, String) {
1041        crate::cache_handlers::handle_cache_command(&self.0, body)
1042    }
1043
1044    fn handle_get(&self, key: &str) -> (u16, String) {
1045        crate::cache_handlers::handle_cache_get(&self.0, key)
1046    }
1047
1048    fn handle_delete(&self, key: &str) -> (u16, String) {
1049        crate::cache_handlers::handle_cache_delete(&self.0, key)
1050    }
1051}
1052
1053// ---------------------------------------------------------------------------
1054// Adapter: PubSubBroker → PubSubOps (newtype wrapper for orphan rule)
1055// ---------------------------------------------------------------------------
1056
1057use crate::pubsub::PubSubBroker;
1058
/// Newtype around a shared [`PubSubBroker`] so the router's `PubSubOps`
/// trait can be implemented for it in this crate.
pub struct PubSubAdapter(pub Arc<PubSubBroker>);
1060
1061impl pylon_router::PubSubOps for PubSubAdapter {
1062    fn handle_publish(&self, body: &str) -> (u16, String) {
1063        crate::cache_handlers::handle_pubsub_publish(&self.0, body)
1064    }
1065
1066    fn handle_channels(&self) -> (u16, String) {
1067        crate::cache_handlers::handle_pubsub_channels(&self.0)
1068    }
1069
1070    fn handle_history(&self, channel: &str, url: &str) -> (u16, String) {
1071        crate::cache_handlers::handle_pubsub_history(&self.0, channel, url)
1072    }
1073}
1074
1075// ---------------------------------------------------------------------------
1076// Adapter: JobQueue → JobOps
1077// ---------------------------------------------------------------------------
1078
1079use crate::jobs::{JobQueue, Priority};
1080
1081impl pylon_router::JobOps for JobQueue {
1082    fn enqueue(
1083        &self,
1084        name: &str,
1085        payload: serde_json::Value,
1086        priority: &str,
1087        delay_secs: u64,
1088        max_retries: u32,
1089        queue: &str,
1090    ) -> String {
1091        let pri = Priority::from_str_loose(priority);
1092        JobQueue::enqueue_with_options(self, name, payload, pri, delay_secs, max_retries, queue)
1093    }
1094
1095    fn stats(&self) -> serde_json::Value {
1096        to_json(JobQueue::stats(self))
1097    }
1098
1099    fn dead_letters(&self) -> serde_json::Value {
1100        to_json_array(JobQueue::dead_letters(self))
1101    }
1102
1103    fn retry_dead(&self, id: &str) -> bool {
1104        JobQueue::retry_dead(self, id)
1105    }
1106
1107    fn list_jobs(
1108        &self,
1109        status: Option<&str>,
1110        queue: Option<&str>,
1111        limit: usize,
1112    ) -> serde_json::Value {
1113        to_json_array(JobQueue::list_jobs(self, status, queue, limit))
1114    }
1115
1116    fn get_job(&self, id: &str) -> Option<serde_json::Value> {
1117        JobQueue::get_job(self, id).map(|j| to_json(j))
1118    }
1119}
1120
1121// ---------------------------------------------------------------------------
1122// Adapter: Scheduler → SchedulerOps
1123// ---------------------------------------------------------------------------
1124
1125use crate::scheduler::Scheduler;
1126
1127impl pylon_router::SchedulerOps for Scheduler {
1128    fn list_tasks(&self) -> serde_json::Value {
1129        to_json_array(Scheduler::list_tasks(self))
1130    }
1131
1132    fn trigger(&self, name: &str) -> bool {
1133        Scheduler::trigger(self, name)
1134    }
1135}
1136
1137// ---------------------------------------------------------------------------
1138// Adapter: WorkflowEngine → WorkflowOps
1139// ---------------------------------------------------------------------------
1140
1141use crate::workflows::WorkflowEngine;
1142
1143impl pylon_router::WorkflowOps for WorkflowEngine {
1144    fn definitions(&self) -> serde_json::Value {
1145        to_json_array(WorkflowEngine::definitions(self))
1146    }
1147
1148    fn start(&self, name: &str, input: serde_json::Value) -> Result<String, String> {
1149        WorkflowEngine::start(self, name, input)
1150    }
1151
1152    fn list(&self, status_filter: Option<&str>) -> serde_json::Value {
1153        // Convert string filter to WorkflowStatus for the engine.
1154        let filter = status_filter.and_then(|s| match s {
1155            "pending" => Some(crate::workflows::WorkflowStatus::Pending),
1156            "running" => Some(crate::workflows::WorkflowStatus::Running),
1157            "sleeping" => Some(crate::workflows::WorkflowStatus::Sleeping),
1158            "waiting" => Some(crate::workflows::WorkflowStatus::WaitingForEvent),
1159            "completed" => Some(crate::workflows::WorkflowStatus::Completed),
1160            "failed" => Some(crate::workflows::WorkflowStatus::Failed),
1161            "cancelled" => Some(crate::workflows::WorkflowStatus::Cancelled),
1162            _ => None,
1163        });
1164        to_json_array(WorkflowEngine::list(self, filter.as_ref()))
1165    }
1166
1167    fn get(&self, id: &str) -> Option<serde_json::Value> {
1168        WorkflowEngine::get(self, id).map(|inst| to_json(inst))
1169    }
1170
1171    fn advance(&self, id: &str) -> Result<String, String> {
1172        WorkflowEngine::advance(self, id).map(|status| format!("{:?}", status))
1173    }
1174
1175    fn send_event(&self, id: &str, event: &str, data: serde_json::Value) -> Result<(), String> {
1176        WorkflowEngine::send_event(self, id, event, data)
1177    }
1178
1179    fn cancel(&self, id: &str) -> Result<(), String> {
1180        WorkflowEngine::cancel(self, id)
1181    }
1182}
1183
1184// ---------------------------------------------------------------------------
1185// Adapter: FileStorage trait → FileOps
1186// ---------------------------------------------------------------------------
1187
1188use pylon_storage::files::{FileStorage, LocalFileStorage, Stack0FileStorage};
1189
/// Adapter that exposes a [`FileStorage`] backend through the router's [`FileOps`].
///
/// The backend is held as a trait object, so one adapter type serves every
/// storage implementation (this file constructs it with `LocalFileStorage`
/// or `Stack0FileStorage`).
pub struct FileOpsAdapter {
    /// The storage backend that file reads are delegated to.
    pub storage: Arc<dyn FileStorage>,
}
1194
1195impl FileOpsAdapter {
1196    /// Create from environment variables.
1197    ///
1198    /// Selects backend via `PYLON_FILES_PROVIDER`:
1199    /// - `local` (default) — files saved under `PYLON_FILES_DIR` and served
1200    ///   via `PYLON_FILES_URL_PREFIX`.
1201    /// - `stack0` — uploads go to Stack0's CDN. Requires `PYLON_STACK0_API_KEY`.
1202    pub fn from_env() -> Self {
1203        let provider = std::env::var("PYLON_FILES_PROVIDER").unwrap_or_else(|_| "local".into());
1204        match provider.as_str() {
1205            "stack0" => match Stack0FileStorage::from_env() {
1206                Some(s) => Self {
1207                    storage: Arc::new(s),
1208                },
1209                None => {
1210                    tracing::warn!(
1211                        "PYLON_FILES_PROVIDER=stack0 but PYLON_STACK0_API_KEY is not set; falling back to local storage"
1212                    );
1213                    Self::local_from_env()
1214                }
1215            },
1216            _ => Self::local_from_env(),
1217        }
1218    }
1219
1220    fn local_from_env() -> Self {
1221        let dir = std::env::var("PYLON_FILES_DIR").unwrap_or_else(|_| "uploads".into());
1222        let url_prefix =
1223            std::env::var("PYLON_FILES_URL_PREFIX").unwrap_or_else(|_| "/api/files".into());
1224        Self {
1225            storage: Arc::new(LocalFileStorage::new(&dir, &url_prefix)),
1226        }
1227    }
1228}
1229
1230impl pylon_router::FileOps for FileOpsAdapter {
1231    fn upload(&self, _body: &str) -> (u16, String) {
1232        // The self-hosted server short-circuits /api/files/upload BEFORE the
1233        // request body is lossily coerced to a String, so binary uploads are
1234        // handled there. This fallback exists for non-self-hosted adapters
1235        // (e.g., Workers) and for defense in depth; it rejects string bodies
1236        // that wouldn't carry binary data correctly.
1237        (
1238            400,
1239            pylon_router::json_error(
1240                "UPLOAD_NEEDS_BINARY",
1241                "File uploads must use multipart/form-data or raw binary with X-Filename; this platform does not support string-body uploads",
1242            ),
1243        )
1244    }
1245
1246    fn get_file(&self, id: &str) -> (u16, String) {
1247        match self.storage.get(id) {
1248            Ok(content) => (200, String::from_utf8_lossy(&content).into_owned()),
1249            Err(e) if e.code == "NOT_FOUND" => {
1250                (404, pylon_router::json_error("FILE_NOT_FOUND", &e.message))
1251            }
1252            Err(e) => (400, pylon_router::json_error(&e.code, &e.message)),
1253        }
1254    }
1255}
1256
/// Backwards-compatible alias; old code refers to this name.
///
/// Despite the name, this is the same type as [`FileOpsAdapter`] and is not
/// tied to the local-disk backend.
pub type LocalFileOps = FileOpsAdapter;
1259
1260impl LocalFileOps {
1261    /// Default instance backed by the local `uploads/` directory.
1262    pub fn new_default() -> Self {
1263        Self::from_env()
1264    }
1265}
1266
1267// ---------------------------------------------------------------------------
1268// Adapter: EmailTransport → EmailSender
1269// ---------------------------------------------------------------------------
1270
1271use pylon_auth::email::{ConsoleTransport, EmailTransport, HttpEmailTransport};
1272
/// Picks an email backend based on environment variables.
/// Falls back to `ConsoleTransport` (prints to stderr) when no provider is configured.
pub struct EmailAdapter {
    /// The selected backend, boxed so either transport can be stored.
    transport: Box<dyn EmailTransport>,
}
1278
1279impl EmailAdapter {
1280    pub fn from_env() -> Self {
1281        if let Some(http) = HttpEmailTransport::from_env() {
1282            Self {
1283                transport: Box::new(http),
1284            }
1285        } else {
1286            Self {
1287                transport: Box::new(ConsoleTransport),
1288            }
1289        }
1290    }
1291}
1292
1293impl pylon_router::EmailSender for EmailAdapter {
1294    fn send(&self, to: &str, subject: &str, body: &str) -> Result<(), String> {
1295        self.transport
1296            .send(to, subject, body)
1297            .map_err(|e| e.message)
1298    }
1299}
1300
1301// ---------------------------------------------------------------------------
1302// Adapter: OpenAPI generator
1303// ---------------------------------------------------------------------------
1304
/// Generates the OpenAPI document for the router from a borrowed app
/// manifest.
pub struct RuntimeOpenApiGenerator<'a> {
    /// Manifest the spec is generated from; borrowed for the generator's
    /// lifetime.
    pub manifest: &'a pylon_kernel::AppManifest,
}
1308
1309impl<'a> pylon_router::OpenApiGenerator for RuntimeOpenApiGenerator<'a> {
1310    fn generate(&self, base_url: &str) -> String {
1311        let spec = crate::openapi::generate_openapi(self.manifest, base_url);
1312        serde_json::to_string(&spec).unwrap_or_else(|_| "{}".into())
1313    }
1314}
1315
1316// ---------------------------------------------------------------------------
1317// Adapter: DynShardRegistry → ShardOps
1318// ---------------------------------------------------------------------------
1319
/// Wraps any `Arc<dyn DynShardRegistry>` so the router can dispatch shard
/// routes without knowing the concrete SimState type.
pub struct ShardOpsAdapter {
    /// Type-erased shard registry that every `ShardOps` call is forwarded to.
    pub registry: Arc<dyn pylon_realtime::DynShardRegistry>,
}
1325
1326impl pylon_router::ShardOps for ShardOpsAdapter {
1327    fn get_shard(&self, id: &str) -> Option<Arc<dyn pylon_realtime::DynShard>> {
1328        self.registry.get(id)
1329    }
1330
1331    fn list_shards(&self) -> Vec<String> {
1332        self.registry.ids()
1333    }
1334
1335    fn shard_count(&self) -> usize {
1336        self.registry.len()
1337    }
1338}
1339
#[cfg(test)]
mod find_runtime_tests {
    use super::*;

    /// Serializes every test that touches `PYLON_FUNCTIONS_RUNTIME`.
    ///
    /// Environment variables are process-global and the test harness runs
    /// tests on parallel threads, so without this lock one test could
    /// overwrite or remove the variable while the other is between its
    /// `set_var` and `find_functions_runtime` calls.
    static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Takes the env lock. A poisoned lock (an earlier test panicked while
    /// holding it) is recovered rather than cascading the failure.
    fn lock_env() -> std::sync::MutexGuard<'static, ()> {
        ENV_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    #[test]
    fn env_override_takes_precedence() {
        let _env = lock_env();

        let dir = std::env::temp_dir().join(format!("pylon_rt_{}", std::process::id()));
        let _ = std::fs::create_dir_all(&dir);
        let path = dir.join("custom_runtime.ts");
        std::fs::write(&path, "// test").unwrap();

        std::env::set_var("PYLON_FUNCTIONS_RUNTIME", path.to_str().unwrap());
        let found = find_functions_runtime();
        std::env::remove_var("PYLON_FUNCTIONS_RUNTIME");

        // Clean up before asserting so a failing assertion does not leave
        // the temp directory behind.
        let _ = std::fs::remove_dir_all(&dir);

        assert_eq!(found.as_deref(), path.to_str());
    }

    #[test]
    fn returns_none_when_env_path_missing() {
        let _env = lock_env();

        std::env::set_var(
            "PYLON_FUNCTIONS_RUNTIME",
            "/tmp/definitely-does-not-exist-42.ts",
        );
        // May still find something in CWD (dev path), so we only assert the env
        // path isn't what gets returned.
        let found = find_functions_runtime();
        std::env::remove_var("PYLON_FUNCTIONS_RUNTIME");
        assert_ne!(
            found.as_deref(),
            Some("/tmp/definitely-does-not-exist-42.ts")
        );
    }
}
1376
1377// ---------------------------------------------------------------------------
1378// TxStore — DataStore backed by a held transaction connection
1379// ---------------------------------------------------------------------------
1380
/// A `DataStore` that executes against a pre-held SQLite connection
/// for the duration of a single mutation handler.
///
/// # Safety contract
///
/// `rusqlite::Connection` is `Send` but not `Sync` (it uses `RefCell`s
/// internally for statement caching). The `DataStore` trait requires
/// `Send + Sync`, but `&'a Connection` is neither.
///
/// We hand-implement both via `unsafe impl` because:
///
/// 1. **Construction.** `TxStore::new` is only ever called by
///    `FnOpsImpl::call` for mutations, after acquiring the runtime's
///    write lock. The `&Connection` originates from a `MutexGuard`
///    that the constructing thread holds.
///
/// 2. **Lifetime.** The `'a` lifetime ties the `TxStore` to that guard.
///    The compiler enforces that the `TxStore` cannot outlive the held
///    lock; it must be dropped before the guard is.
///
/// 3. **Single-threaded use.** `FnRunner::call()` runs the handler
///    synchronously on the calling thread and never spawns threads
///    holding a reference to the `TxStore`. The `Send + Sync` bounds
///    on the `DataStore` trait are satisfied vacuously — no thread
///    other than the caller ever sees this `TxStore`.
///
/// 4. **No interior aliasing.** All `&Connection` calls go through
///    `Runtime::*_with_conn` methods which take `&Connection`, never
///    keeping the reference alive across an `await` point (this is
///    sync code, no awaits).
///
/// Future work: refactor `Runtime`'s `write_conn` to be
/// `Arc<Mutex<Connection>>` so TxStore can hold an `Arc<Mutex<...>>`,
/// eliminating the unsafe impl entirely.
pub struct TxStore<'a> {
    /// Runtime whose `*_with_conn` methods do the actual work.
    runtime: &'a Runtime,
    /// The held connection that every operation is executed against.
    conn: &'a rusqlite::Connection,
    /// Pending change events to broadcast after the outer transaction
    /// commits. Buffered here rather than pushed to ChangeLog + notifier
    /// immediately so a rollback doesn't emit events for writes that
    /// didn't actually land.
    ///
    /// Stored in a `RefCell` because the `DataStore` methods take `&self`
    /// but must append to this buffer.
    pending: std::cell::RefCell<Vec<pylon_sync::ChangeEvent>>,
}
1424
1425impl<'a> TxStore<'a> {
1426    pub fn new(runtime: &'a Runtime, conn: &'a rusqlite::Connection) -> Self {
1427        Self {
1428            runtime,
1429            conn,
1430            pending: std::cell::RefCell::new(Vec::new()),
1431        }
1432    }
1433
1434    /// Drain the pending-events buffer. Called after COMMIT succeeds;
1435    /// the caller is responsible for appending each event to the
1436    /// ChangeLog and broadcasting via the notifier. On rollback the
1437    /// caller just drops the buffer without calling this.
1438    pub fn take_pending(&self) -> Vec<pylon_sync::ChangeEvent> {
1439        std::mem::take(&mut *self.pending.borrow_mut())
1440    }
1441
1442    fn record(
1443        &self,
1444        entity: &str,
1445        row_id: &str,
1446        kind: pylon_sync::ChangeKind,
1447        data: Option<&serde_json::Value>,
1448    ) {
1449        self.pending.borrow_mut().push(pylon_sync::ChangeEvent {
1450            seq: 0, // assigned by ChangeLog::append after commit
1451            entity: entity.to_string(),
1452            row_id: row_id.to_string(),
1453            kind,
1454            data: data.cloned(),
1455            timestamp: String::new(),
1456        });
1457    }
1458}
1459
// SAFETY: see the contract on TxStore above. In short: a `TxStore` is
// created and used only on the thread that holds the runtime's write lock,
// and is never handed to another thread. Both the borrowed `&Connection`
// and the `RefCell` holding pending events are not `Sync` on their own, so
// these impls are sound only while every caller upholds that
// single-threaded discipline.
unsafe impl<'a> Sync for TxStore<'a> {}
unsafe impl<'a> Send for TxStore<'a> {}
1463
1464impl<'a> DataStore for TxStore<'a> {
1465    fn manifest(&self) -> &pylon_kernel::AppManifest {
1466        self.runtime.manifest()
1467    }
1468
1469    fn insert(&self, entity: &str, data: &serde_json::Value) -> Result<String, DataError> {
1470        let id = self
1471            .runtime
1472            .insert_with_conn(self.conn, entity, data)
1473            .map_err(into_data_error)?;
1474        // Buffer the event. If the outer mutation rolls back, the buffer
1475        // is dropped instead of flushed, so sync subscribers never see a
1476        // row that doesn't exist.
1477        self.record(entity, &id, pylon_sync::ChangeKind::Insert, Some(data));
1478        Ok(id)
1479    }
1480
1481    fn get_by_id(&self, entity: &str, id: &str) -> Result<Option<serde_json::Value>, DataError> {
1482        self.runtime
1483            .get_by_id_with_conn(self.conn, entity, id)
1484            .map_err(into_data_error)
1485    }
1486
1487    fn list(&self, entity: &str) -> Result<Vec<serde_json::Value>, DataError> {
1488        self.runtime
1489            .list_with_conn(self.conn, entity)
1490            .map_err(into_data_error)
1491    }
1492
1493    fn list_after(
1494        &self,
1495        entity: &str,
1496        after: Option<&str>,
1497        limit: usize,
1498    ) -> Result<Vec<serde_json::Value>, DataError> {
1499        self.runtime
1500            .list_after_with_conn(self.conn, entity, after, limit)
1501            .map_err(into_data_error)
1502    }
1503
1504    fn update(&self, entity: &str, id: &str, data: &serde_json::Value) -> Result<bool, DataError> {
1505        let updated = self
1506            .runtime
1507            .update_with_conn(self.conn, entity, id, data)
1508            .map_err(into_data_error)?;
1509        if updated {
1510            self.record(entity, id, pylon_sync::ChangeKind::Update, Some(data));
1511        }
1512        Ok(updated)
1513    }
1514
1515    fn delete(&self, entity: &str, id: &str) -> Result<bool, DataError> {
1516        let deleted = self
1517            .runtime
1518            .delete_with_conn(self.conn, entity, id)
1519            .map_err(into_data_error)?;
1520        if deleted {
1521            self.record(entity, id, pylon_sync::ChangeKind::Delete, None);
1522        }
1523        Ok(deleted)
1524    }
1525
1526    fn lookup(
1527        &self,
1528        entity: &str,
1529        field: &str,
1530        value: &str,
1531    ) -> Result<Option<serde_json::Value>, DataError> {
1532        self.runtime
1533            .lookup_with_conn(self.conn, entity, field, value)
1534            .map_err(into_data_error)
1535    }
1536
1537    fn link(
1538        &self,
1539        entity: &str,
1540        id: &str,
1541        relation: &str,
1542        target_id: &str,
1543    ) -> Result<bool, DataError> {
1544        self.runtime
1545            .link_with_conn(self.conn, entity, id, relation, target_id)
1546            .map_err(into_data_error)
1547    }
1548
1549    fn unlink(&self, entity: &str, id: &str, relation: &str) -> Result<bool, DataError> {
1550        self.runtime
1551            .unlink_with_conn(self.conn, entity, id, relation)
1552            .map_err(into_data_error)
1553    }
1554
1555    fn query_filtered(
1556        &self,
1557        entity: &str,
1558        filter: &serde_json::Value,
1559    ) -> Result<Vec<serde_json::Value>, DataError> {
1560        self.runtime
1561            .query_filtered_with_conn(self.conn, entity, filter)
1562            .map_err(into_data_error)
1563    }
1564
1565    fn query_graph(&self, query: &serde_json::Value) -> Result<serde_json::Value, DataError> {
1566        self.runtime
1567            .query_graph_with_conn(self.conn, query)
1568            .map_err(into_data_error)
1569    }
1570
1571    fn aggregate(
1572        &self,
1573        entity: &str,
1574        spec: &serde_json::Value,
1575    ) -> Result<serde_json::Value, DataError> {
1576        // Aggregation inside a transaction uses the same runtime method.
1577        // The lookups do their own read-lock, which is fine since aggregate
1578        // is read-only.
1579        Runtime::aggregate(self.runtime, entity, spec).map_err(into_data_error)
1580    }
1581
1582    fn transact(
1583        &self,
1584        _ops: &[serde_json::Value],
1585    ) -> Result<(bool, Vec<serde_json::Value>), DataError> {
1586        // Nested transactions aren't supported from within a mutation handler.
1587        // The mutation handler IS the transaction.
1588        Err(DataError {
1589            code: "NESTED_TRANSACTION".into(),
1590            message: "ctx.db.transact() is not allowed inside a mutation handler (the handler itself is transactional)".into(),
1591        })
1592    }
1593
1594    fn search(
1595        &self,
1596        entity: &str,
1597        query: &serde_json::Value,
1598    ) -> Result<serde_json::Value, DataError> {
1599        // Search reads against the FTS shadow are read-only; route
1600        // through the runtime's main `search` impl which already
1601        // validates the entity + branches on backend. The held write
1602        // connection is fine for reads (SQLite serializes anyway).
1603        <Runtime as DataStore>::search(self.runtime, entity, query)
1604    }
1605}
1606
1607// ---------------------------------------------------------------------------
1608// PG-transaction buffering wrapper
1609// ---------------------------------------------------------------------------
1610
/// `DataStore` wrapper used by the Postgres mutation path. The Postgres
/// `PgTxStore` owns the transaction; this wrapper layers the same
/// "buffer change events, flush after COMMIT" guarantee that SQLite's
/// `TxStore` provides directly. The underlying `inner` ref lives only
/// for the duration of `PostgresDataStore::with_transaction`'s closure
/// — the lifetime tracks through.
struct PgBufferedTxStore<'a> {
    /// The transaction-scoped store that every call is forwarded to.
    inner: &'a dyn DataStore,
    /// Change events recorded by successful writes, held here until the
    /// caller collects them with `take_pending`.
    pending: std::sync::Mutex<Vec<pylon_sync::ChangeEvent>>,
}
1621
1622impl<'a> PgBufferedTxStore<'a> {
1623    fn new(inner: &'a dyn DataStore) -> Self {
1624        Self {
1625            inner,
1626            pending: std::sync::Mutex::new(Vec::new()),
1627        }
1628    }
1629
1630    fn record(
1631        &self,
1632        entity: &str,
1633        row_id: &str,
1634        kind: pylon_sync::ChangeKind,
1635        data: Option<&serde_json::Value>,
1636    ) {
1637        if let Ok(mut p) = self.pending.lock() {
1638            p.push(pylon_sync::ChangeEvent {
1639                seq: 0,
1640                entity: entity.to_string(),
1641                row_id: row_id.to_string(),
1642                kind,
1643                data: data.cloned(),
1644                timestamp: String::new(),
1645            });
1646        }
1647    }
1648
1649    fn take_pending(self) -> Vec<pylon_sync::ChangeEvent> {
1650        self.pending.into_inner().unwrap_or_default()
1651    }
1652}
1653
1654impl<'a> DataStore for PgBufferedTxStore<'a> {
1655    fn manifest(&self) -> &pylon_kernel::AppManifest {
1656        self.inner.manifest()
1657    }
1658
1659    fn insert(&self, entity: &str, data: &serde_json::Value) -> Result<String, DataError> {
1660        let id = self.inner.insert(entity, data)?;
1661        self.record(entity, &id, pylon_sync::ChangeKind::Insert, Some(data));
1662        Ok(id)
1663    }
1664
1665    fn get_by_id(&self, entity: &str, id: &str) -> Result<Option<serde_json::Value>, DataError> {
1666        self.inner.get_by_id(entity, id)
1667    }
1668
1669    fn list(&self, entity: &str) -> Result<Vec<serde_json::Value>, DataError> {
1670        self.inner.list(entity)
1671    }
1672
1673    fn list_after(
1674        &self,
1675        entity: &str,
1676        after: Option<&str>,
1677        limit: usize,
1678    ) -> Result<Vec<serde_json::Value>, DataError> {
1679        self.inner.list_after(entity, after, limit)
1680    }
1681
1682    fn update(&self, entity: &str, id: &str, data: &serde_json::Value) -> Result<bool, DataError> {
1683        let updated = self.inner.update(entity, id, data)?;
1684        if updated {
1685            self.record(entity, id, pylon_sync::ChangeKind::Update, Some(data));
1686        }
1687        Ok(updated)
1688    }
1689
1690    fn delete(&self, entity: &str, id: &str) -> Result<bool, DataError> {
1691        let deleted = self.inner.delete(entity, id)?;
1692        if deleted {
1693            self.record(entity, id, pylon_sync::ChangeKind::Delete, None);
1694        }
1695        Ok(deleted)
1696    }
1697
1698    fn lookup(
1699        &self,
1700        entity: &str,
1701        field: &str,
1702        value: &str,
1703    ) -> Result<Option<serde_json::Value>, DataError> {
1704        self.inner.lookup(entity, field, value)
1705    }
1706
1707    fn link(
1708        &self,
1709        entity: &str,
1710        id: &str,
1711        relation: &str,
1712        target_id: &str,
1713    ) -> Result<bool, DataError> {
1714        let linked = self.inner.link(entity, id, relation, target_id)?;
1715        if linked {
1716            // `link` is implemented as a typed `update` under the hood —
1717            // record an Update so subscribers see the FK-set the same way
1718            // they'd see any other column change.
1719            let data = serde_json::json!({ relation: target_id });
1720            self.record(entity, id, pylon_sync::ChangeKind::Update, Some(&data));
1721        }
1722        Ok(linked)
1723    }
1724
1725    fn unlink(&self, entity: &str, id: &str, relation: &str) -> Result<bool, DataError> {
1726        let unlinked = self.inner.unlink(entity, id, relation)?;
1727        if unlinked {
1728            let data = serde_json::json!({ relation: serde_json::Value::Null });
1729            self.record(entity, id, pylon_sync::ChangeKind::Update, Some(&data));
1730        }
1731        Ok(unlinked)
1732    }
1733
1734    fn query_filtered(
1735        &self,
1736        entity: &str,
1737        filter: &serde_json::Value,
1738    ) -> Result<Vec<serde_json::Value>, DataError> {
1739        self.inner.query_filtered(entity, filter)
1740    }
1741
1742    fn query_graph(&self, query: &serde_json::Value) -> Result<serde_json::Value, DataError> {
1743        self.inner.query_graph(query)
1744    }
1745
1746    fn aggregate(
1747        &self,
1748        entity: &str,
1749        spec: &serde_json::Value,
1750    ) -> Result<serde_json::Value, DataError> {
1751        self.inner.aggregate(entity, spec)
1752    }
1753
1754    fn transact(
1755        &self,
1756        ops: &[serde_json::Value],
1757    ) -> Result<(bool, Vec<serde_json::Value>), DataError> {
1758        // Forward to the inner PgTxStore, which already returns
1759        // NESTED_TRANSACTION. Keeping the forward (instead of erroring
1760        // here) means the wrapper stays a faithful pass-through and any
1761        // future change to the inner policy applies uniformly.
1762        self.inner.transact(ops)
1763    }
1764
1765    fn search(
1766        &self,
1767        entity: &str,
1768        query: &serde_json::Value,
1769    ) -> Result<serde_json::Value, DataError> {
1770        // Forward to inner — `PgTxStore::search` (default impl)
1771        // currently returns NOT_SUPPORTED for in-tx search, which is
1772        // the right answer: PG search uses a separate connection
1773        // pool today and would deadlock on the in-handler tx if we
1774        // tried to fan out from the same client.
1775        self.inner.search(entity, query)
1776    }
1777}
1778
1779// ---------------------------------------------------------------------------
1780// Adapter: FnRunner → FnOps
1781// ---------------------------------------------------------------------------
1782
1783use pylon_functions::protocol::{AuthInfo as FnAuth, FnType};
1784use pylon_functions::registry::{FnDef, FnRegistry};
1785use pylon_functions::runner::{FnCallError, FnRunner};
1786use pylon_functions::trace::FnTrace;
1787
/// Adapter that implements [`FnOps`] by delegating to a [`FnRunner`].
///
/// Holds an `Arc<Runtime>` so function handlers get a [`DataStore`] to
/// operate against.
pub struct FnOpsImpl {
    /// Executes function calls and owns the trace log that
    /// `recent_traces` reads from.
    pub runner: Arc<FnRunner>,
    /// Source of function definitions for `get_fn`, `list_fns` and the
    /// lookup at the start of `call`.
    pub registry: Arc<FnRegistry>,
    /// The runtime handed to handlers as their data store: directly for
    /// non-mutation calls, and through a transaction-scoped store for
    /// mutations.
    pub runtime: Arc<Runtime>,
    /// Per-function rate limiter, keyed on `"<fn_name>::<identity>"`.
    /// Limits are uniform; per-fn overrides can be added later via FnDef
    /// metadata once the TS define API surfaces them.
    pub fn_rate_limiter: Arc<crate::rate_limit::RateLimiter>,
    /// Sync change log for broadcasting `ctx.db.insert/update/delete` ops
    /// that happen inside a function handler. Without this, mutations via
    /// functions silently bypass sync — WS subscribers see nothing until
    /// they manually refetch. Flushed post-COMMIT so rollbacks don't emit
    /// phantom events.
    pub change_log: Arc<pylon_sync::ChangeLog>,
    /// Where to broadcast change events after a function mutation commits.
    pub notifier: Arc<dyn pylon_router::ChangeNotifier>,
    /// Job queue for flushing schedules buffered during a mutation
    /// handler. The schedule hook itself owns its own clone for the
    /// non-mutation enqueue path; this clone is what the
    /// post-COMMIT flush uses.
    pub job_queue: Arc<crate::jobs::JobQueue>,
}
1814
1815impl FnOpsImpl {
1816    /// Drain a per-mutation schedule buffer and enqueue each entry on
1817    /// the job queue. Called only after COMMIT succeeds — a rolled-back
1818    /// handler's buffer is dropped without flushing.
1819    fn flush_pending_schedules(&self, pending: Vec<PendingSchedule>) {
1820        for sched in pending {
1821            let delay_secs = match (sched.delay_ms, sched.run_at) {
1822                (Some(ms), _) => ms / 1000,
1823                (None, Some(ts)) => {
1824                    let now = std::time::SystemTime::now()
1825                        .duration_since(std::time::UNIX_EPOCH)
1826                        .unwrap_or_default()
1827                        .as_millis() as u64;
1828                    if ts > now {
1829                        (ts - now) / 1000
1830                    } else {
1831                        0
1832                    }
1833                }
1834                _ => 0,
1835            };
1836            if let Err(e) = self.job_queue.try_enqueue_with_options(
1837                &sched.fn_name,
1838                sched.args,
1839                crate::jobs::Priority::Normal,
1840                delay_secs,
1841                3,
1842                "functions",
1843            ) {
1844                // Schedule was already acked OK to the TS handler — the
1845                // mutation has committed. Best we can do now is log
1846                // loudly so an operator notices the dropped enqueue.
1847                tracing::warn!(
1848                    "[functions] post-COMMIT enqueue failed for \"{}\": {e}",
1849                    sched.fn_name
1850                );
1851            }
1852        }
1853    }
1854}
1855
/// Router-facing function operations, implemented on top of the runner,
/// the registry and the runtime held by [`FnOpsImpl`].
impl pylon_router::FnOps for FnOpsImpl {
    /// Look up a single function definition by name.
    fn get_fn(&self, name: &str) -> Option<FnDef> {
        self.registry.get(name)
    }

    /// All function definitions known to the registry.
    fn list_fns(&self) -> Vec<FnDef> {
        self.registry.list()
    }

    /// Invoke the registered function `fn_name`.
    ///
    /// Fails with `FN_NOT_FOUND` when the registry has no such function.
    /// Mutations run inside a transaction: on the Postgres backend through
    /// `with_transaction_crdt`, otherwise on the held SQLite write
    /// connection between an explicit `BEGIN` and `COMMIT`/`ROLLBACK`. In
    /// both cases buffered change events are appended to the change log
    /// and sent to the notifier, and buffered schedules are enqueued, only
    /// after the commit succeeded. Every other function type is run
    /// directly against the runtime with no transaction wrapper.
    fn call(
        &self,
        fn_name: &str,
        args: serde_json::Value,
        auth: FnAuth,
        on_stream: Option<Box<dyn FnMut(&str) + Send>>,
        request: Option<pylon_functions::protocol::RequestInfo>,
    ) -> Result<(serde_json::Value, FnTrace), FnCallError> {
        let def = self.registry.get(fn_name).ok_or_else(|| FnCallError {
            code: "FN_NOT_FOUND".into(),
            message: format!("Function \"{fn_name}\" is not registered"),
        })?;

        match def.fn_type {
            FnType::Mutation => {
                // Postgres backend: route through PostgresDataStore::with_transaction
                // so the TS handler runs against a held PG transaction. Reads
                // see the handler's own pending writes; an error rolls
                // everything back atomically. Change events are buffered in a
                // `PgBufferedTxStore` wrapper and flushed only after COMMIT —
                // mirrors the SQLite `TxStore` behavior.
                if self.runtime.is_postgres() {
                    let pg_backend = self.runtime.pg_backend().ok_or_else(|| FnCallError {
                        code: "PG_BACKEND_MISSING".into(),
                        message:
                            "Postgres backend reported is_postgres=true but pg_backend() returned None"
                                .into(),
                    })?;

                    // The closure has to own its own state (Box the stream
                    // callback so we can move it inside). Capture
                    // `request`/`auth`/`args` by move; they aren't needed
                    // again outside the closure.
                    let runner = self.runner.clone();
                    let fn_type = def.fn_type;
                    let fn_name_owned = fn_name.to_string();

                    // Push the schedule buffer onto the thread-local for
                    // the duration of the handler. Drain after COMMIT
                    // succeeds; on rollback the buffer is dropped.
                    let sched_guard = ScheduleBufferGuard::enter();
                    // Mark "we're in a mutation tx" so the nested-call
                    // hook rejects recursive mutation calls instead of
                    // deadlocking on the connection mutex.
                    let _depth_guard = MutationDepthGuard::enter();

                    // Install the CRDT hook so PgTxStore projects
                    // `ctx.db.X` writes on `crdt: true` entities through
                    // PgLoroStore + persists the snapshot in the same
                    // tx. Without this, codex-flagged: TS mutation
                    // writes on CRDT entities desync from the sidecar.
                    let crdt_hook: std::sync::Arc<dyn pylon_storage::pg_tx_store::PgCrdtHook> =
                        std::sync::Arc::new(crate::pg_loro_store::PgCrdtHookImpl {
                            crdt: std::sync::Arc::clone(&pg_backend.crdt),
                            manifest: std::sync::Arc::new(self.runtime.manifest().clone()),
                        });

                    let pg = &pg_backend.store;
                    // The closure returns the handler result together with
                    // the events it buffered, so they outlive the wrapper
                    // and can be flushed below once the transaction is done.
                    let tx_result: Result<
                        (serde_json::Value, FnTrace, Vec<pylon_sync::ChangeEvent>),
                        FnCallError,
                    > = pg.with_transaction_crdt(crdt_hook, move |inner_store: &dyn DataStore| {
                        let buffered = PgBufferedTxStore::new(inner_store);
                        let (value, trace) = runner.call(
                            &buffered,
                            &fn_name_owned,
                            fn_type,
                            args,
                            auth,
                            on_stream,
                            request,
                        )?;
                        Ok((value, trace, buffered.take_pending()))
                    });

                    return match tx_result {
                        Ok((value, trace, pending)) => {
                            // Mirror the SQLite path: append to the change
                            // log first (so /api/sync/pull tail callers see
                            // it), then notify WS/SSE subscribers.
                            for ev in pending {
                                let seq = self.change_log.append(
                                    &ev.entity,
                                    &ev.row_id,
                                    ev.kind.clone(),
                                    ev.data.clone(),
                                );
                                let event = pylon_sync::ChangeEvent { seq, ..ev };
                                self.notifier.notify(&event);
                            }
                            // Flush scheduled jobs after the commit lands.
                            // On rollback the early `Err(e)` arm below
                            // skips this and the buffer is dropped.
                            self.flush_pending_schedules(sched_guard.take());
                            drop(sched_guard);
                            Ok((value, trace))
                        }
                        Err(e) => {
                            drop(sched_guard);
                            Err(e)
                        }
                    };
                }
                // Hold the write connection for the entire handler duration.
                // This keeps the BEGIN/COMMIT span free of interleaving from
                // other writers (who would otherwise become part of the
                // transaction because SQLite tracks it on the connection).
                //
                // Inside the handler, every `ctx.db` call routes through
                // TxStore, which uses this same held connection — so no
                // re-locking, no deadlock, no interleaving.
                let conn_guard = self.runtime.lock_conn_pub().map_err(|e| FnCallError {
                    code: e.code,
                    message: e.message,
                })?;

                // BEGIN is issued before any guard is entered, so a failure
                // here returns without a schedule buffer or depth marker
                // ever having been installed.
                if let Err(e) = conn_guard.execute("BEGIN", []) {
                    return Err(FnCallError {
                        code: "BEGIN_FAILED".into(),
                        message: format!("Failed to start transaction: {e}"),
                    });
                }

                // Same schedule buffering as the PG path — `runAfter`
                // calls inside this handler defer until COMMIT succeeds.
                let sched_guard = ScheduleBufferGuard::enter();
                // Same nested-mutation deadlock guard as the PG path.
                let _depth_guard = MutationDepthGuard::enter();

                let tx_store = TxStore::new(&self.runtime, &conn_guard);
                let result = self.runner.call(
                    &tx_store,
                    fn_name,
                    def.fn_type,
                    args,
                    auth,
                    on_stream,
                    request,
                );

                // Surface commit/rollback errors. A swallowed COMMIT failure
                // is the worst possible outcome: the caller sees success but
                // the data isn't durable. A swallowed ROLLBACK failure leaves
                // the connection in an unknown txn state for the next caller.
                let result = match result {
                    Ok(value) => match conn_guard.execute("COMMIT", []) {
                        Ok(_) => {
                            // Flush buffered change events NOW — after the
                            // commit durably lands but before we return
                            // success. Ordering matters: append to the log
                            // first (so /api/sync/pull callers that race
                            // with this broadcast see the row in the tail),
                            // then notify WS/SSE subscribers. `seq` on each
                            // pending event starts at 0; append assigns
                            // the real seq.
                            for ev in tx_store.take_pending() {
                                let seq = self.change_log.append(
                                    &ev.entity,
                                    &ev.row_id,
                                    ev.kind.clone(),
                                    ev.data.clone(),
                                );
                                let event = pylon_sync::ChangeEvent { seq, ..ev };
                                self.notifier.notify(&event);
                            }
                            // Same flush as the PG path — durable commit,
                            // then flush schedules. Drop the guard
                            // explicitly so the thread-local clears
                            // before the result returns.
                            self.flush_pending_schedules(sched_guard.take());
                            drop(sched_guard);
                            Ok(value)
                        }
                        Err(commit_err) => {
                            // Best-effort cleanup. If ROLLBACK also fails the
                            // connection is in a bad state — at minimum the
                            // operator sees both failures in the log.
                            if let Err(rollback_err) = conn_guard.execute("ROLLBACK", []) {
                                tracing::warn!(
                                    "[functions] ROLLBACK after COMMIT failure also failed: {rollback_err}"
                                );
                            }
                            Err(FnCallError {
                                code: "COMMIT_FAILED".into(),
                                message: format!(
                                    "Function \"{fn_name}\" succeeded but COMMIT failed: {commit_err}"
                                ),
                            })
                        }
                    },
                    Err(handler_err) => {
                        if let Err(rollback_err) = conn_guard.execute("ROLLBACK", []) {
                            // Don't shadow the handler error — log the
                            // rollback failure separately.
                            tracing::warn!(
                                "[functions] ROLLBACK after handler error failed: {rollback_err}"
                            );
                        }
                        Err(handler_err)
                    }
                };
                // conn_guard drops here, releasing the lock.
                result
            }
            // Every non-mutation function type runs straight against the
            // runtime, without a transaction or any event/schedule buffering.
            _ => self.runner.call(
                &*self.runtime,
                fn_name,
                def.fn_type,
                args,
                auth,
                on_stream,
                request,
            ),
        }
    }

    /// Up to `limit` recent call traces from the runner's trace log.
    fn recent_traces(&self, limit: usize) -> Vec<FnTrace> {
        self.runner.trace_log.recent(limit)
    }

    /// Check the per-function rate limit for `identity`, using the key
    /// `"<fn_name>::<identity>"`.
    // NOTE(review): the `u64` in the error is presumably a retry-after
    // value — confirm against `RateLimiter::check`.
    fn check_rate_limit(&self, fn_name: &str, identity: &str) -> Result<(), u64> {
        let key = format!("{fn_name}::{identity}");
        self.fn_rate_limiter.check(&key)
    }
}
2090
/// Resolve the path to the TypeScript function runtime script.
///
/// Searches in order and returns the first path that exists:
/// 1. `$PYLON_FUNCTIONS_RUNTIME`, if the variable is set and the path exists.
/// 2. Starting at the current working directory and walking up through
///    every parent directory, each of the following relative paths:
///    - `node_modules/@pylonsync/functions/src/runtime.ts` (npm-installed)
///    - `node_modules/@pylonsync/functions/dist/runtime.js` (built)
///    - `node_modules/@pylon/functions/src/runtime.ts` (legacy package name)
///    - `node_modules/@pylon/functions/dist/runtime.js` (legacy, built)
///    - `packages/functions/src/runtime.ts` (dev monorepo)
/// 3. `$HOME/.pylon/runtime.ts` (user install); `HOME` falls back to `.`
///    when unset.
///
/// Returns `None` if none exist. A candidate whose path is not valid
/// UTF-8 cannot be returned as a `String`, so it is skipped and the search
/// continues with the next candidate.
///
/// Used by `try_spawn_functions`, which spawns the Bun function runtime if
/// a functions directory exists. That function logs a warning and returns
/// `None` — keeping the server running — when no runtime script is found
/// here or when Bun fails to start.
pub fn find_functions_runtime() -> Option<String> {
    // An explicit override wins, but only while the path it names exists.
    if let Ok(env_path) = std::env::var("PYLON_FUNCTIONS_RUNTIME") {
        if std::path::Path::new(&env_path).exists() {
            return Some(env_path);
        }
    }

    // Walk parent directories like Node.js resolution does, so running
    // `pylon dev` from an example sub-directory still finds the
    // hoisted workspace package at the repo root. Without this, bun/npm
    // workspace users see "TypeScript function runtime is not configured"
    // and think the server is broken when it's just a CWD issue.
    let cwd = std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));
    let relative_candidates = [
        "node_modules/@pylonsync/functions/src/runtime.ts",
        "node_modules/@pylonsync/functions/dist/runtime.js",
        // Legacy name — keeps upgrades from the pre-rename era working.
        "node_modules/@pylon/functions/src/runtime.ts",
        "node_modules/@pylon/functions/dist/runtime.js",
        // Monorepo dev: source tree at the workspace root.
        "packages/functions/src/runtime.ts",
    ];

    // `ancestors()` yields `cwd` itself first, then each parent in turn.
    for dir in cwd.ancestors() {
        for rel in &relative_candidates {
            let candidate = dir.join(rel);
            if !candidate.exists() {
                continue;
            }
            // Only a UTF-8 path can be handed back. Keep searching instead
            // of giving up on the whole lookup when this one is not.
            if let Some(path) = candidate.to_str() {
                return Some(path.to_string());
            }
        }
    }

    // Final fallback: user-wide install under ~/.pylon.
    let home = std::env::var("HOME").unwrap_or_else(|_| ".".into());
    let user_path = format!("{home}/.pylon/runtime.ts");
    std::path::Path::new(&user_path)
        .exists()
        .then_some(user_path)
}
2148
2149pub fn try_spawn_functions(
2150    runtime: Arc<Runtime>,
2151    job_queue: Arc<crate::jobs::JobQueue>,
2152    fn_rate_limiter: Arc<crate::rate_limit::RateLimiter>,
2153    change_log: Arc<pylon_sync::ChangeLog>,
2154    notifier: Arc<dyn pylon_router::ChangeNotifier>,
2155) -> Option<Arc<FnOpsImpl>> {
2156    let fn_dir = std::env::var("PYLON_FUNCTIONS_DIR").unwrap_or_else(|_| "functions".into());
2157    if !std::path::Path::new(&fn_dir).exists() {
2158        return None;
2159    }
2160
2161    let runtime_script = match find_functions_runtime() {
2162        Some(p) => p,
2163        None => {
2164            tracing::warn!(
2165                "[functions] No TypeScript runtime script found. TypeScript functions will be unavailable."
2166            );
2167            tracing::warn!(
2168                "[functions] Tried: $PYLON_FUNCTIONS_RUNTIME, node_modules/@pylon/functions/src/runtime.ts, ~/.pylon/runtime.ts, packages/functions/src/runtime.ts"
2169            );
2170            return None;
2171        }
2172    };
2173
2174    let runner = Arc::new(FnRunner::new(1000));
2175
2176    // start() now performs the handshake itself and returns the function
2177    // definitions, so there's no separate handshake step. On any failure the
2178    // child has already been killed.
2179    let defs = match runner.start("bun", &["run", &runtime_script, &fn_dir]) {
2180        Ok(defs) => defs,
2181        Err(e) => {
2182            tracing::warn!("[functions] Failed to start Bun runtime: {e}");
2183            tracing::warn!(
2184                "[functions] Install Bun from https://bun.sh — TypeScript functions will be unavailable."
2185            );
2186            return None;
2187        }
2188    };
2189
2190    // Hold a separate handle on the job queue for registering function
2191    // job handlers below, since the schedule-hook closure consumes its
2192    // own copy.
2193    let job_queue_for_handlers = Arc::clone(&job_queue);
2194
2195    // Wire scheduler requests from functions into the job queue. Use the
2196    // Result-returning variant so a persist failure surfaces as a TS-side
2197    // SCHEDULE_FAILED error instead of `{scheduled:true, id:""}`.
2198    //
2199    // Transaction-bound semantics: when this hook is invoked from inside
2200    // a mutation handler, the per-call `MUTATION_SCHEDULE_BUFFER`
2201    // thread-local is set. Schedules buffer there and drain post-COMMIT
2202    // (so a rolled-back mutation can't leave behind scheduled work).
2203    // Outside a mutation (queries, actions, top-level non-mutation
2204    // jobs), the buffer is `None` and we enqueue immediately — matching
2205    // the historical contract for those code paths.
2206    runner.set_schedule_hook(Box::new(move |fn_name, args, delay_ms, run_at| {
2207        // Check the thread-local first. If we're inside a mutation, the
2208        // buffer is `Some` and we defer.
2209        let buffered = MUTATION_SCHEDULE_BUFFER.with(|cell| {
2210            let slot = cell.borrow();
2211            slot.as_ref()
2212                .map(|b| {
2213                    b.borrow_mut().push(PendingSchedule {
2214                        fn_name: fn_name.to_string(),
2215                        args: args.clone(),
2216                        delay_ms,
2217                        run_at,
2218                    });
2219                })
2220                .is_some()
2221        });
2222        if buffered {
2223            // No real job-id yet — the actual enqueue happens after
2224            // COMMIT. Returning a synthetic id keeps the TS contract
2225            // (`{scheduled:true,id:string}`) intact; a mutation that
2226            // rolls back will discard the buffer and the id was never
2227            // observable to anyone outside the handler anyway.
2228            return Ok(format!("pending:{fn_name}"));
2229        }
2230
2231        let delay_secs = match (delay_ms, run_at) {
2232            (Some(ms), _) => ms / 1000,
2233            (None, Some(ts)) => {
2234                let now = std::time::SystemTime::now()
2235                    .duration_since(std::time::UNIX_EPOCH)
2236                    .unwrap_or_default()
2237                    .as_millis() as u64;
2238                if ts > now {
2239                    (ts - now) / 1000
2240                } else {
2241                    0
2242                }
2243            }
2244            _ => 0,
2245        };
2246        job_queue.try_enqueue_with_options(
2247            fn_name,
2248            args,
2249            crate::jobs::Priority::Normal,
2250            delay_secs,
2251            3,
2252            "functions",
2253        )
2254    }));
2255
2256    let registry = Arc::new(FnRegistry::new());
2257    let count = defs.len();
2258    registry.replace_all(defs);
2259    tracing::warn!("[functions] Loaded {count} function(s) from {fn_dir}");
2260
2261    let ops = Arc::new(FnOpsImpl {
2262        runner,
2263        registry,
2264        runtime,
2265        fn_rate_limiter,
2266        change_log,
2267        notifier,
2268        job_queue: Arc::clone(&job_queue_for_handlers),
2269    });
2270
2271    install_nested_call_hook(&ops);
2272    register_function_job_handlers(&ops, &job_queue_for_handlers);
2273    spawn_runtime_supervisor(Arc::clone(&ops));
2274    Some(ops)
2275}
2276
2277/// Bridge scheduled function calls (via `ctx.scheduler.runAfter` or
2278/// `runAt`) to the function runner. Without this, the schedule hook
2279/// enqueues a job whose `name` is the function name — but no handler
2280/// is registered for it, so the worker fails with "No handler
2281/// registered" and the scheduled callback never runs.
2282///
2283/// Registers one handler per loaded function. Each handler invokes
2284/// `FnOpsImpl::call` with a system auth context (no user_id, not
2285/// admin) so the called function runs with the same privileges a
2286/// trusted-server-side caller would have. The args come from the
2287/// job payload, which the schedule hook copies verbatim from the
2288/// `runAfter(ms, fn, args)` invocation.
2289fn register_function_job_handlers(ops: &Arc<FnOpsImpl>, job_queue: &Arc<crate::jobs::JobQueue>) {
2290    use pylon_router::FnOps as _;
2291
2292    let fn_names: Vec<String> = ops.registry.list().into_iter().map(|d| d.name).collect();
2293
2294    for name in fn_names {
2295        let weak = Arc::downgrade(ops);
2296        let fn_name = name.clone();
2297        job_queue.register(
2298            &name,
2299            Arc::new(move |job: &crate::jobs::Job| {
2300                let ops = match weak.upgrade() {
2301                    Some(o) => o,
2302                    None => {
2303                        return crate::jobs::JobResult::Failure(
2304                            "RUNTIME_GONE: function ops dropped".into(),
2305                        )
2306                    }
2307                };
2308                let auth = FnAuth {
2309                    user_id: None,
2310                    is_admin: false,
2311                    tenant_id: None,
2312                };
2313                match ops.call(&fn_name, job.payload.clone(), auth, None, None) {
2314                    Ok(_) => crate::jobs::JobResult::Success,
2315                    Err(e) => crate::jobs::JobResult::Retry(format!("{}: {}", e.code, e.message)),
2316                }
2317            }),
2318        );
2319    }
2320}
2321
2322/// Route nested `RunFn` calls (action → query/mutation) through a
2323/// transactional wrapper so nested mutations get their own BEGIN/COMMIT.
2324///
2325/// Uses a `Weak<FnOpsImpl>` to avoid keeping the ops struct alive forever
2326/// through a cycle (hook stored on FnRunner ← held by FnOpsImpl). When the
2327/// ops struct is dropped the hook becomes a no-op error.
2328fn install_nested_call_hook(ops: &Arc<FnOpsImpl>) {
2329    use pylon_functions::protocol::{AuthInfo, FnType};
2330
2331    let weak = Arc::downgrade(ops);
2332    ops.runner.set_nested_call_hook(Box::new(
2333        move |fn_name: &str,
2334              fn_type: FnType,
2335              args: serde_json::Value,
2336              auth: AuthInfo|
2337              -> Result<serde_json::Value, (String, String)> {
2338            let ops = match weak.upgrade() {
2339                Some(o) => o,
2340                None => {
2341                    return Err((
2342                        "RUNTIME_GONE".into(),
2343                        "pylon runtime is shutting down".into(),
2344                    ))
2345                }
2346            };
2347
2348            match fn_type {
2349                FnType::Mutation => {
2350                    // Reject nested mutations: both backends acquire a
2351                    // single (non-reentrant) connection mutex per
2352                    // mutation, so a TS handler that calls runMutation
2353                    // from inside another mutation would block forever.
2354                    // Surface a clear NESTED_MUTATION error instead of
2355                    // hanging — callers should restructure to call the
2356                    // shared logic as a function (not a separate
2357                    // mutation) or call from an action.
2358                    if in_mutation_tx() {
2359                        return Err((
2360                            "NESTED_MUTATION".into(),
2361                            format!(
2362                                "ctx.runMutation(\"{fn_name}\") is not allowed from inside \
2363                                 another mutation handler — the mutation handler IS the \
2364                                 transaction, and the connection mutex is non-reentrant. \
2365                                 Restructure the shared logic into a regular function (not \
2366                                 a registered mutation), or call from an action handler."
2367                            ),
2368                        ));
2369                    }
2370
2371                    // Postgres backend: route through the PG with_transaction
2372                    // closure, mirroring the top-level mutation path. Without
2373                    // this, action -> ctx.runMutation(...) errors with
2374                    // NOT_SQLITE_BACKEND on PG even though the top-level path
2375                    // works fine.
2376                    if ops.runtime.is_postgres() {
2377                        let pg_backend = ops.runtime.pg_backend().ok_or_else(|| {
2378                            (
2379                                "PG_BACKEND_MISSING".into(),
2380                                "Postgres backend reported is_postgres=true but pg_backend() returned None".into(),
2381                            )
2382                        })?;
2383                        let pg = &pg_backend.store;
2384                        let runner = ops.runner.clone();
2385                        let fn_name_owned = fn_name.to_string();
2386                        let sched_guard = ScheduleBufferGuard::enter();
2387                        let _depth_guard = MutationDepthGuard::enter();
2388                        // Same CRDT hook as the top-level mutation
2389                        // path so action -> runMutation on a crdt:true
2390                        // entity also maintains the sidecar.
2391                        let crdt_hook: std::sync::Arc<
2392                            dyn pylon_storage::pg_tx_store::PgCrdtHook,
2393                        > = std::sync::Arc::new(crate::pg_loro_store::PgCrdtHookImpl {
2394                            crdt: std::sync::Arc::clone(&pg_backend.crdt),
2395                            manifest: std::sync::Arc::new(ops.runtime.manifest().clone()),
2396                        });
2397                        let tx_result: Result<
2398                            (serde_json::Value, Vec<pylon_sync::ChangeEvent>),
2399                            FnCallError,
2400                        > = pg.with_transaction_crdt(crdt_hook, move |inner_store: &dyn DataStore| {
2401                            let buffered = PgBufferedTxStore::new(inner_store);
2402                            let (value, _trace) = runner.call_inner(
2403                                &buffered,
2404                                &fn_name_owned,
2405                                fn_type,
2406                                args,
2407                                auth,
2408                                None,
2409                                None,
2410                            )?;
2411                            Ok((value, buffered.take_pending()))
2412                        });
2413                        return match tx_result {
2414                            Ok((value, pending)) => {
2415                                for ev in pending {
2416                                    let seq = ops.change_log.append(
2417                                        &ev.entity,
2418                                        &ev.row_id,
2419                                        ev.kind.clone(),
2420                                        ev.data.clone(),
2421                                    );
2422                                    let event = pylon_sync::ChangeEvent { seq, ..ev };
2423                                    ops.notifier.notify(&event);
2424                                }
2425                                ops.flush_pending_schedules(sched_guard.take());
2426                                drop(sched_guard);
2427                                Ok(value)
2428                            }
2429                            Err(e) => {
2430                                drop(sched_guard);
2431                                Err((e.code, e.message))
2432                            }
2433                        };
2434                    }
2435
2436                    // Wrap the nested mutation in its own write-conn + BEGIN
2437                    // + COMMIT, matching the top-level mutation contract.
2438                    let conn_guard = ops
2439                        .runtime
2440                        .lock_conn_pub()
2441                        .map_err(|e| (e.code, e.message))?;
2442                    if let Err(e) = conn_guard.execute("BEGIN", []) {
2443                        return Err(("BEGIN_FAILED".into(), e.to_string()));
2444                    }
2445                    let sched_guard = ScheduleBufferGuard::enter();
2446                    let _depth_guard = MutationDepthGuard::enter();
2447                    let tx_store = TxStore::new(&ops.runtime, &conn_guard);
2448                    // Re-enter protocol without acquiring io_lock — we're
2449                    // already inside the outer call_inner which holds it.
2450                    // Nested calls never get HTTP request metadata — that's
2451                    // only meaningful for the top-level webhook invocation.
2452                    let result = ops
2453                        .runner
2454                        .call_inner(&tx_store, fn_name, fn_type, args, auth, None, None);
2455                    match result {
2456                        Ok((value, _trace)) => {
2457                            if let Err(e) = conn_guard.execute("COMMIT", []) {
2458                                let _ = conn_guard.execute("ROLLBACK", []);
2459                                return Err(("COMMIT_FAILED".into(), e.to_string()));
2460                            }
2461                            // Flush change events after COMMIT so nested
2462                            // mutations (action → runMutation(...)) broadcast
2463                            // the same way top-level mutations do. Without
2464                            // this, every write an action emits is invisible
2465                            // to sync subscribers until the NEXT top-level
2466                            // mutation lands — streaming UIs stay empty.
2467                            for ev in tx_store.take_pending() {
2468                                let seq = ops.change_log.append(
2469                                    &ev.entity,
2470                                    &ev.row_id,
2471                                    ev.kind.clone(),
2472                                    ev.data.clone(),
2473                                );
2474                                let event = pylon_sync::ChangeEvent { seq, ..ev };
2475                                ops.notifier.notify(&event);
2476                            }
2477                            ops.flush_pending_schedules(sched_guard.take());
2478                            drop(sched_guard);
2479                            Ok(value)
2480                        }
2481                        Err(e) => {
2482                            let _ = conn_guard.execute("ROLLBACK", []);
2483                            drop(sched_guard);
2484                            Err((e.code, e.message))
2485                        }
2486                    }
2487                }
2488                _ => {
2489                    // Queries + actions: no transaction wrap needed. Just
2490                    // re-enter protocol via the same store (runtime).
2491                    // Nested: no HTTP request propagated (see above).
2492                    let result = ops.runner.call_inner(
2493                        &*ops.runtime,
2494                        fn_name,
2495                        fn_type,
2496                        args,
2497                        auth,
2498                        None,
2499                        None,
2500                    );
2501                    result.map(|(v, _)| v).map_err(|e| (e.code, e.message))
2502                }
2503            }
2504        },
2505    ));
2506}
2507
2508/// Background watchdog that restarts the Bun runtime if it dies (crashed,
2509/// killed by the call timeout path, OOM, etc.). Exponential backoff: 1s, 2s,
2510/// 4s, ... capped at 30s. Resets to 1s after a successful respawn.
2511///
2512/// We don't try to "give up" — if Bun keeps crashing the supervisor keeps
2513/// trying with the capped delay. The operator sees repeated WARN logs and
2514/// can investigate. Better than silently leaving functions disabled forever.
2515fn spawn_runtime_supervisor(ops: Arc<FnOpsImpl>) {
2516    use std::time::Duration;
2517
2518    std::thread::Builder::new()
2519        .name("pylon-fn-supervisor".into())
2520        .spawn(move || {
2521            let mut backoff = Duration::from_secs(1);
2522            let max_backoff = Duration::from_secs(30);
2523            loop {
2524                std::thread::sleep(Duration::from_secs(2));
2525                if ops.runner.is_alive() {
2526                    backoff = Duration::from_secs(1);
2527                    continue;
2528                }
2529                tracing::warn!(
2530                    "[functions] Bun runtime is not alive — respawning after {:?}",
2531                    backoff
2532                );
2533                std::thread::sleep(backoff);
2534                match ops.runner.respawn() {
2535                    Ok(defs) => {
2536                        let count = defs.len();
2537                        // Replace, not merge — deleted functions must stop
2538                        // being callable. register_all() alone leaves stale
2539                        // entries from the previous process generation.
2540                        ops.registry.replace_all(defs);
2541                        tracing::warn!("[functions] Respawned Bun runtime ({count} fn(s))");
2542                        backoff = Duration::from_secs(1);
2543                    }
2544                    Err(e) => {
2545                        tracing::warn!("[functions] Respawn failed: {e}");
2546                        // Persistent Bun-runtime failures are the kind of
2547                        // operator signal that belongs in error telemetry
2548                        // too. Include enough context to triage repeated
2549                        // events: current backoff (so operators can see
2550                        // how long failures have been compounding) and the
2551                        // component name.
2552                        let backoff_str = format!("{}", backoff.as_secs());
2553                        pylon_observability::report_error(&pylon_observability::ErrorEvent {
2554                            level: pylon_observability::ErrorLevel::Error,
2555                            code: "FN_RESPAWN_FAILED",
2556                            message: &e,
2557                            context: &[
2558                                ("component", "bun-runtime-supervisor"),
2559                                ("backoff_secs", &backoff_str),
2560                            ],
2561                        });
2562                        backoff = (backoff * 2).min(max_backoff);
2563                    }
2564                }
2565            }
2566        })
2567        .expect("failed to spawn function runtime supervisor");
2568}