Skip to main content

pylon_runtime/
datastore.rs

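//! Implements the platform-agnostic [`DataStore`] trait for [`Runtime`].
//!
//! This bridges the concrete SQLite-backed `Runtime` to the abstract trait
//! used by the router crate, so the same routing logic runs on both
//! self-hosted servers and Cloudflare Workers.
//!
//! The module also holds the thin adapters that expose the runtime's other
//! subsystems (WebSocket/SSE hubs, rooms, plugin hooks, cache, pub/sub, jobs,
//! scheduler, workflows, file storage, email, OpenAPI, shards) through the
//! router's service traits.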
6
7use pylon_http::{DataError, DataStore};
8
9use crate::Runtime;
10
11// ---------------------------------------------------------------------------
12// DataStore → Runtime bridge
13// ---------------------------------------------------------------------------
14
15impl DataStore for Runtime {
16    fn manifest(&self) -> &pylon_kernel::AppManifest {
17        Runtime::manifest(self)
18    }
19
20    fn insert(&self, entity: &str, data: &serde_json::Value) -> Result<String, DataError> {
21        Runtime::insert(self, entity, data).map_err(into_data_error)
22    }
23
24    fn get_by_id(&self, entity: &str, id: &str) -> Result<Option<serde_json::Value>, DataError> {
25        Runtime::get_by_id(self, entity, id).map_err(into_data_error)
26    }
27
28    fn list(&self, entity: &str) -> Result<Vec<serde_json::Value>, DataError> {
29        Runtime::list(self, entity).map_err(into_data_error)
30    }
31
32    fn list_after(
33        &self,
34        entity: &str,
35        after: Option<&str>,
36        limit: usize,
37    ) -> Result<Vec<serde_json::Value>, DataError> {
38        Runtime::list_after(self, entity, after, limit).map_err(into_data_error)
39    }
40
41    fn update(&self, entity: &str, id: &str, data: &serde_json::Value) -> Result<bool, DataError> {
42        Runtime::update(self, entity, id, data).map_err(into_data_error)
43    }
44
45    fn delete(&self, entity: &str, id: &str) -> Result<bool, DataError> {
46        Runtime::delete(self, entity, id).map_err(into_data_error)
47    }
48
49    fn lookup(
50        &self,
51        entity: &str,
52        field: &str,
53        value: &str,
54    ) -> Result<Option<serde_json::Value>, DataError> {
55        Runtime::lookup(self, entity, field, value).map_err(into_data_error)
56    }
57
58    fn link(
59        &self,
60        entity: &str,
61        id: &str,
62        relation: &str,
63        target_id: &str,
64    ) -> Result<bool, DataError> {
65        Runtime::link(self, entity, id, relation, target_id).map_err(into_data_error)
66    }
67
68    fn unlink(&self, entity: &str, id: &str, relation: &str) -> Result<bool, DataError> {
69        Runtime::unlink(self, entity, id, relation).map_err(into_data_error)
70    }
71
72    fn query_filtered(
73        &self,
74        entity: &str,
75        filter: &serde_json::Value,
76    ) -> Result<Vec<serde_json::Value>, DataError> {
77        Runtime::query_filtered(self, entity, filter).map_err(into_data_error)
78    }
79
80    fn query_graph(&self, query: &serde_json::Value) -> Result<serde_json::Value, DataError> {
81        Runtime::query_graph(self, query).map_err(into_data_error)
82    }
83
84    fn aggregate(
85        &self,
86        entity: &str,
87        spec: &serde_json::Value,
88    ) -> Result<serde_json::Value, DataError> {
89        Runtime::aggregate(self, entity, spec).map_err(into_data_error)
90    }
91
92    fn transact(
93        &self,
94        ops: &[serde_json::Value],
95    ) -> Result<(bool, Vec<serde_json::Value>), DataError> {
96        let conn = self.lock_conn_pub().map_err(into_data_error)?;
97        let _ = conn.execute("BEGIN", []);
98        let mut results: Vec<serde_json::Value> = Vec::new();
99        let mut rollback = false;
100
101        for op in ops {
102            let op_type = op.get("op").and_then(|v| v.as_str()).unwrap_or("");
103            let entity = op.get("entity").and_then(|v| v.as_str()).unwrap_or("");
104
105            match op_type {
106                "insert" => {
107                    let data = op.get("data").cloned().unwrap_or(serde_json::json!({}));
108                    match self.insert_with_conn(&conn, entity, &data) {
109                        Ok(id) => {
110                            results.push(serde_json::json!({"op": "insert", "id": id}));
111                        }
112                        Err(e) => {
113                            results.push(serde_json::json!({"op": "insert", "error": e.message}));
114                            rollback = true;
115                            break;
116                        }
117                    }
118                }
119                "update" => {
120                    let id = op.get("id").and_then(|v| v.as_str()).unwrap_or("");
121                    let data = op.get("data").cloned().unwrap_or(serde_json::json!({}));
122                    match self.update_with_conn(&conn, entity, id, &data) {
123                        Ok(_) => {
124                            results.push(serde_json::json!({"op": "update", "id": id}));
125                        }
126                        Err(e) => {
127                            results.push(serde_json::json!({"op": "update", "error": e.message}));
128                            rollback = true;
129                            break;
130                        }
131                    }
132                }
133                "delete" => {
134                    let id = op.get("id").and_then(|v| v.as_str()).unwrap_or("");
135                    match self.delete_with_conn(&conn, entity, id) {
136                        Ok(_) => {
137                            results.push(serde_json::json!({"op": "delete", "id": id}));
138                        }
139                        Err(e) => {
140                            results.push(serde_json::json!({"op": "delete", "error": e.message}));
141                            rollback = true;
142                            break;
143                        }
144                    }
145                }
146                _ => {
147                    results.push(serde_json::json!({"op": op_type, "error": "unknown operation"}));
148                }
149            }
150        }
151
152        if rollback {
153            let _ = conn.execute("ROLLBACK", []);
154        } else {
155            let _ = conn.execute("COMMIT", []);
156        }
157
158        Ok((!rollback, results))
159    }
160
161    /// Bridge the typed `SearchQuery` / `SearchResult` shapes to the
162    /// trait's JSON-in / JSON-out contract. The router passes a JSON
163    /// body; we deserialize, look up the entity's `SearchConfig`, run
164    /// the planner, and re-serialize. Serialization round-tripping
165    /// lets this method live on the DataStore trait without forcing
166    /// pylon-http to depend on pylon-storage.
167    fn search(
168        &self,
169        entity: &str,
170        query: &serde_json::Value,
171    ) -> Result<serde_json::Value, DataError> {
172        let ent = self
173            .manifest()
174            .entities
175            .iter()
176            .find(|e| e.name == entity)
177            .ok_or_else(|| DataError {
178                code: "ENTITY_NOT_FOUND".into(),
179                message: format!("Unknown entity: {entity}"),
180            })?;
181        let cfg = ent.search.as_ref().ok_or_else(|| DataError {
182            code: "SEARCH_NOT_CONFIGURED".into(),
183            message: format!("Entity {entity} has no `search:` config"),
184        })?;
185        let parsed: pylon_storage::search::SearchQuery = serde_json::from_value(query.clone())
186            .map_err(|e| DataError {
187                code: "INVALID_QUERY".into(),
188                message: format!("search query body: {e}"),
189            })?;
190        let conn = self.lock_conn_pub().map_err(into_data_error)?;
191        let result =
192            pylon_storage::search_query::run_search(&conn, entity, cfg, &parsed).map_err(|e| {
193                DataError {
194                    code: e.code,
195                    message: e.message,
196                }
197            })?;
198        serde_json::to_value(&result).map_err(|e| DataError {
199            code: "SEARCH_SERIALIZE_FAILED".into(),
200            message: e.to_string(),
201        })
202    }
203
204    /// Return the binary CRDT snapshot for a row. `Ok(None)` for any
205    /// entity with `crdt: false` (the LWW opt-out) — the router uses
206    /// that to decide whether to ship a binary update over WebSocket
207    /// after the write.
208    fn crdt_snapshot(&self, entity: &str, row_id: &str) -> Result<Option<Vec<u8>>, DataError> {
209        let ent = self
210            .manifest()
211            .entities
212            .iter()
213            .find(|e| e.name == entity)
214            .ok_or_else(|| DataError {
215                code: "ENTITY_NOT_FOUND".into(),
216                message: format!("Unknown entity: {entity}"),
217            })?;
218        if !ent.crdt {
219            return Ok(None);
220        }
221        let conn = self.lock_conn_pub().map_err(into_data_error)?;
222        let snap = self
223            .crdt_store()
224            .snapshot(&conn, entity, row_id)
225            .map_err(|e| DataError {
226                code: "CRDT_SNAPSHOT_FAILED".into(),
227                message: format!("snapshot {entity}/{row_id}: {e}"),
228            })?;
229        Ok(Some(snap))
230    }
231
232    /// Client-pushed Loro update. Imports into the row's LoroDoc,
233    /// re-projects the doc state into the materialized SQLite columns
234    /// (so subsequent reads see the merged content), and returns the
235    /// fresh full-row snapshot for the router to broadcast to other
236    /// clients.
237    ///
238    /// Wrapped in a single SQLite transaction — same crash-safety
239    /// shape as `Runtime::insert/update`. Either the LoroStore +
240    /// SQLite columns both update or neither does.
241    fn crdt_apply_update(
242        &self,
243        entity: &str,
244        row_id: &str,
245        update: &[u8],
246    ) -> Result<Vec<u8>, DataError> {
247        // Find the entity so we can build the projection field list +
248        // confirm CRDT mode is on. Cheap manifest scan; counts are tiny.
249        let ent = self
250            .manifest()
251            .entities
252            .iter()
253            .find(|e| e.name == entity)
254            .ok_or_else(|| DataError {
255                code: "ENTITY_NOT_FOUND".into(),
256                message: format!("Unknown entity: {entity}"),
257            })?
258            .clone();
259        if !ent.crdt {
260            return Err(DataError {
261                code: "NOT_SUPPORTED".into(),
262                message: format!("Entity {entity} has crdt: false; client push requires CRDT mode"),
263            });
264        }
265        let crdt_fields = self.crdt_fields_for(&ent).map_err(into_data_error)?;
266
267        let conn = self.lock_conn_pub().map_err(into_data_error)?;
268        crate::with_write_tx(&conn, || -> Result<Vec<u8>, crate::RuntimeError> {
269            // Apply the update to the LoroDoc + persist the new snapshot
270            // to the sidecar. Returns the projected JSON shape for the
271            // post-merge state.
272            let projected = self
273                .crdt_store()
274                .apply_remote_update(&conn, entity, row_id, &crdt_fields, update)
275                .map_err(|e| crate::RuntimeError {
276                    code: "CRDT_APPLY_FAILED".into(),
277                    message: format!("apply_remote_update {entity}/{row_id}: {e}"),
278                })?;
279
280            // Re-project into the materialized SQLite row so SELECT
281            // queries see the merged content. Build SET clauses from
282            // the projection — every CRDT-managed field gets rewritten.
283            let projection = projected.as_object().ok_or_else(|| crate::RuntimeError {
284                code: "CRDT_PROJECTION_INVALID".into(),
285                message: "projected row was not a JSON object".into(),
286            })?;
287
288            let mut set_clauses = Vec::with_capacity(projection.len());
289            let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
290            let mut idx = 1;
291            for (key, val) in projection {
292                if key == "id" {
293                    continue;
294                }
295                set_clauses.push(format!("{} = ?{idx}", crate::quote_ident(key.as_str())));
296                values.push(crate::json_to_sql(val));
297                idx += 1;
298            }
299            if set_clauses.is_empty() {
300                // No projected fields — happens when the doc has no
301                // top-level keys yet (fresh row from a peer subscribing
302                // before any writes). Skip the UPDATE; row may not exist
303                // in SQLite. Subsequent inserts will materialize it.
304            } else {
305                values.push(Box::new(row_id.to_string()));
306                let sql = format!(
307                    "UPDATE {} SET {} WHERE \"id\" = ?{idx}",
308                    crate::quote_ident(entity),
309                    set_clauses.join(", ")
310                );
311                let params: Vec<&dyn rusqlite::types::ToSql> =
312                    values.iter().map(|v| v.as_ref()).collect();
313                conn.execute(&sql, params.as_slice())
314                    .map_err(|e| crate::RuntimeError {
315                        code: "UPDATE_FAILED".into(),
316                        message: format!("post-merge UPDATE {entity}/{row_id}: {e}"),
317                    })?;
318            }
319
320            // Return the new snapshot for the router to broadcast.
321            let snap = self
322                .crdt_store()
323                .snapshot(&conn, entity, row_id)
324                .map_err(|e| crate::RuntimeError {
325                    code: "CRDT_SNAPSHOT_FAILED".into(),
326                    message: format!("post-merge snapshot {entity}/{row_id}: {e}"),
327                })?;
328            Ok(snap)
329        })
330        .map_err(into_data_error)
331    }
332}
333
334fn into_data_error(e: crate::RuntimeError) -> DataError {
335    DataError {
336        code: e.code,
337        message: e.message,
338    }
339}
340
341// ---------------------------------------------------------------------------
342// ChangeNotifier for WsHub + SseHub
343// ---------------------------------------------------------------------------
344
345use crate::sse::SseHub;
346use crate::ws::WsHub;
347use std::sync::Arc;
348
349/// Bridges WebSocket + SSE hubs to the router's [`ChangeNotifier`] trait.
350pub struct WsSseNotifier {
351    pub ws: Arc<WsHub>,
352    pub sse: Arc<SseHub>,
353}
354
355impl pylon_router::ChangeNotifier for WsSseNotifier {
356    fn notify(&self, event: &pylon_sync::ChangeEvent) {
357        self.ws.broadcast(event);
358        self.sse.broadcast(event);
359    }
360
361    fn notify_presence(&self, json: &str) {
362        self.ws.broadcast_presence(json);
363        self.sse.broadcast_message(json);
364    }
365
366    /// Encode a CRDT broadcast frame (1-byte type + length-prefixed
367    /// entity + length-prefixed row_id + Loro snapshot bytes) and ship
368    /// it to clients SUBSCRIBED to this row. SSE is text-only so it
369    /// gets skipped — clients on the SSE transport stay on the JSON
370    /// change-event path until a future SSE-friendly encoding (base64
371    /// or hex-encoded chunks) lands.
372    ///
373    /// Filtering by subscription instead of broadcasting to every WS
374    /// client matters once more than a handful of rows are in flight:
375    /// a 50-channel app with 100 connected users would otherwise fan
376    /// 100x for every keystroke in a single channel. Now each binary
377    /// frame goes only to the (typically small) set of tabs that asked
378    /// to mirror that specific row.
379    ///
380    /// If no clients are subscribed (empty list) the frame is dropped
381    /// silently — the JSON change event from `notify` already told
382    /// every connected client a write happened, so non-subscribed
383    /// clients can re-fetch via the regular query path if they care.
384    ///
385    /// Authz: the policy check happens at SUBSCRIBE TIME (in
386    /// `start_ws_server`'s SnapshotFetcher closure) — clients on the
387    /// subscriber list have already passed `check_entity_read` for
388    /// the row at that moment. We don't re-check on every broadcast
389    /// because the broadcast hot path runs from the write thread
390    /// without per-client auth context. A consequence: if a client is
391    /// already subscribed and their permissions change mid-session
392    /// (e.g. they're removed from a private channel), they'll keep
393    /// receiving CRDT frames for that row until they disconnect.
394    /// Future work: index subscribers by auth context so the broadcast
395    /// can re-check, or invalidate subscriptions on policy changes.
396    ///
397    /// Frame-encode failure (entity / row_id over the 16-bit length
398    /// header) gets logged and dropped — the row's regular JSON change
399    /// event already shipped via `notify`, so clients still see the
400    /// write happened, they just don't get the binary CRDT delta.
401    fn notify_crdt(&self, entity: &str, row_id: &str, snapshot: &[u8]) {
402        let subscribers = self.ws.subscriptions().subscribers(entity, row_id);
403        if subscribers.is_empty() {
404            return;
405        }
406        match pylon_router::encode_crdt_frame(
407            pylon_router::CRDT_FRAME_SNAPSHOT,
408            entity,
409            row_id,
410            snapshot,
411        ) {
412            Ok(frame) => self.ws.broadcast_binary_to(&subscribers, frame),
413            Err(e) => {
414                tracing::warn!("[crdt] dropping binary frame for {entity}/{row_id}: {e}");
415            }
416        }
417    }
418}
419
420/// Serialize a value to JSON, falling back to `{}` on failure.
421fn to_json<T: serde::Serialize>(val: T) -> serde_json::Value {
422    serde_json::to_value(val).unwrap_or(serde_json::json!({}))
423}
424
425/// Serialize a value to JSON, falling back to `[]` on failure.
426fn to_json_array<T: serde::Serialize>(val: T) -> serde_json::Value {
427    serde_json::to_value(val).unwrap_or(serde_json::json!([]))
428}
429
430// ---------------------------------------------------------------------------
431// Adapter: RoomManager → RoomOps
432// ---------------------------------------------------------------------------
433
434use crate::rooms::RoomManager;
435
436impl pylon_router::RoomOps for RoomManager {
437    fn join(
438        &self,
439        room: &str,
440        user_id: &str,
441        data: Option<serde_json::Value>,
442    ) -> Result<(serde_json::Value, serde_json::Value), DataError> {
443        RoomManager::join(self, room, user_id, data)
444            .map(|(snapshot, join_event)| (to_json(&snapshot), to_json(&join_event)))
445            .map_err(|e| DataError {
446                code: e.code,
447                message: e.message,
448            })
449    }
450
451    fn leave(&self, room: &str, user_id: &str) -> Option<serde_json::Value> {
452        RoomManager::leave(self, room, user_id).map(|event| to_json(&event))
453    }
454
455    fn set_presence(
456        &self,
457        room: &str,
458        user_id: &str,
459        data: serde_json::Value,
460    ) -> Option<serde_json::Value> {
461        RoomManager::set_presence(self, room, user_id, data).map(|event| to_json(&event))
462    }
463
464    fn broadcast(
465        &self,
466        room: &str,
467        sender: Option<&str>,
468        topic: &str,
469        data: serde_json::Value,
470    ) -> Option<serde_json::Value> {
471        RoomManager::broadcast(self, room, sender, topic, data).map(|event| to_json(&event))
472    }
473
474    fn list_rooms(&self) -> Vec<String> {
475        RoomManager::list_rooms(self)
476    }
477
478    fn room_size(&self, name: &str) -> usize {
479        RoomManager::room_size(self, name)
480    }
481
482    fn members(&self, name: &str) -> Vec<serde_json::Value> {
483        RoomManager::members(self, name)
484            .into_iter()
485            .map(|p| to_json(p))
486            .collect()
487    }
488}
489
490// ---------------------------------------------------------------------------
// Adapters: PluginRegistry → PluginHookOps, CachePlugin → CacheOps (newtype wrappers for orphan rule)
492// ---------------------------------------------------------------------------
493
494use pylon_plugin::builtin::cache::CachePlugin;
495
496/// Adapter that routes router-level CRUD hook calls into the PluginRegistry.
497///
498/// The router holds a `&dyn PluginHookOps`; this adapter wraps the runtime's
499/// `Arc<PluginRegistry>` so registered plugins (audit_log, validation,
500/// webhooks, timestamps, slugify, versioning, search) run on every
501/// POST/PATCH/DELETE under `/api/entities/*`. Without this wiring, plugins
502/// only saw the `on_request` hook and never got a chance to observe or
503/// reject data-plane writes — a quiet correctness hole noted in the
504/// pentest review.
505pub struct PluginHooksAdapter(pub Arc<pylon_plugin::PluginRegistry>);
506
507impl pylon_router::PluginHookOps for PluginHooksAdapter {
508    fn before_insert(
509        &self,
510        entity: &str,
511        data: &mut serde_json::Value,
512        auth: &pylon_auth::AuthContext,
513    ) -> Result<(), (u16, String, String)> {
514        self.0
515            .run_before_insert(entity, data, auth)
516            .map_err(|e| (e.status, e.code, e.message))
517    }
518    fn after_insert(
519        &self,
520        entity: &str,
521        id: &str,
522        data: &serde_json::Value,
523        auth: &pylon_auth::AuthContext,
524    ) {
525        self.0.run_after_insert(entity, id, data, auth);
526    }
527    fn before_update(
528        &self,
529        entity: &str,
530        id: &str,
531        data: &mut serde_json::Value,
532        auth: &pylon_auth::AuthContext,
533    ) -> Result<(), (u16, String, String)> {
534        self.0
535            .run_before_update(entity, id, data, auth)
536            .map_err(|e| (e.status, e.code, e.message))
537    }
538    fn after_update(
539        &self,
540        entity: &str,
541        id: &str,
542        data: &serde_json::Value,
543        auth: &pylon_auth::AuthContext,
544    ) {
545        self.0.run_after_update(entity, id, data, auth);
546    }
547    fn before_delete(
548        &self,
549        entity: &str,
550        id: &str,
551        auth: &pylon_auth::AuthContext,
552    ) -> Result<(), (u16, String, String)> {
553        self.0
554            .run_before_delete(entity, id, auth)
555            .map_err(|e| (e.status, e.code, e.message))
556    }
557    fn after_delete(&self, entity: &str, id: &str, auth: &pylon_auth::AuthContext) {
558        self.0.run_after_delete(entity, id, auth);
559    }
560}
561
562pub struct CacheAdapter(pub Arc<CachePlugin>);
563
564impl pylon_router::CacheOps for CacheAdapter {
565    fn handle_command(&self, body: &str) -> (u16, String) {
566        crate::cache_handlers::handle_cache_command(&self.0, body)
567    }
568
569    fn handle_get(&self, key: &str) -> (u16, String) {
570        crate::cache_handlers::handle_cache_get(&self.0, key)
571    }
572
573    fn handle_delete(&self, key: &str) -> (u16, String) {
574        crate::cache_handlers::handle_cache_delete(&self.0, key)
575    }
576}
577
578// ---------------------------------------------------------------------------
579// Adapter: PubSubBroker → PubSubOps (newtype wrapper for orphan rule)
580// ---------------------------------------------------------------------------
581
582use crate::pubsub::PubSubBroker;
583
584pub struct PubSubAdapter(pub Arc<PubSubBroker>);
585
586impl pylon_router::PubSubOps for PubSubAdapter {
587    fn handle_publish(&self, body: &str) -> (u16, String) {
588        crate::cache_handlers::handle_pubsub_publish(&self.0, body)
589    }
590
591    fn handle_channels(&self) -> (u16, String) {
592        crate::cache_handlers::handle_pubsub_channels(&self.0)
593    }
594
595    fn handle_history(&self, channel: &str, url: &str) -> (u16, String) {
596        crate::cache_handlers::handle_pubsub_history(&self.0, channel, url)
597    }
598}
599
600// ---------------------------------------------------------------------------
601// Adapter: JobQueue → JobOps
602// ---------------------------------------------------------------------------
603
604use crate::jobs::{JobQueue, Priority};
605
606impl pylon_router::JobOps for JobQueue {
607    fn enqueue(
608        &self,
609        name: &str,
610        payload: serde_json::Value,
611        priority: &str,
612        delay_secs: u64,
613        max_retries: u32,
614        queue: &str,
615    ) -> String {
616        let pri = Priority::from_str_loose(priority);
617        JobQueue::enqueue_with_options(self, name, payload, pri, delay_secs, max_retries, queue)
618    }
619
620    fn stats(&self) -> serde_json::Value {
621        to_json(JobQueue::stats(self))
622    }
623
624    fn dead_letters(&self) -> serde_json::Value {
625        to_json_array(JobQueue::dead_letters(self))
626    }
627
628    fn retry_dead(&self, id: &str) -> bool {
629        JobQueue::retry_dead(self, id)
630    }
631
632    fn list_jobs(
633        &self,
634        status: Option<&str>,
635        queue: Option<&str>,
636        limit: usize,
637    ) -> serde_json::Value {
638        to_json_array(JobQueue::list_jobs(self, status, queue, limit))
639    }
640
641    fn get_job(&self, id: &str) -> Option<serde_json::Value> {
642        JobQueue::get_job(self, id).map(|j| to_json(j))
643    }
644}
645
646// ---------------------------------------------------------------------------
647// Adapter: Scheduler → SchedulerOps
648// ---------------------------------------------------------------------------
649
650use crate::scheduler::Scheduler;
651
652impl pylon_router::SchedulerOps for Scheduler {
653    fn list_tasks(&self) -> serde_json::Value {
654        to_json_array(Scheduler::list_tasks(self))
655    }
656
657    fn trigger(&self, name: &str) -> bool {
658        Scheduler::trigger(self, name)
659    }
660}
661
662// ---------------------------------------------------------------------------
663// Adapter: WorkflowEngine → WorkflowOps
664// ---------------------------------------------------------------------------
665
666use crate::workflows::WorkflowEngine;
667
668impl pylon_router::WorkflowOps for WorkflowEngine {
669    fn definitions(&self) -> serde_json::Value {
670        to_json_array(WorkflowEngine::definitions(self))
671    }
672
673    fn start(&self, name: &str, input: serde_json::Value) -> Result<String, String> {
674        WorkflowEngine::start(self, name, input)
675    }
676
677    fn list(&self, status_filter: Option<&str>) -> serde_json::Value {
678        // Convert string filter to WorkflowStatus for the engine.
679        let filter = status_filter.and_then(|s| match s {
680            "pending" => Some(crate::workflows::WorkflowStatus::Pending),
681            "running" => Some(crate::workflows::WorkflowStatus::Running),
682            "sleeping" => Some(crate::workflows::WorkflowStatus::Sleeping),
683            "waiting" => Some(crate::workflows::WorkflowStatus::WaitingForEvent),
684            "completed" => Some(crate::workflows::WorkflowStatus::Completed),
685            "failed" => Some(crate::workflows::WorkflowStatus::Failed),
686            "cancelled" => Some(crate::workflows::WorkflowStatus::Cancelled),
687            _ => None,
688        });
689        to_json_array(WorkflowEngine::list(self, filter.as_ref()))
690    }
691
692    fn get(&self, id: &str) -> Option<serde_json::Value> {
693        WorkflowEngine::get(self, id).map(|inst| to_json(inst))
694    }
695
696    fn advance(&self, id: &str) -> Result<String, String> {
697        WorkflowEngine::advance(self, id).map(|status| format!("{:?}", status))
698    }
699
700    fn send_event(&self, id: &str, event: &str, data: serde_json::Value) -> Result<(), String> {
701        WorkflowEngine::send_event(self, id, event, data)
702    }
703
704    fn cancel(&self, id: &str) -> Result<(), String> {
705        WorkflowEngine::cancel(self, id)
706    }
707}
708
709// ---------------------------------------------------------------------------
710// Adapter: FileStorage trait → FileOps
711// ---------------------------------------------------------------------------
712
713use pylon_storage::files::{FileStorage, LocalFileStorage};
714
715/// Adapter that exposes a [`FileStorage`] backend through the router's [`FileOps`].
716pub struct FileOpsAdapter {
717    pub storage: Arc<dyn FileStorage>,
718}
719
720impl FileOpsAdapter {
721    /// Create from environment variables.
722    /// Defaults to local filesystem storage at `./uploads`.
723    pub fn from_env() -> Self {
724        let dir = std::env::var("PYLON_FILES_DIR").unwrap_or_else(|_| "uploads".into());
725        let url_prefix =
726            std::env::var("PYLON_FILES_URL_PREFIX").unwrap_or_else(|_| "/api/files".into());
727        Self {
728            storage: Arc::new(LocalFileStorage::new(&dir, &url_prefix)),
729        }
730    }
731}
732
733impl pylon_router::FileOps for FileOpsAdapter {
734    fn upload(&self, _body: &str) -> (u16, String) {
735        // The self-hosted server short-circuits /api/files/upload BEFORE the
736        // request body is lossily coerced to a String, so binary uploads are
737        // handled there. This fallback exists for non-self-hosted adapters
738        // (e.g., Workers) and for defense in depth; it rejects string bodies
739        // that wouldn't carry binary data correctly.
740        (
741            400,
742            pylon_router::json_error(
743                "UPLOAD_NEEDS_BINARY",
744                "File uploads must use multipart/form-data or raw binary with X-Filename; this platform does not support string-body uploads",
745            ),
746        )
747    }
748
749    fn get_file(&self, id: &str) -> (u16, String) {
750        match self.storage.get(id) {
751            Ok(content) => (200, String::from_utf8_lossy(&content).into_owned()),
752            Err(e) if e.code == "NOT_FOUND" => {
753                (404, pylon_router::json_error("FILE_NOT_FOUND", &e.message))
754            }
755            Err(e) => (400, pylon_router::json_error(&e.code, &e.message)),
756        }
757    }
758}
759
760/// Backwards-compatible alias; old code refers to this name.
761pub type LocalFileOps = FileOpsAdapter;
762
763impl LocalFileOps {
764    /// Default instance backed by the local `uploads/` directory.
765    pub fn new_default() -> Self {
766        Self::from_env()
767    }
768}
769
770// ---------------------------------------------------------------------------
771// Adapter: EmailTransport → EmailSender
772// ---------------------------------------------------------------------------
773
774use pylon_auth::email::{ConsoleTransport, EmailTransport, HttpEmailTransport};
775
776/// Picks an email backend based on environment variables.
777/// Falls back to `ConsoleTransport` (prints to stderr) when no provider is configured.
778pub struct EmailAdapter {
779    transport: Box<dyn EmailTransport>,
780}
781
782impl EmailAdapter {
783    pub fn from_env() -> Self {
784        if let Some(http) = HttpEmailTransport::from_env() {
785            Self {
786                transport: Box::new(http),
787            }
788        } else {
789            Self {
790                transport: Box::new(ConsoleTransport),
791            }
792        }
793    }
794}
795
796impl pylon_router::EmailSender for EmailAdapter {
797    fn send(&self, to: &str, subject: &str, body: &str) -> Result<(), String> {
798        self.transport
799            .send(to, subject, body)
800            .map_err(|e| e.message)
801    }
802}
803
804// ---------------------------------------------------------------------------
805// Adapter: OpenAPI generator
806// ---------------------------------------------------------------------------
807
808pub struct RuntimeOpenApiGenerator<'a> {
809    pub manifest: &'a pylon_kernel::AppManifest,
810}
811
812impl<'a> pylon_router::OpenApiGenerator for RuntimeOpenApiGenerator<'a> {
813    fn generate(&self, base_url: &str) -> String {
814        let spec = crate::openapi::generate_openapi(self.manifest, base_url);
815        serde_json::to_string(&spec).unwrap_or_else(|_| "{}".into())
816    }
817}
818
819// ---------------------------------------------------------------------------
820// Adapter: DynShardRegistry → ShardOps
821// ---------------------------------------------------------------------------
822
823/// Wraps any `Arc<dyn DynShardRegistry>` so the router can dispatch shard
824/// routes without knowing the concrete SimState type.
825pub struct ShardOpsAdapter {
826    pub registry: Arc<dyn pylon_realtime::DynShardRegistry>,
827}
828
829impl pylon_router::ShardOps for ShardOpsAdapter {
830    fn get_shard(&self, id: &str) -> Option<Arc<dyn pylon_realtime::DynShard>> {
831        self.registry.get(id)
832    }
833
834    fn list_shards(&self) -> Vec<String> {
835        self.registry.ids()
836    }
837
838    fn shard_count(&self) -> usize {
839        self.registry.len()
840    }
841}
842
843#[cfg(test)]
844mod find_runtime_tests {
845    use super::*;
846
847    #[test]
848    fn env_override_takes_precedence() {
849        let dir = std::env::temp_dir().join(format!("pylon_rt_{}", std::process::id()));
850        let _ = std::fs::create_dir_all(&dir);
851        let path = dir.join("custom_runtime.ts");
852        std::fs::write(&path, "// test").unwrap();
853
854        std::env::set_var("PYLON_FUNCTIONS_RUNTIME", path.to_str().unwrap());
855        let found = find_functions_runtime();
856        std::env::remove_var("PYLON_FUNCTIONS_RUNTIME");
857
858        assert_eq!(found.as_deref(), path.to_str());
859
860        let _ = std::fs::remove_dir_all(&dir);
861    }
862
863    #[test]
864    fn returns_none_when_env_path_missing() {
865        std::env::set_var(
866            "PYLON_FUNCTIONS_RUNTIME",
867            "/tmp/definitely-does-not-exist-42.ts",
868        );
869        // May still find something in CWD (dev path), so we only assert the env
870        // path isn't what gets returned.
871        let found = find_functions_runtime();
872        std::env::remove_var("PYLON_FUNCTIONS_RUNTIME");
873        assert_ne!(
874            found.as_deref(),
875            Some("/tmp/definitely-does-not-exist-42.ts")
876        );
877    }
878}
879
880// ---------------------------------------------------------------------------
881// TxStore — DataStore backed by a held transaction connection
882// ---------------------------------------------------------------------------
883
884/// A `DataStore` that executes against a pre-held SQLite connection
885/// for the duration of a single mutation handler.
886///
887/// # Safety contract
888///
889/// `rusqlite::Connection` is `Send` but not `Sync` (it uses `RefCell`s
890/// internally for statement caching). The `DataStore` trait requires
891/// `Send + Sync`, but `&'a Connection` is neither.
892///
893/// We hand-implement both via `unsafe impl` because:
894///
895/// 1. **Construction.** `TxStore::new` is only ever called by
896///    `FnOpsImpl::call` for mutations, after acquiring the runtime's
897///    write lock. The `&Connection` originates from a `MutexGuard`
898///    that the constructing thread holds.
899///
900/// 2. **Lifetime.** The `'a` lifetime ties the `TxStore` to that guard.
901///    The compiler enforces that the `TxStore` cannot outlive the held
902///    lock; it must be dropped before the guard is.
903///
904/// 3. **Single-threaded use.** `FnRunner::call()` runs the handler
905///    synchronously on the calling thread and never spawns threads
906///    holding a reference to the `TxStore`. The `Send + Sync` bounds
907///    on the `DataStore` trait are satisfied vacuously — no thread
908///    other than the caller ever sees this `TxStore`.
909///
910/// 4. **No interior aliasing.** All `&Connection` calls go through
911///    `Runtime::*_with_conn` methods which take `&Connection`, never
912///    keeping the reference alive across an `await` point (this is
913///    sync code, no awaits).
914///
915/// Future work: refactor `Runtime`'s `write_conn` to be
916/// `Arc<Mutex<Connection>>` so TxStore can hold an `Arc<Mutex<...>>`,
917/// eliminating the unsafe impl entirely.
918pub struct TxStore<'a> {
919    runtime: &'a Runtime,
920    conn: &'a rusqlite::Connection,
921    /// Pending change events to broadcast after the outer transaction
922    /// commits. Buffered here rather than pushed to ChangeLog + notifier
923    /// immediately so a rollback doesn't emit events for writes that
924    /// didn't actually land.
925    pending: std::cell::RefCell<Vec<pylon_sync::ChangeEvent>>,
926}
927
928impl<'a> TxStore<'a> {
929    pub fn new(runtime: &'a Runtime, conn: &'a rusqlite::Connection) -> Self {
930        Self {
931            runtime,
932            conn,
933            pending: std::cell::RefCell::new(Vec::new()),
934        }
935    }
936
937    /// Drain the pending-events buffer. Called after COMMIT succeeds;
938    /// the caller is responsible for appending each event to the
939    /// ChangeLog and broadcasting via the notifier. On rollback the
940    /// caller just drops the buffer without calling this.
941    pub fn take_pending(&self) -> Vec<pylon_sync::ChangeEvent> {
942        std::mem::take(&mut *self.pending.borrow_mut())
943    }
944
945    fn record(
946        &self,
947        entity: &str,
948        row_id: &str,
949        kind: pylon_sync::ChangeKind,
950        data: Option<&serde_json::Value>,
951    ) {
952        self.pending.borrow_mut().push(pylon_sync::ChangeEvent {
953            seq: 0, // assigned by ChangeLog::append after commit
954            entity: entity.to_string(),
955            row_id: row_id.to_string(),
956            kind,
957            data: data.cloned(),
958            timestamp: String::new(),
959        });
960    }
961}
962
963// SAFETY: see the contract on TxStore above.
964unsafe impl<'a> Sync for TxStore<'a> {}
965unsafe impl<'a> Send for TxStore<'a> {}
966
967impl<'a> DataStore for TxStore<'a> {
968    fn manifest(&self) -> &pylon_kernel::AppManifest {
969        self.runtime.manifest()
970    }
971
972    fn insert(&self, entity: &str, data: &serde_json::Value) -> Result<String, DataError> {
973        let id = self
974            .runtime
975            .insert_with_conn(self.conn, entity, data)
976            .map_err(into_data_error)?;
977        // Buffer the event. If the outer mutation rolls back, the buffer
978        // is dropped instead of flushed, so sync subscribers never see a
979        // row that doesn't exist.
980        self.record(entity, &id, pylon_sync::ChangeKind::Insert, Some(data));
981        Ok(id)
982    }
983
984    fn get_by_id(&self, entity: &str, id: &str) -> Result<Option<serde_json::Value>, DataError> {
985        self.runtime
986            .get_by_id_with_conn(self.conn, entity, id)
987            .map_err(into_data_error)
988    }
989
990    fn list(&self, entity: &str) -> Result<Vec<serde_json::Value>, DataError> {
991        self.runtime
992            .list_with_conn(self.conn, entity)
993            .map_err(into_data_error)
994    }
995
996    fn list_after(
997        &self,
998        entity: &str,
999        after: Option<&str>,
1000        limit: usize,
1001    ) -> Result<Vec<serde_json::Value>, DataError> {
1002        self.runtime
1003            .list_after_with_conn(self.conn, entity, after, limit)
1004            .map_err(into_data_error)
1005    }
1006
1007    fn update(&self, entity: &str, id: &str, data: &serde_json::Value) -> Result<bool, DataError> {
1008        let updated = self
1009            .runtime
1010            .update_with_conn(self.conn, entity, id, data)
1011            .map_err(into_data_error)?;
1012        if updated {
1013            self.record(entity, id, pylon_sync::ChangeKind::Update, Some(data));
1014        }
1015        Ok(updated)
1016    }
1017
1018    fn delete(&self, entity: &str, id: &str) -> Result<bool, DataError> {
1019        let deleted = self
1020            .runtime
1021            .delete_with_conn(self.conn, entity, id)
1022            .map_err(into_data_error)?;
1023        if deleted {
1024            self.record(entity, id, pylon_sync::ChangeKind::Delete, None);
1025        }
1026        Ok(deleted)
1027    }
1028
1029    fn lookup(
1030        &self,
1031        entity: &str,
1032        field: &str,
1033        value: &str,
1034    ) -> Result<Option<serde_json::Value>, DataError> {
1035        self.runtime
1036            .lookup_with_conn(self.conn, entity, field, value)
1037            .map_err(into_data_error)
1038    }
1039
1040    fn link(
1041        &self,
1042        entity: &str,
1043        id: &str,
1044        relation: &str,
1045        target_id: &str,
1046    ) -> Result<bool, DataError> {
1047        self.runtime
1048            .link_with_conn(self.conn, entity, id, relation, target_id)
1049            .map_err(into_data_error)
1050    }
1051
1052    fn unlink(&self, entity: &str, id: &str, relation: &str) -> Result<bool, DataError> {
1053        self.runtime
1054            .unlink_with_conn(self.conn, entity, id, relation)
1055            .map_err(into_data_error)
1056    }
1057
1058    fn query_filtered(
1059        &self,
1060        entity: &str,
1061        filter: &serde_json::Value,
1062    ) -> Result<Vec<serde_json::Value>, DataError> {
1063        self.runtime
1064            .query_filtered_with_conn(self.conn, entity, filter)
1065            .map_err(into_data_error)
1066    }
1067
1068    fn query_graph(&self, query: &serde_json::Value) -> Result<serde_json::Value, DataError> {
1069        self.runtime
1070            .query_graph_with_conn(self.conn, query)
1071            .map_err(into_data_error)
1072    }
1073
1074    fn aggregate(
1075        &self,
1076        entity: &str,
1077        spec: &serde_json::Value,
1078    ) -> Result<serde_json::Value, DataError> {
1079        // Aggregation inside a transaction uses the same runtime method.
1080        // The lookups do their own read-lock, which is fine since aggregate
1081        // is read-only.
1082        Runtime::aggregate(self.runtime, entity, spec).map_err(into_data_error)
1083    }
1084
1085    fn transact(
1086        &self,
1087        _ops: &[serde_json::Value],
1088    ) -> Result<(bool, Vec<serde_json::Value>), DataError> {
1089        // Nested transactions aren't supported from within a mutation handler.
1090        // The mutation handler IS the transaction.
1091        Err(DataError {
1092            code: "NESTED_TRANSACTION".into(),
1093            message: "ctx.db.transact() is not allowed inside a mutation handler (the handler itself is transactional)".into(),
1094        })
1095    }
1096}
1097
1098// ---------------------------------------------------------------------------
1099// Adapter: FnRunner → FnOps
1100// ---------------------------------------------------------------------------
1101
1102use pylon_functions::protocol::{AuthInfo as FnAuth, FnType};
1103use pylon_functions::registry::{FnDef, FnRegistry};
1104use pylon_functions::runner::{FnCallError, FnRunner};
1105use pylon_functions::trace::FnTrace;
1106
1107/// Adapter that implements [`FnOps`] by delegating to a [`FnRunner`].
1108///
1109/// Holds an `Arc<Runtime>` so function handlers get a [`DataStore`] to
1110/// operate against.
1111pub struct FnOpsImpl {
1112    pub runner: Arc<FnRunner>,
1113    pub registry: Arc<FnRegistry>,
1114    pub runtime: Arc<Runtime>,
1115    /// Per-function rate limiter, keyed on `"<fn_name>::<identity>"`.
1116    /// Limits are uniform; per-fn overrides can be added later via FnDef
1117    /// metadata once the TS define API surfaces them.
1118    pub fn_rate_limiter: Arc<crate::rate_limit::RateLimiter>,
1119    /// Sync change log for broadcasting `ctx.db.insert/update/delete` ops
1120    /// that happen inside a function handler. Without this, mutations via
1121    /// functions silently bypass sync — WS subscribers see nothing until
1122    /// they manually refetch. Flushed post-COMMIT so rollbacks don't emit
1123    /// phantom events.
1124    pub change_log: Arc<pylon_sync::ChangeLog>,
1125    /// Where to broadcast change events after a function mutation commits.
1126    pub notifier: Arc<dyn pylon_router::ChangeNotifier>,
1127}
1128
1129impl pylon_router::FnOps for FnOpsImpl {
1130    fn get_fn(&self, name: &str) -> Option<FnDef> {
1131        self.registry.get(name)
1132    }
1133
1134    fn list_fns(&self) -> Vec<FnDef> {
1135        self.registry.list()
1136    }
1137
1138    fn call(
1139        &self,
1140        fn_name: &str,
1141        args: serde_json::Value,
1142        auth: FnAuth,
1143        on_stream: Option<Box<dyn FnMut(&str) + Send>>,
1144        request: Option<pylon_functions::protocol::RequestInfo>,
1145    ) -> Result<(serde_json::Value, FnTrace), FnCallError> {
1146        let def = self.registry.get(fn_name).ok_or_else(|| FnCallError {
1147            code: "FN_NOT_FOUND".into(),
1148            message: format!("Function \"{fn_name}\" is not registered"),
1149        })?;
1150
1151        match def.fn_type {
1152            FnType::Mutation => {
1153                // Hold the write connection for the entire handler duration.
1154                // This keeps the BEGIN/COMMIT span free of interleaving from
1155                // other writers (who would otherwise become part of the
1156                // transaction because SQLite tracks it on the connection).
1157                //
1158                // Inside the handler, every `ctx.db` call routes through
1159                // TxStore, which uses this same held connection — so no
1160                // re-locking, no deadlock, no interleaving.
1161                let conn_guard = self.runtime.lock_conn_pub().map_err(|e| FnCallError {
1162                    code: e.code,
1163                    message: e.message,
1164                })?;
1165
1166                if let Err(e) = conn_guard.execute("BEGIN", []) {
1167                    return Err(FnCallError {
1168                        code: "BEGIN_FAILED".into(),
1169                        message: format!("Failed to start transaction: {e}"),
1170                    });
1171                }
1172
1173                let tx_store = TxStore::new(&self.runtime, &conn_guard);
1174                let result = self.runner.call(
1175                    &tx_store,
1176                    fn_name,
1177                    def.fn_type,
1178                    args,
1179                    auth,
1180                    on_stream,
1181                    request,
1182                );
1183
1184                // Surface commit/rollback errors. A swallowed COMMIT failure
1185                // is the worst possible outcome: the caller sees success but
1186                // the data isn't durable. A swallowed ROLLBACK failure leaves
1187                // the connection in an unknown txn state for the next caller.
1188                let result = match result {
1189                    Ok(value) => match conn_guard.execute("COMMIT", []) {
1190                        Ok(_) => {
1191                            // Flush buffered change events NOW — after the
1192                            // commit durably lands but before we return
1193                            // success. Ordering matters: append to the log
1194                            // first (so /api/sync/pull callers that race
1195                            // with this broadcast see the row in the tail),
1196                            // then notify WS/SSE subscribers. `seq` on each
1197                            // pending event starts at 0; append assigns
1198                            // the real seq.
1199                            for ev in tx_store.take_pending() {
1200                                let seq = self.change_log.append(
1201                                    &ev.entity,
1202                                    &ev.row_id,
1203                                    ev.kind.clone(),
1204                                    ev.data.clone(),
1205                                );
1206                                let event = pylon_sync::ChangeEvent { seq, ..ev };
1207                                self.notifier.notify(&event);
1208                            }
1209                            Ok(value)
1210                        }
1211                        Err(commit_err) => {
1212                            // Best-effort cleanup. If ROLLBACK also fails the
1213                            // connection is in a bad state — at minimum the
1214                            // operator sees both failures in the log.
1215                            if let Err(rollback_err) = conn_guard.execute("ROLLBACK", []) {
1216                                tracing::warn!(
1217                                    "[functions] ROLLBACK after COMMIT failure also failed: {rollback_err}"
1218                                );
1219                            }
1220                            Err(FnCallError {
1221                                code: "COMMIT_FAILED".into(),
1222                                message: format!(
1223                                    "Function \"{fn_name}\" succeeded but COMMIT failed: {commit_err}"
1224                                ),
1225                            })
1226                        }
1227                    },
1228                    Err(handler_err) => {
1229                        if let Err(rollback_err) = conn_guard.execute("ROLLBACK", []) {
1230                            // Don't shadow the handler error — log the
1231                            // rollback failure separately.
1232                            tracing::warn!(
1233                                "[functions] ROLLBACK after handler error failed: {rollback_err}"
1234                            );
1235                        }
1236                        Err(handler_err)
1237                    }
1238                };
1239                // conn_guard drops here, releasing the lock.
1240                result
1241            }
1242            _ => self.runner.call(
1243                &*self.runtime,
1244                fn_name,
1245                def.fn_type,
1246                args,
1247                auth,
1248                on_stream,
1249                request,
1250            ),
1251        }
1252    }
1253
1254    fn recent_traces(&self, limit: usize) -> Vec<FnTrace> {
1255        self.runner.trace_log.recent(limit)
1256    }
1257
1258    fn check_rate_limit(&self, fn_name: &str, identity: &str) -> Result<(), u64> {
1259        let key = format!("{fn_name}::{identity}");
1260        self.fn_rate_limiter.check(&key)
1261    }
1262}
1263
1264/// Spawn the Bun function runtime if a `functions/` directory exists.
1265///
1266/// Returns `Some(FnOpsImpl)` if successful, `None` if no functions directory
1267/// or if Bun is not installed. Errors during startup print to stderr and
1268/// return `None` to keep the server running.
1269/// Resolve the path to the TypeScript function runtime script.
1270///
1271/// Searches in order:
1272/// 1. `$PYLON_FUNCTIONS_RUNTIME` environment variable (if set and file exists)
1273/// 2. `./node_modules/@pylon/functions/src/runtime.ts` (npm-installed)
1274/// 3. `./node_modules/@pylon/functions/dist/runtime.js` (built)
1275/// 4. `~/.pylon/runtime.ts` (user install)
1276/// 5. `packages/functions/src/runtime.ts` (dev monorepo)
1277///
1278/// Returns `None` if none exist.
1279pub fn find_functions_runtime() -> Option<String> {
1280    if let Ok(env_path) = std::env::var("PYLON_FUNCTIONS_RUNTIME") {
1281        if std::path::Path::new(&env_path).exists() {
1282            return Some(env_path);
1283        }
1284    }
1285
1286    // Walk parent directories like Node.js resolution does, so running
1287    // `pylon dev` from an example sub-directory still finds the
1288    // hoisted workspace package at the repo root. Without this, bun/npm
1289    // workspace users see "TypeScript function runtime is not configured"
1290    // and think the server is broken when it's just a CWD issue.
1291    let cwd = std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));
1292    let home = std::env::var("HOME").unwrap_or_else(|_| ".".into());
1293    let relative_candidates = [
1294        "node_modules/@pylonsync/functions/src/runtime.ts",
1295        "node_modules/@pylonsync/functions/dist/runtime.js",
1296        // Legacy name — keeps upgrades from the pre-rename era working.
1297        "node_modules/@pylon/functions/src/runtime.ts",
1298        "node_modules/@pylon/functions/dist/runtime.js",
1299        // Monorepo dev: source tree at the workspace root.
1300        "packages/functions/src/runtime.ts",
1301    ];
1302
1303    let mut dir: Option<&std::path::Path> = Some(cwd.as_path());
1304    while let Some(current) = dir {
1305        for rel in &relative_candidates {
1306            let candidate = current.join(rel);
1307            if candidate.exists() {
1308                return candidate.to_str().map(|s| s.to_string());
1309            }
1310        }
1311        dir = current.parent();
1312    }
1313
1314    // Final fallback: user-wide install under ~/.pylon.
1315    let user_path = format!("{home}/.pylon/runtime.ts");
1316    if std::path::Path::new(&user_path).exists() {
1317        return Some(user_path);
1318    }
1319    None
1320}
1321
1322pub fn try_spawn_functions(
1323    runtime: Arc<Runtime>,
1324    job_queue: Arc<crate::jobs::JobQueue>,
1325    fn_rate_limiter: Arc<crate::rate_limit::RateLimiter>,
1326    change_log: Arc<pylon_sync::ChangeLog>,
1327    notifier: Arc<dyn pylon_router::ChangeNotifier>,
1328) -> Option<Arc<FnOpsImpl>> {
1329    let fn_dir = std::env::var("PYLON_FUNCTIONS_DIR").unwrap_or_else(|_| "functions".into());
1330    if !std::path::Path::new(&fn_dir).exists() {
1331        return None;
1332    }
1333
1334    let runtime_script = match find_functions_runtime() {
1335        Some(p) => p,
1336        None => {
1337            tracing::warn!(
1338                "[functions] No TypeScript runtime script found. TypeScript functions will be unavailable."
1339            );
1340            tracing::warn!(
1341                "[functions] Tried: $PYLON_FUNCTIONS_RUNTIME, node_modules/@pylon/functions/src/runtime.ts, ~/.pylon/runtime.ts, packages/functions/src/runtime.ts"
1342            );
1343            return None;
1344        }
1345    };
1346
1347    let runner = Arc::new(FnRunner::new(1000));
1348
1349    // start() now performs the handshake itself and returns the function
1350    // definitions, so there's no separate handshake step. On any failure the
1351    // child has already been killed.
1352    let defs = match runner.start("bun", &["run", &runtime_script, &fn_dir]) {
1353        Ok(defs) => defs,
1354        Err(e) => {
1355            tracing::warn!("[functions] Failed to start Bun runtime: {e}");
1356            tracing::warn!(
1357                "[functions] Install Bun from https://bun.sh — TypeScript functions will be unavailable."
1358            );
1359            return None;
1360        }
1361    };
1362
1363    // Hold a separate handle on the job queue for registering function
1364    // job handlers below, since the schedule-hook closure consumes its
1365    // own copy.
1366    let job_queue_for_handlers = Arc::clone(&job_queue);
1367
1368    // Wire scheduler requests from functions into the job queue. Use the
1369    // Result-returning variant so a persist failure surfaces as a TS-side
1370    // SCHEDULE_FAILED error instead of `{scheduled:true, id:""}`.
1371    runner.set_schedule_hook(Box::new(move |fn_name, args, delay_ms, run_at| {
1372        let delay_secs = match (delay_ms, run_at) {
1373            (Some(ms), _) => ms / 1000,
1374            (None, Some(ts)) => {
1375                let now = std::time::SystemTime::now()
1376                    .duration_since(std::time::UNIX_EPOCH)
1377                    .unwrap_or_default()
1378                    .as_millis() as u64;
1379                if ts > now {
1380                    (ts - now) / 1000
1381                } else {
1382                    0
1383                }
1384            }
1385            _ => 0,
1386        };
1387        job_queue.try_enqueue_with_options(
1388            fn_name,
1389            args,
1390            crate::jobs::Priority::Normal,
1391            delay_secs,
1392            3,
1393            "functions",
1394        )
1395    }));
1396
1397    let registry = Arc::new(FnRegistry::new());
1398    let count = defs.len();
1399    registry.replace_all(defs);
1400    tracing::warn!("[functions] Loaded {count} function(s) from {fn_dir}");
1401
1402    let ops = Arc::new(FnOpsImpl {
1403        runner,
1404        registry,
1405        runtime,
1406        fn_rate_limiter,
1407        change_log,
1408        notifier,
1409    });
1410
1411    install_nested_call_hook(&ops);
1412    register_function_job_handlers(&ops, &job_queue_for_handlers);
1413    spawn_runtime_supervisor(Arc::clone(&ops));
1414    Some(ops)
1415}
1416
1417/// Bridge scheduled function calls (via `ctx.scheduler.runAfter` or
1418/// `runAt`) to the function runner. Without this, the schedule hook
1419/// enqueues a job whose `name` is the function name — but no handler
1420/// is registered for it, so the worker fails with "No handler
1421/// registered" and the scheduled callback never runs.
1422///
1423/// Registers one handler per loaded function. Each handler invokes
1424/// `FnOpsImpl::call` with a system auth context (no user_id, not
1425/// admin) so the called function runs with the same privileges a
1426/// trusted-server-side caller would have. The args come from the
1427/// job payload, which the schedule hook copies verbatim from the
1428/// `runAfter(ms, fn, args)` invocation.
1429fn register_function_job_handlers(ops: &Arc<FnOpsImpl>, job_queue: &Arc<crate::jobs::JobQueue>) {
1430    use pylon_router::FnOps as _;
1431
1432    let fn_names: Vec<String> = ops.registry.list().into_iter().map(|d| d.name).collect();
1433
1434    for name in fn_names {
1435        let weak = Arc::downgrade(ops);
1436        let fn_name = name.clone();
1437        job_queue.register(
1438            &name,
1439            Arc::new(move |job: &crate::jobs::Job| {
1440                let ops = match weak.upgrade() {
1441                    Some(o) => o,
1442                    None => {
1443                        return crate::jobs::JobResult::Failure(
1444                            "RUNTIME_GONE: function ops dropped".into(),
1445                        )
1446                    }
1447                };
1448                let auth = FnAuth {
1449                    user_id: None,
1450                    is_admin: false,
1451                    tenant_id: None,
1452                };
1453                match ops.call(&fn_name, job.payload.clone(), auth, None, None) {
1454                    Ok(_) => crate::jobs::JobResult::Success,
1455                    Err(e) => crate::jobs::JobResult::Retry(format!("{}: {}", e.code, e.message)),
1456                }
1457            }),
1458        );
1459    }
1460}
1461
1462/// Route nested `RunFn` calls (action → query/mutation) through a
1463/// transactional wrapper so nested mutations get their own BEGIN/COMMIT.
1464///
1465/// Uses a `Weak<FnOpsImpl>` to avoid keeping the ops struct alive forever
1466/// through a cycle (hook stored on FnRunner ← held by FnOpsImpl). When the
1467/// ops struct is dropped the hook becomes a no-op error.
1468fn install_nested_call_hook(ops: &Arc<FnOpsImpl>) {
1469    use pylon_functions::protocol::{AuthInfo, FnType};
1470
1471    let weak = Arc::downgrade(ops);
1472    ops.runner.set_nested_call_hook(Box::new(
1473        move |fn_name: &str,
1474              fn_type: FnType,
1475              args: serde_json::Value,
1476              auth: AuthInfo|
1477              -> Result<serde_json::Value, (String, String)> {
1478            let ops = match weak.upgrade() {
1479                Some(o) => o,
1480                None => {
1481                    return Err((
1482                        "RUNTIME_GONE".into(),
1483                        "pylon runtime is shutting down".into(),
1484                    ))
1485                }
1486            };
1487
1488            match fn_type {
1489                FnType::Mutation => {
1490                    // Wrap the nested mutation in its own write-conn + BEGIN
1491                    // + COMMIT, matching the top-level mutation contract.
1492                    let conn_guard = ops
1493                        .runtime
1494                        .lock_conn_pub()
1495                        .map_err(|e| (e.code, e.message))?;
1496                    if let Err(e) = conn_guard.execute("BEGIN", []) {
1497                        return Err(("BEGIN_FAILED".into(), e.to_string()));
1498                    }
1499                    let tx_store = TxStore::new(&ops.runtime, &conn_guard);
1500                    // Re-enter protocol without acquiring io_lock — we're
1501                    // already inside the outer call_inner which holds it.
1502                    // Nested calls never get HTTP request metadata — that's
1503                    // only meaningful for the top-level webhook invocation.
1504                    let result = ops
1505                        .runner
1506                        .call_inner(&tx_store, fn_name, fn_type, args, auth, None, None);
1507                    match result {
1508                        Ok((value, _trace)) => {
1509                            if let Err(e) = conn_guard.execute("COMMIT", []) {
1510                                let _ = conn_guard.execute("ROLLBACK", []);
1511                                return Err(("COMMIT_FAILED".into(), e.to_string()));
1512                            }
1513                            // Flush change events after COMMIT so nested
1514                            // mutations (action → runMutation(...)) broadcast
1515                            // the same way top-level mutations do. Without
1516                            // this, every write an action emits is invisible
1517                            // to sync subscribers until the NEXT top-level
1518                            // mutation lands — streaming UIs stay empty.
1519                            for ev in tx_store.take_pending() {
1520                                let seq = ops.change_log.append(
1521                                    &ev.entity,
1522                                    &ev.row_id,
1523                                    ev.kind.clone(),
1524                                    ev.data.clone(),
1525                                );
1526                                let event = pylon_sync::ChangeEvent { seq, ..ev };
1527                                ops.notifier.notify(&event);
1528                            }
1529                            Ok(value)
1530                        }
1531                        Err(e) => {
1532                            let _ = conn_guard.execute("ROLLBACK", []);
1533                            Err((e.code, e.message))
1534                        }
1535                    }
1536                }
1537                _ => {
1538                    // Queries + actions: no transaction wrap needed. Just
1539                    // re-enter protocol via the same store (runtime).
1540                    // Nested: no HTTP request propagated (see above).
1541                    let result = ops.runner.call_inner(
1542                        &*ops.runtime,
1543                        fn_name,
1544                        fn_type,
1545                        args,
1546                        auth,
1547                        None,
1548                        None,
1549                    );
1550                    result.map(|(v, _)| v).map_err(|e| (e.code, e.message))
1551                }
1552            }
1553        },
1554    ));
1555}
1556
1557/// Background watchdog that restarts the Bun runtime if it dies (crashed,
1558/// killed by the call timeout path, OOM, etc.). Exponential backoff: 1s, 2s,
1559/// 4s, ... capped at 30s. Resets to 1s after a successful respawn.
1560///
1561/// We don't try to "give up" — if Bun keeps crashing the supervisor keeps
1562/// trying with the capped delay. The operator sees repeated WARN logs and
1563/// can investigate. Better than silently leaving functions disabled forever.
1564fn spawn_runtime_supervisor(ops: Arc<FnOpsImpl>) {
1565    use std::time::Duration;
1566
1567    std::thread::Builder::new()
1568        .name("pylon-fn-supervisor".into())
1569        .spawn(move || {
1570            let mut backoff = Duration::from_secs(1);
1571            let max_backoff = Duration::from_secs(30);
1572            loop {
1573                std::thread::sleep(Duration::from_secs(2));
1574                if ops.runner.is_alive() {
1575                    backoff = Duration::from_secs(1);
1576                    continue;
1577                }
1578                tracing::warn!(
1579                    "[functions] Bun runtime is not alive — respawning after {:?}",
1580                    backoff
1581                );
1582                std::thread::sleep(backoff);
1583                match ops.runner.respawn() {
1584                    Ok(defs) => {
1585                        let count = defs.len();
1586                        // Replace, not merge — deleted functions must stop
1587                        // being callable. register_all() alone leaves stale
1588                        // entries from the previous process generation.
1589                        ops.registry.replace_all(defs);
1590                        tracing::warn!("[functions] Respawned Bun runtime ({count} fn(s))");
1591                        backoff = Duration::from_secs(1);
1592                    }
1593                    Err(e) => {
1594                        tracing::warn!("[functions] Respawn failed: {e}");
1595                        // Persistent Bun-runtime failures are the kind of
1596                        // operator signal that belongs in error telemetry
1597                        // too. Include enough context to triage repeated
1598                        // events: current backoff (so operators can see
1599                        // how long failures have been compounding) and the
1600                        // component name.
1601                        let backoff_str = format!("{}", backoff.as_secs());
1602                        pylon_observability::report_error(&pylon_observability::ErrorEvent {
1603                            level: pylon_observability::ErrorLevel::Error,
1604                            code: "FN_RESPAWN_FAILED",
1605                            message: &e,
1606                            context: &[
1607                                ("component", "bun-runtime-supervisor"),
1608                                ("backoff_secs", &backoff_str),
1609                            ],
1610                        });
1611                        backoff = (backoff * 2).min(max_backoff);
1612                    }
1613                }
1614            }
1615        })
1616        .expect("failed to spawn function runtime supervisor");
1617}