1use pylon_http::{DataError, DataStore};
8
9use crate::Runtime;
10
11impl DataStore for Runtime {
16 fn manifest(&self) -> &pylon_kernel::AppManifest {
17 Runtime::manifest(self)
18 }
19
20 fn insert(&self, entity: &str, data: &serde_json::Value) -> Result<String, DataError> {
21 Runtime::insert(self, entity, data).map_err(into_data_error)
22 }
23
24 fn get_by_id(&self, entity: &str, id: &str) -> Result<Option<serde_json::Value>, DataError> {
25 Runtime::get_by_id(self, entity, id).map_err(into_data_error)
26 }
27
28 fn list(&self, entity: &str) -> Result<Vec<serde_json::Value>, DataError> {
29 Runtime::list(self, entity).map_err(into_data_error)
30 }
31
32 fn list_after(
33 &self,
34 entity: &str,
35 after: Option<&str>,
36 limit: usize,
37 ) -> Result<Vec<serde_json::Value>, DataError> {
38 Runtime::list_after(self, entity, after, limit).map_err(into_data_error)
39 }
40
41 fn update(&self, entity: &str, id: &str, data: &serde_json::Value) -> Result<bool, DataError> {
42 Runtime::update(self, entity, id, data).map_err(into_data_error)
43 }
44
45 fn delete(&self, entity: &str, id: &str) -> Result<bool, DataError> {
46 Runtime::delete(self, entity, id).map_err(into_data_error)
47 }
48
49 fn lookup(
50 &self,
51 entity: &str,
52 field: &str,
53 value: &str,
54 ) -> Result<Option<serde_json::Value>, DataError> {
55 Runtime::lookup(self, entity, field, value).map_err(into_data_error)
56 }
57
58 fn link(
59 &self,
60 entity: &str,
61 id: &str,
62 relation: &str,
63 target_id: &str,
64 ) -> Result<bool, DataError> {
65 Runtime::link(self, entity, id, relation, target_id).map_err(into_data_error)
66 }
67
68 fn unlink(&self, entity: &str, id: &str, relation: &str) -> Result<bool, DataError> {
69 Runtime::unlink(self, entity, id, relation).map_err(into_data_error)
70 }
71
72 fn query_filtered(
73 &self,
74 entity: &str,
75 filter: &serde_json::Value,
76 ) -> Result<Vec<serde_json::Value>, DataError> {
77 Runtime::query_filtered(self, entity, filter).map_err(into_data_error)
78 }
79
80 fn query_graph(&self, query: &serde_json::Value) -> Result<serde_json::Value, DataError> {
81 Runtime::query_graph(self, query).map_err(into_data_error)
82 }
83
84 fn aggregate(
85 &self,
86 entity: &str,
87 spec: &serde_json::Value,
88 ) -> Result<serde_json::Value, DataError> {
89 Runtime::aggregate(self, entity, spec).map_err(into_data_error)
90 }
91
92 fn transact(
93 &self,
94 ops: &[serde_json::Value],
95 ) -> Result<(bool, Vec<serde_json::Value>), DataError> {
96 let conn = self.lock_conn_pub().map_err(into_data_error)?;
97 let _ = conn.execute("BEGIN", []);
98 let mut results: Vec<serde_json::Value> = Vec::new();
99 let mut rollback = false;
100
101 for op in ops {
102 let op_type = op.get("op").and_then(|v| v.as_str()).unwrap_or("");
103 let entity = op.get("entity").and_then(|v| v.as_str()).unwrap_or("");
104
105 match op_type {
106 "insert" => {
107 let data = op.get("data").cloned().unwrap_or(serde_json::json!({}));
108 match self.insert_with_conn(&conn, entity, &data) {
109 Ok(id) => {
110 results.push(serde_json::json!({"op": "insert", "id": id}));
111 }
112 Err(e) => {
113 results.push(serde_json::json!({"op": "insert", "error": e.message}));
114 rollback = true;
115 break;
116 }
117 }
118 }
119 "update" => {
120 let id = op.get("id").and_then(|v| v.as_str()).unwrap_or("");
121 let data = op.get("data").cloned().unwrap_or(serde_json::json!({}));
122 match self.update_with_conn(&conn, entity, id, &data) {
123 Ok(_) => {
124 results.push(serde_json::json!({"op": "update", "id": id}));
125 }
126 Err(e) => {
127 results.push(serde_json::json!({"op": "update", "error": e.message}));
128 rollback = true;
129 break;
130 }
131 }
132 }
133 "delete" => {
134 let id = op.get("id").and_then(|v| v.as_str()).unwrap_or("");
135 match self.delete_with_conn(&conn, entity, id) {
136 Ok(_) => {
137 results.push(serde_json::json!({"op": "delete", "id": id}));
138 }
139 Err(e) => {
140 results.push(serde_json::json!({"op": "delete", "error": e.message}));
141 rollback = true;
142 break;
143 }
144 }
145 }
146 _ => {
147 results.push(serde_json::json!({"op": op_type, "error": "unknown operation"}));
148 }
149 }
150 }
151
152 if rollback {
153 let _ = conn.execute("ROLLBACK", []);
154 } else {
155 let _ = conn.execute("COMMIT", []);
156 }
157
158 Ok((!rollback, results))
159 }
160
161 fn search(
168 &self,
169 entity: &str,
170 query: &serde_json::Value,
171 ) -> Result<serde_json::Value, DataError> {
172 let ent = self
173 .manifest()
174 .entities
175 .iter()
176 .find(|e| e.name == entity)
177 .ok_or_else(|| DataError {
178 code: "ENTITY_NOT_FOUND".into(),
179 message: format!("Unknown entity: {entity}"),
180 })?;
181 let cfg = ent.search.as_ref().ok_or_else(|| DataError {
182 code: "SEARCH_NOT_CONFIGURED".into(),
183 message: format!("Entity {entity} has no `search:` config"),
184 })?;
185 let parsed: pylon_storage::search::SearchQuery = serde_json::from_value(query.clone())
186 .map_err(|e| DataError {
187 code: "INVALID_QUERY".into(),
188 message: format!("search query body: {e}"),
189 })?;
190 let conn = self.lock_conn_pub().map_err(into_data_error)?;
191 let result =
192 pylon_storage::search_query::run_search(&conn, entity, cfg, &parsed).map_err(|e| {
193 DataError {
194 code: e.code,
195 message: e.message,
196 }
197 })?;
198 serde_json::to_value(&result).map_err(|e| DataError {
199 code: "SEARCH_SERIALIZE_FAILED".into(),
200 message: e.to_string(),
201 })
202 }
203
204 fn crdt_snapshot(&self, entity: &str, row_id: &str) -> Result<Option<Vec<u8>>, DataError> {
209 let ent = self
210 .manifest()
211 .entities
212 .iter()
213 .find(|e| e.name == entity)
214 .ok_or_else(|| DataError {
215 code: "ENTITY_NOT_FOUND".into(),
216 message: format!("Unknown entity: {entity}"),
217 })?;
218 if !ent.crdt {
219 return Ok(None);
220 }
221 let conn = self.lock_conn_pub().map_err(into_data_error)?;
222 let snap = self
223 .crdt_store()
224 .snapshot(&conn, entity, row_id)
225 .map_err(|e| DataError {
226 code: "CRDT_SNAPSHOT_FAILED".into(),
227 message: format!("snapshot {entity}/{row_id}: {e}"),
228 })?;
229 Ok(Some(snap))
230 }
231
232 fn crdt_apply_update(
242 &self,
243 entity: &str,
244 row_id: &str,
245 update: &[u8],
246 ) -> Result<Vec<u8>, DataError> {
247 let ent = self
250 .manifest()
251 .entities
252 .iter()
253 .find(|e| e.name == entity)
254 .ok_or_else(|| DataError {
255 code: "ENTITY_NOT_FOUND".into(),
256 message: format!("Unknown entity: {entity}"),
257 })?
258 .clone();
259 if !ent.crdt {
260 return Err(DataError {
261 code: "NOT_SUPPORTED".into(),
262 message: format!("Entity {entity} has crdt: false; client push requires CRDT mode"),
263 });
264 }
265 let crdt_fields = self.crdt_fields_for(&ent).map_err(into_data_error)?;
266
267 let conn = self.lock_conn_pub().map_err(into_data_error)?;
268 crate::with_write_tx(&conn, || -> Result<Vec<u8>, crate::RuntimeError> {
269 let projected = self
273 .crdt_store()
274 .apply_remote_update(&conn, entity, row_id, &crdt_fields, update)
275 .map_err(|e| crate::RuntimeError {
276 code: "CRDT_APPLY_FAILED".into(),
277 message: format!("apply_remote_update {entity}/{row_id}: {e}"),
278 })?;
279
280 let projection = projected.as_object().ok_or_else(|| crate::RuntimeError {
284 code: "CRDT_PROJECTION_INVALID".into(),
285 message: "projected row was not a JSON object".into(),
286 })?;
287
288 let mut set_clauses = Vec::with_capacity(projection.len());
289 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
290 let mut idx = 1;
291 for (key, val) in projection {
292 if key == "id" {
293 continue;
294 }
295 set_clauses.push(format!("{} = ?{idx}", crate::quote_ident(key.as_str())));
296 values.push(crate::json_to_sql(val));
297 idx += 1;
298 }
299 if set_clauses.is_empty() {
300 } else {
305 values.push(Box::new(row_id.to_string()));
306 let sql = format!(
307 "UPDATE {} SET {} WHERE \"id\" = ?{idx}",
308 crate::quote_ident(entity),
309 set_clauses.join(", ")
310 );
311 let params: Vec<&dyn rusqlite::types::ToSql> =
312 values.iter().map(|v| v.as_ref()).collect();
313 conn.execute(&sql, params.as_slice())
314 .map_err(|e| crate::RuntimeError {
315 code: "UPDATE_FAILED".into(),
316 message: format!("post-merge UPDATE {entity}/{row_id}: {e}"),
317 })?;
318 }
319
320 let snap = self
322 .crdt_store()
323 .snapshot(&conn, entity, row_id)
324 .map_err(|e| crate::RuntimeError {
325 code: "CRDT_SNAPSHOT_FAILED".into(),
326 message: format!("post-merge snapshot {entity}/{row_id}: {e}"),
327 })?;
328 Ok(snap)
329 })
330 .map_err(into_data_error)
331 }
332}
333
334fn into_data_error(e: crate::RuntimeError) -> DataError {
335 DataError {
336 code: e.code,
337 message: e.message,
338 }
339}
340
341use crate::sse::SseHub;
346use crate::ws::WsHub;
347use std::sync::Arc;
348
349pub struct WsSseNotifier {
351 pub ws: Arc<WsHub>,
352 pub sse: Arc<SseHub>,
353}
354
355impl pylon_router::ChangeNotifier for WsSseNotifier {
356 fn notify(&self, event: &pylon_sync::ChangeEvent) {
357 self.ws.broadcast(event);
358 self.sse.broadcast(event);
359 }
360
361 fn notify_presence(&self, json: &str) {
362 self.ws.broadcast_presence(json);
363 self.sse.broadcast_message(json);
364 }
365
366 fn notify_crdt(&self, entity: &str, row_id: &str, snapshot: &[u8]) {
402 let subscribers = self.ws.subscriptions().subscribers(entity, row_id);
403 if subscribers.is_empty() {
404 return;
405 }
406 match pylon_router::encode_crdt_frame(
407 pylon_router::CRDT_FRAME_SNAPSHOT,
408 entity,
409 row_id,
410 snapshot,
411 ) {
412 Ok(frame) => self.ws.broadcast_binary_to(&subscribers, frame),
413 Err(e) => {
414 tracing::warn!("[crdt] dropping binary frame for {entity}/{row_id}: {e}");
415 }
416 }
417 }
418}
419
420fn to_json<T: serde::Serialize>(val: T) -> serde_json::Value {
422 serde_json::to_value(val).unwrap_or(serde_json::json!({}))
423}
424
425fn to_json_array<T: serde::Serialize>(val: T) -> serde_json::Value {
427 serde_json::to_value(val).unwrap_or(serde_json::json!([]))
428}
429
430use crate::rooms::RoomManager;
435
436impl pylon_router::RoomOps for RoomManager {
437 fn join(
438 &self,
439 room: &str,
440 user_id: &str,
441 data: Option<serde_json::Value>,
442 ) -> Result<(serde_json::Value, serde_json::Value), DataError> {
443 RoomManager::join(self, room, user_id, data)
444 .map(|(snapshot, join_event)| (to_json(&snapshot), to_json(&join_event)))
445 .map_err(|e| DataError {
446 code: e.code,
447 message: e.message,
448 })
449 }
450
451 fn leave(&self, room: &str, user_id: &str) -> Option<serde_json::Value> {
452 RoomManager::leave(self, room, user_id).map(|event| to_json(&event))
453 }
454
455 fn set_presence(
456 &self,
457 room: &str,
458 user_id: &str,
459 data: serde_json::Value,
460 ) -> Option<serde_json::Value> {
461 RoomManager::set_presence(self, room, user_id, data).map(|event| to_json(&event))
462 }
463
464 fn broadcast(
465 &self,
466 room: &str,
467 sender: Option<&str>,
468 topic: &str,
469 data: serde_json::Value,
470 ) -> Option<serde_json::Value> {
471 RoomManager::broadcast(self, room, sender, topic, data).map(|event| to_json(&event))
472 }
473
474 fn list_rooms(&self) -> Vec<String> {
475 RoomManager::list_rooms(self)
476 }
477
478 fn room_size(&self, name: &str) -> usize {
479 RoomManager::room_size(self, name)
480 }
481
482 fn members(&self, name: &str) -> Vec<serde_json::Value> {
483 RoomManager::members(self, name)
484 .into_iter()
485 .map(|p| to_json(p))
486 .collect()
487 }
488}
489
490use pylon_plugin::builtin::cache::CachePlugin;
495
496pub struct PluginHooksAdapter(pub Arc<pylon_plugin::PluginRegistry>);
506
507impl pylon_router::PluginHookOps for PluginHooksAdapter {
508 fn before_insert(
509 &self,
510 entity: &str,
511 data: &mut serde_json::Value,
512 auth: &pylon_auth::AuthContext,
513 ) -> Result<(), (u16, String, String)> {
514 self.0
515 .run_before_insert(entity, data, auth)
516 .map_err(|e| (e.status, e.code, e.message))
517 }
518 fn after_insert(
519 &self,
520 entity: &str,
521 id: &str,
522 data: &serde_json::Value,
523 auth: &pylon_auth::AuthContext,
524 ) {
525 self.0.run_after_insert(entity, id, data, auth);
526 }
527 fn before_update(
528 &self,
529 entity: &str,
530 id: &str,
531 data: &mut serde_json::Value,
532 auth: &pylon_auth::AuthContext,
533 ) -> Result<(), (u16, String, String)> {
534 self.0
535 .run_before_update(entity, id, data, auth)
536 .map_err(|e| (e.status, e.code, e.message))
537 }
538 fn after_update(
539 &self,
540 entity: &str,
541 id: &str,
542 data: &serde_json::Value,
543 auth: &pylon_auth::AuthContext,
544 ) {
545 self.0.run_after_update(entity, id, data, auth);
546 }
547 fn before_delete(
548 &self,
549 entity: &str,
550 id: &str,
551 auth: &pylon_auth::AuthContext,
552 ) -> Result<(), (u16, String, String)> {
553 self.0
554 .run_before_delete(entity, id, auth)
555 .map_err(|e| (e.status, e.code, e.message))
556 }
557 fn after_delete(&self, entity: &str, id: &str, auth: &pylon_auth::AuthContext) {
558 self.0.run_after_delete(entity, id, auth);
559 }
560}
561
562pub struct CacheAdapter(pub Arc<CachePlugin>);
563
564impl pylon_router::CacheOps for CacheAdapter {
565 fn handle_command(&self, body: &str) -> (u16, String) {
566 crate::cache_handlers::handle_cache_command(&self.0, body)
567 }
568
569 fn handle_get(&self, key: &str) -> (u16, String) {
570 crate::cache_handlers::handle_cache_get(&self.0, key)
571 }
572
573 fn handle_delete(&self, key: &str) -> (u16, String) {
574 crate::cache_handlers::handle_cache_delete(&self.0, key)
575 }
576}
577
578use crate::pubsub::PubSubBroker;
583
584pub struct PubSubAdapter(pub Arc<PubSubBroker>);
585
586impl pylon_router::PubSubOps for PubSubAdapter {
587 fn handle_publish(&self, body: &str) -> (u16, String) {
588 crate::cache_handlers::handle_pubsub_publish(&self.0, body)
589 }
590
591 fn handle_channels(&self) -> (u16, String) {
592 crate::cache_handlers::handle_pubsub_channels(&self.0)
593 }
594
595 fn handle_history(&self, channel: &str, url: &str) -> (u16, String) {
596 crate::cache_handlers::handle_pubsub_history(&self.0, channel, url)
597 }
598}
599
600use crate::jobs::{JobQueue, Priority};
605
606impl pylon_router::JobOps for JobQueue {
607 fn enqueue(
608 &self,
609 name: &str,
610 payload: serde_json::Value,
611 priority: &str,
612 delay_secs: u64,
613 max_retries: u32,
614 queue: &str,
615 ) -> String {
616 let pri = Priority::from_str_loose(priority);
617 JobQueue::enqueue_with_options(self, name, payload, pri, delay_secs, max_retries, queue)
618 }
619
620 fn stats(&self) -> serde_json::Value {
621 to_json(JobQueue::stats(self))
622 }
623
624 fn dead_letters(&self) -> serde_json::Value {
625 to_json_array(JobQueue::dead_letters(self))
626 }
627
628 fn retry_dead(&self, id: &str) -> bool {
629 JobQueue::retry_dead(self, id)
630 }
631
632 fn list_jobs(
633 &self,
634 status: Option<&str>,
635 queue: Option<&str>,
636 limit: usize,
637 ) -> serde_json::Value {
638 to_json_array(JobQueue::list_jobs(self, status, queue, limit))
639 }
640
641 fn get_job(&self, id: &str) -> Option<serde_json::Value> {
642 JobQueue::get_job(self, id).map(|j| to_json(j))
643 }
644}
645
646use crate::scheduler::Scheduler;
651
652impl pylon_router::SchedulerOps for Scheduler {
653 fn list_tasks(&self) -> serde_json::Value {
654 to_json_array(Scheduler::list_tasks(self))
655 }
656
657 fn trigger(&self, name: &str) -> bool {
658 Scheduler::trigger(self, name)
659 }
660}
661
662use crate::workflows::WorkflowEngine;
667
668impl pylon_router::WorkflowOps for WorkflowEngine {
669 fn definitions(&self) -> serde_json::Value {
670 to_json_array(WorkflowEngine::definitions(self))
671 }
672
673 fn start(&self, name: &str, input: serde_json::Value) -> Result<String, String> {
674 WorkflowEngine::start(self, name, input)
675 }
676
677 fn list(&self, status_filter: Option<&str>) -> serde_json::Value {
678 let filter = status_filter.and_then(|s| match s {
680 "pending" => Some(crate::workflows::WorkflowStatus::Pending),
681 "running" => Some(crate::workflows::WorkflowStatus::Running),
682 "sleeping" => Some(crate::workflows::WorkflowStatus::Sleeping),
683 "waiting" => Some(crate::workflows::WorkflowStatus::WaitingForEvent),
684 "completed" => Some(crate::workflows::WorkflowStatus::Completed),
685 "failed" => Some(crate::workflows::WorkflowStatus::Failed),
686 "cancelled" => Some(crate::workflows::WorkflowStatus::Cancelled),
687 _ => None,
688 });
689 to_json_array(WorkflowEngine::list(self, filter.as_ref()))
690 }
691
692 fn get(&self, id: &str) -> Option<serde_json::Value> {
693 WorkflowEngine::get(self, id).map(|inst| to_json(inst))
694 }
695
696 fn advance(&self, id: &str) -> Result<String, String> {
697 WorkflowEngine::advance(self, id).map(|status| format!("{:?}", status))
698 }
699
700 fn send_event(&self, id: &str, event: &str, data: serde_json::Value) -> Result<(), String> {
701 WorkflowEngine::send_event(self, id, event, data)
702 }
703
704 fn cancel(&self, id: &str) -> Result<(), String> {
705 WorkflowEngine::cancel(self, id)
706 }
707}
708
709use pylon_storage::files::{FileStorage, LocalFileStorage};
714
715pub struct FileOpsAdapter {
717 pub storage: Arc<dyn FileStorage>,
718}
719
720impl FileOpsAdapter {
721 pub fn from_env() -> Self {
724 let dir = std::env::var("PYLON_FILES_DIR").unwrap_or_else(|_| "uploads".into());
725 let url_prefix =
726 std::env::var("PYLON_FILES_URL_PREFIX").unwrap_or_else(|_| "/api/files".into());
727 Self {
728 storage: Arc::new(LocalFileStorage::new(&dir, &url_prefix)),
729 }
730 }
731}
732
733impl pylon_router::FileOps for FileOpsAdapter {
734 fn upload(&self, _body: &str) -> (u16, String) {
735 (
741 400,
742 pylon_router::json_error(
743 "UPLOAD_NEEDS_BINARY",
744 "File uploads must use multipart/form-data or raw binary with X-Filename; this platform does not support string-body uploads",
745 ),
746 )
747 }
748
749 fn get_file(&self, id: &str) -> (u16, String) {
750 match self.storage.get(id) {
751 Ok(content) => (200, String::from_utf8_lossy(&content).into_owned()),
752 Err(e) if e.code == "NOT_FOUND" => {
753 (404, pylon_router::json_error("FILE_NOT_FOUND", &e.message))
754 }
755 Err(e) => (400, pylon_router::json_error(&e.code, &e.message)),
756 }
757 }
758}
759
760pub type LocalFileOps = FileOpsAdapter;
762
763impl LocalFileOps {
764 pub fn new_default() -> Self {
766 Self::from_env()
767 }
768}
769
770use pylon_auth::email::{ConsoleTransport, EmailTransport, HttpEmailTransport};
775
776pub struct EmailAdapter {
779 transport: Box<dyn EmailTransport>,
780}
781
782impl EmailAdapter {
783 pub fn from_env() -> Self {
784 if let Some(http) = HttpEmailTransport::from_env() {
785 Self {
786 transport: Box::new(http),
787 }
788 } else {
789 Self {
790 transport: Box::new(ConsoleTransport),
791 }
792 }
793 }
794}
795
796impl pylon_router::EmailSender for EmailAdapter {
797 fn send(&self, to: &str, subject: &str, body: &str) -> Result<(), String> {
798 self.transport
799 .send(to, subject, body)
800 .map_err(|e| e.message)
801 }
802}
803
804pub struct RuntimeOpenApiGenerator<'a> {
809 pub manifest: &'a pylon_kernel::AppManifest,
810}
811
812impl<'a> pylon_router::OpenApiGenerator for RuntimeOpenApiGenerator<'a> {
813 fn generate(&self, base_url: &str) -> String {
814 let spec = crate::openapi::generate_openapi(self.manifest, base_url);
815 serde_json::to_string(&spec).unwrap_or_else(|_| "{}".into())
816 }
817}
818
819pub struct ShardOpsAdapter {
826 pub registry: Arc<dyn pylon_realtime::DynShardRegistry>,
827}
828
829impl pylon_router::ShardOps for ShardOpsAdapter {
830 fn get_shard(&self, id: &str) -> Option<Arc<dyn pylon_realtime::DynShard>> {
831 self.registry.get(id)
832 }
833
834 fn list_shards(&self) -> Vec<String> {
835 self.registry.ids()
836 }
837
838 fn shard_count(&self) -> usize {
839 self.registry.len()
840 }
841}
842
843#[cfg(test)]
844mod find_runtime_tests {
845 use super::*;
846
847 #[test]
848 fn env_override_takes_precedence() {
849 let dir = std::env::temp_dir().join(format!("pylon_rt_{}", std::process::id()));
850 let _ = std::fs::create_dir_all(&dir);
851 let path = dir.join("custom_runtime.ts");
852 std::fs::write(&path, "// test").unwrap();
853
854 std::env::set_var("PYLON_FUNCTIONS_RUNTIME", path.to_str().unwrap());
855 let found = find_functions_runtime();
856 std::env::remove_var("PYLON_FUNCTIONS_RUNTIME");
857
858 assert_eq!(found.as_deref(), path.to_str());
859
860 let _ = std::fs::remove_dir_all(&dir);
861 }
862
863 #[test]
864 fn returns_none_when_env_path_missing() {
865 std::env::set_var(
866 "PYLON_FUNCTIONS_RUNTIME",
867 "/tmp/definitely-does-not-exist-42.ts",
868 );
869 let found = find_functions_runtime();
872 std::env::remove_var("PYLON_FUNCTIONS_RUNTIME");
873 assert_ne!(
874 found.as_deref(),
875 Some("/tmp/definitely-does-not-exist-42.ts")
876 );
877 }
878}
879
880pub struct TxStore<'a> {
919 runtime: &'a Runtime,
920 conn: &'a rusqlite::Connection,
921 pending: std::cell::RefCell<Vec<pylon_sync::ChangeEvent>>,
926}
927
928impl<'a> TxStore<'a> {
929 pub fn new(runtime: &'a Runtime, conn: &'a rusqlite::Connection) -> Self {
930 Self {
931 runtime,
932 conn,
933 pending: std::cell::RefCell::new(Vec::new()),
934 }
935 }
936
937 pub fn take_pending(&self) -> Vec<pylon_sync::ChangeEvent> {
942 std::mem::take(&mut *self.pending.borrow_mut())
943 }
944
945 fn record(
946 &self,
947 entity: &str,
948 row_id: &str,
949 kind: pylon_sync::ChangeKind,
950 data: Option<&serde_json::Value>,
951 ) {
952 self.pending.borrow_mut().push(pylon_sync::ChangeEvent {
953 seq: 0, entity: entity.to_string(),
955 row_id: row_id.to_string(),
956 kind,
957 data: data.cloned(),
958 timestamp: String::new(),
959 });
960 }
961}
962
963unsafe impl<'a> Sync for TxStore<'a> {}
965unsafe impl<'a> Send for TxStore<'a> {}
966
967impl<'a> DataStore for TxStore<'a> {
968 fn manifest(&self) -> &pylon_kernel::AppManifest {
969 self.runtime.manifest()
970 }
971
972 fn insert(&self, entity: &str, data: &serde_json::Value) -> Result<String, DataError> {
973 let id = self
974 .runtime
975 .insert_with_conn(self.conn, entity, data)
976 .map_err(into_data_error)?;
977 self.record(entity, &id, pylon_sync::ChangeKind::Insert, Some(data));
981 Ok(id)
982 }
983
984 fn get_by_id(&self, entity: &str, id: &str) -> Result<Option<serde_json::Value>, DataError> {
985 self.runtime
986 .get_by_id_with_conn(self.conn, entity, id)
987 .map_err(into_data_error)
988 }
989
990 fn list(&self, entity: &str) -> Result<Vec<serde_json::Value>, DataError> {
991 self.runtime
992 .list_with_conn(self.conn, entity)
993 .map_err(into_data_error)
994 }
995
996 fn list_after(
997 &self,
998 entity: &str,
999 after: Option<&str>,
1000 limit: usize,
1001 ) -> Result<Vec<serde_json::Value>, DataError> {
1002 self.runtime
1003 .list_after_with_conn(self.conn, entity, after, limit)
1004 .map_err(into_data_error)
1005 }
1006
1007 fn update(&self, entity: &str, id: &str, data: &serde_json::Value) -> Result<bool, DataError> {
1008 let updated = self
1009 .runtime
1010 .update_with_conn(self.conn, entity, id, data)
1011 .map_err(into_data_error)?;
1012 if updated {
1013 self.record(entity, id, pylon_sync::ChangeKind::Update, Some(data));
1014 }
1015 Ok(updated)
1016 }
1017
1018 fn delete(&self, entity: &str, id: &str) -> Result<bool, DataError> {
1019 let deleted = self
1020 .runtime
1021 .delete_with_conn(self.conn, entity, id)
1022 .map_err(into_data_error)?;
1023 if deleted {
1024 self.record(entity, id, pylon_sync::ChangeKind::Delete, None);
1025 }
1026 Ok(deleted)
1027 }
1028
1029 fn lookup(
1030 &self,
1031 entity: &str,
1032 field: &str,
1033 value: &str,
1034 ) -> Result<Option<serde_json::Value>, DataError> {
1035 self.runtime
1036 .lookup_with_conn(self.conn, entity, field, value)
1037 .map_err(into_data_error)
1038 }
1039
1040 fn link(
1041 &self,
1042 entity: &str,
1043 id: &str,
1044 relation: &str,
1045 target_id: &str,
1046 ) -> Result<bool, DataError> {
1047 self.runtime
1048 .link_with_conn(self.conn, entity, id, relation, target_id)
1049 .map_err(into_data_error)
1050 }
1051
1052 fn unlink(&self, entity: &str, id: &str, relation: &str) -> Result<bool, DataError> {
1053 self.runtime
1054 .unlink_with_conn(self.conn, entity, id, relation)
1055 .map_err(into_data_error)
1056 }
1057
1058 fn query_filtered(
1059 &self,
1060 entity: &str,
1061 filter: &serde_json::Value,
1062 ) -> Result<Vec<serde_json::Value>, DataError> {
1063 self.runtime
1064 .query_filtered_with_conn(self.conn, entity, filter)
1065 .map_err(into_data_error)
1066 }
1067
1068 fn query_graph(&self, query: &serde_json::Value) -> Result<serde_json::Value, DataError> {
1069 self.runtime
1070 .query_graph_with_conn(self.conn, query)
1071 .map_err(into_data_error)
1072 }
1073
1074 fn aggregate(
1075 &self,
1076 entity: &str,
1077 spec: &serde_json::Value,
1078 ) -> Result<serde_json::Value, DataError> {
1079 Runtime::aggregate(self.runtime, entity, spec).map_err(into_data_error)
1083 }
1084
1085 fn transact(
1086 &self,
1087 _ops: &[serde_json::Value],
1088 ) -> Result<(bool, Vec<serde_json::Value>), DataError> {
1089 Err(DataError {
1092 code: "NESTED_TRANSACTION".into(),
1093 message: "ctx.db.transact() is not allowed inside a mutation handler (the handler itself is transactional)".into(),
1094 })
1095 }
1096}
1097
1098use pylon_functions::protocol::{AuthInfo as FnAuth, FnType};
1103use pylon_functions::registry::{FnDef, FnRegistry};
1104use pylon_functions::runner::{FnCallError, FnRunner};
1105use pylon_functions::trace::FnTrace;
1106
1107pub struct FnOpsImpl {
1112 pub runner: Arc<FnRunner>,
1113 pub registry: Arc<FnRegistry>,
1114 pub runtime: Arc<Runtime>,
1115 pub fn_rate_limiter: Arc<crate::rate_limit::RateLimiter>,
1119 pub change_log: Arc<pylon_sync::ChangeLog>,
1125 pub notifier: Arc<dyn pylon_router::ChangeNotifier>,
1127}
1128
1129impl pylon_router::FnOps for FnOpsImpl {
1130 fn get_fn(&self, name: &str) -> Option<FnDef> {
1131 self.registry.get(name)
1132 }
1133
1134 fn list_fns(&self) -> Vec<FnDef> {
1135 self.registry.list()
1136 }
1137
1138 fn call(
1139 &self,
1140 fn_name: &str,
1141 args: serde_json::Value,
1142 auth: FnAuth,
1143 on_stream: Option<Box<dyn FnMut(&str) + Send>>,
1144 request: Option<pylon_functions::protocol::RequestInfo>,
1145 ) -> Result<(serde_json::Value, FnTrace), FnCallError> {
1146 let def = self.registry.get(fn_name).ok_or_else(|| FnCallError {
1147 code: "FN_NOT_FOUND".into(),
1148 message: format!("Function \"{fn_name}\" is not registered"),
1149 })?;
1150
1151 match def.fn_type {
1152 FnType::Mutation => {
1153 let conn_guard = self.runtime.lock_conn_pub().map_err(|e| FnCallError {
1162 code: e.code,
1163 message: e.message,
1164 })?;
1165
1166 if let Err(e) = conn_guard.execute("BEGIN", []) {
1167 return Err(FnCallError {
1168 code: "BEGIN_FAILED".into(),
1169 message: format!("Failed to start transaction: {e}"),
1170 });
1171 }
1172
1173 let tx_store = TxStore::new(&self.runtime, &conn_guard);
1174 let result = self.runner.call(
1175 &tx_store,
1176 fn_name,
1177 def.fn_type,
1178 args,
1179 auth,
1180 on_stream,
1181 request,
1182 );
1183
1184 let result = match result {
1189 Ok(value) => match conn_guard.execute("COMMIT", []) {
1190 Ok(_) => {
1191 for ev in tx_store.take_pending() {
1200 let seq = self.change_log.append(
1201 &ev.entity,
1202 &ev.row_id,
1203 ev.kind.clone(),
1204 ev.data.clone(),
1205 );
1206 let event = pylon_sync::ChangeEvent { seq, ..ev };
1207 self.notifier.notify(&event);
1208 }
1209 Ok(value)
1210 }
1211 Err(commit_err) => {
1212 if let Err(rollback_err) = conn_guard.execute("ROLLBACK", []) {
1216 tracing::warn!(
1217 "[functions] ROLLBACK after COMMIT failure also failed: {rollback_err}"
1218 );
1219 }
1220 Err(FnCallError {
1221 code: "COMMIT_FAILED".into(),
1222 message: format!(
1223 "Function \"{fn_name}\" succeeded but COMMIT failed: {commit_err}"
1224 ),
1225 })
1226 }
1227 },
1228 Err(handler_err) => {
1229 if let Err(rollback_err) = conn_guard.execute("ROLLBACK", []) {
1230 tracing::warn!(
1233 "[functions] ROLLBACK after handler error failed: {rollback_err}"
1234 );
1235 }
1236 Err(handler_err)
1237 }
1238 };
1239 result
1241 }
1242 _ => self.runner.call(
1243 &*self.runtime,
1244 fn_name,
1245 def.fn_type,
1246 args,
1247 auth,
1248 on_stream,
1249 request,
1250 ),
1251 }
1252 }
1253
1254 fn recent_traces(&self, limit: usize) -> Vec<FnTrace> {
1255 self.runner.trace_log.recent(limit)
1256 }
1257
1258 fn check_rate_limit(&self, fn_name: &str, identity: &str) -> Result<(), u64> {
1259 let key = format!("{fn_name}::{identity}");
1260 self.fn_rate_limiter.check(&key)
1261 }
1262}
1263
1264pub fn find_functions_runtime() -> Option<String> {
1280 if let Ok(env_path) = std::env::var("PYLON_FUNCTIONS_RUNTIME") {
1281 if std::path::Path::new(&env_path).exists() {
1282 return Some(env_path);
1283 }
1284 }
1285
1286 let cwd = std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));
1292 let home = std::env::var("HOME").unwrap_or_else(|_| ".".into());
1293 let relative_candidates = [
1294 "node_modules/@pylonsync/functions/src/runtime.ts",
1295 "node_modules/@pylonsync/functions/dist/runtime.js",
1296 "node_modules/@pylon/functions/src/runtime.ts",
1298 "node_modules/@pylon/functions/dist/runtime.js",
1299 "packages/functions/src/runtime.ts",
1301 ];
1302
1303 let mut dir: Option<&std::path::Path> = Some(cwd.as_path());
1304 while let Some(current) = dir {
1305 for rel in &relative_candidates {
1306 let candidate = current.join(rel);
1307 if candidate.exists() {
1308 return candidate.to_str().map(|s| s.to_string());
1309 }
1310 }
1311 dir = current.parent();
1312 }
1313
1314 let user_path = format!("{home}/.pylon/runtime.ts");
1316 if std::path::Path::new(&user_path).exists() {
1317 return Some(user_path);
1318 }
1319 None
1320}
1321
1322pub fn try_spawn_functions(
1323 runtime: Arc<Runtime>,
1324 job_queue: Arc<crate::jobs::JobQueue>,
1325 fn_rate_limiter: Arc<crate::rate_limit::RateLimiter>,
1326 change_log: Arc<pylon_sync::ChangeLog>,
1327 notifier: Arc<dyn pylon_router::ChangeNotifier>,
1328) -> Option<Arc<FnOpsImpl>> {
1329 let fn_dir = std::env::var("PYLON_FUNCTIONS_DIR").unwrap_or_else(|_| "functions".into());
1330 if !std::path::Path::new(&fn_dir).exists() {
1331 return None;
1332 }
1333
1334 let runtime_script = match find_functions_runtime() {
1335 Some(p) => p,
1336 None => {
1337 tracing::warn!(
1338 "[functions] No TypeScript runtime script found. TypeScript functions will be unavailable."
1339 );
1340 tracing::warn!(
1341 "[functions] Tried: $PYLON_FUNCTIONS_RUNTIME, node_modules/@pylon/functions/src/runtime.ts, ~/.pylon/runtime.ts, packages/functions/src/runtime.ts"
1342 );
1343 return None;
1344 }
1345 };
1346
1347 let runner = Arc::new(FnRunner::new(1000));
1348
1349 let defs = match runner.start("bun", &["run", &runtime_script, &fn_dir]) {
1353 Ok(defs) => defs,
1354 Err(e) => {
1355 tracing::warn!("[functions] Failed to start Bun runtime: {e}");
1356 tracing::warn!(
1357 "[functions] Install Bun from https://bun.sh — TypeScript functions will be unavailable."
1358 );
1359 return None;
1360 }
1361 };
1362
1363 let job_queue_for_handlers = Arc::clone(&job_queue);
1367
1368 runner.set_schedule_hook(Box::new(move |fn_name, args, delay_ms, run_at| {
1372 let delay_secs = match (delay_ms, run_at) {
1373 (Some(ms), _) => ms / 1000,
1374 (None, Some(ts)) => {
1375 let now = std::time::SystemTime::now()
1376 .duration_since(std::time::UNIX_EPOCH)
1377 .unwrap_or_default()
1378 .as_millis() as u64;
1379 if ts > now {
1380 (ts - now) / 1000
1381 } else {
1382 0
1383 }
1384 }
1385 _ => 0,
1386 };
1387 job_queue.try_enqueue_with_options(
1388 fn_name,
1389 args,
1390 crate::jobs::Priority::Normal,
1391 delay_secs,
1392 3,
1393 "functions",
1394 )
1395 }));
1396
1397 let registry = Arc::new(FnRegistry::new());
1398 let count = defs.len();
1399 registry.replace_all(defs);
1400 tracing::warn!("[functions] Loaded {count} function(s) from {fn_dir}");
1401
1402 let ops = Arc::new(FnOpsImpl {
1403 runner,
1404 registry,
1405 runtime,
1406 fn_rate_limiter,
1407 change_log,
1408 notifier,
1409 });
1410
1411 install_nested_call_hook(&ops);
1412 register_function_job_handlers(&ops, &job_queue_for_handlers);
1413 spawn_runtime_supervisor(Arc::clone(&ops));
1414 Some(ops)
1415}
1416
1417fn register_function_job_handlers(ops: &Arc<FnOpsImpl>, job_queue: &Arc<crate::jobs::JobQueue>) {
1430 use pylon_router::FnOps as _;
1431
1432 let fn_names: Vec<String> = ops.registry.list().into_iter().map(|d| d.name).collect();
1433
1434 for name in fn_names {
1435 let weak = Arc::downgrade(ops);
1436 let fn_name = name.clone();
1437 job_queue.register(
1438 &name,
1439 Arc::new(move |job: &crate::jobs::Job| {
1440 let ops = match weak.upgrade() {
1441 Some(o) => o,
1442 None => {
1443 return crate::jobs::JobResult::Failure(
1444 "RUNTIME_GONE: function ops dropped".into(),
1445 )
1446 }
1447 };
1448 let auth = FnAuth {
1449 user_id: None,
1450 is_admin: false,
1451 tenant_id: None,
1452 };
1453 match ops.call(&fn_name, job.payload.clone(), auth, None, None) {
1454 Ok(_) => crate::jobs::JobResult::Success,
1455 Err(e) => crate::jobs::JobResult::Retry(format!("{}: {}", e.code, e.message)),
1456 }
1457 }),
1458 );
1459 }
1460}
1461
1462fn install_nested_call_hook(ops: &Arc<FnOpsImpl>) {
1469 use pylon_functions::protocol::{AuthInfo, FnType};
1470
1471 let weak = Arc::downgrade(ops);
1472 ops.runner.set_nested_call_hook(Box::new(
1473 move |fn_name: &str,
1474 fn_type: FnType,
1475 args: serde_json::Value,
1476 auth: AuthInfo|
1477 -> Result<serde_json::Value, (String, String)> {
1478 let ops = match weak.upgrade() {
1479 Some(o) => o,
1480 None => {
1481 return Err((
1482 "RUNTIME_GONE".into(),
1483 "pylon runtime is shutting down".into(),
1484 ))
1485 }
1486 };
1487
1488 match fn_type {
1489 FnType::Mutation => {
1490 let conn_guard = ops
1493 .runtime
1494 .lock_conn_pub()
1495 .map_err(|e| (e.code, e.message))?;
1496 if let Err(e) = conn_guard.execute("BEGIN", []) {
1497 return Err(("BEGIN_FAILED".into(), e.to_string()));
1498 }
1499 let tx_store = TxStore::new(&ops.runtime, &conn_guard);
1500 let result = ops
1505 .runner
1506 .call_inner(&tx_store, fn_name, fn_type, args, auth, None, None);
1507 match result {
1508 Ok((value, _trace)) => {
1509 if let Err(e) = conn_guard.execute("COMMIT", []) {
1510 let _ = conn_guard.execute("ROLLBACK", []);
1511 return Err(("COMMIT_FAILED".into(), e.to_string()));
1512 }
1513 for ev in tx_store.take_pending() {
1520 let seq = ops.change_log.append(
1521 &ev.entity,
1522 &ev.row_id,
1523 ev.kind.clone(),
1524 ev.data.clone(),
1525 );
1526 let event = pylon_sync::ChangeEvent { seq, ..ev };
1527 ops.notifier.notify(&event);
1528 }
1529 Ok(value)
1530 }
1531 Err(e) => {
1532 let _ = conn_guard.execute("ROLLBACK", []);
1533 Err((e.code, e.message))
1534 }
1535 }
1536 }
1537 _ => {
1538 let result = ops.runner.call_inner(
1542 &*ops.runtime,
1543 fn_name,
1544 fn_type,
1545 args,
1546 auth,
1547 None,
1548 None,
1549 );
1550 result.map(|(v, _)| v).map_err(|e| (e.code, e.message))
1551 }
1552 }
1553 },
1554 ));
1555}
1556
1557fn spawn_runtime_supervisor(ops: Arc<FnOpsImpl>) {
1565 use std::time::Duration;
1566
1567 std::thread::Builder::new()
1568 .name("pylon-fn-supervisor".into())
1569 .spawn(move || {
1570 let mut backoff = Duration::from_secs(1);
1571 let max_backoff = Duration::from_secs(30);
1572 loop {
1573 std::thread::sleep(Duration::from_secs(2));
1574 if ops.runner.is_alive() {
1575 backoff = Duration::from_secs(1);
1576 continue;
1577 }
1578 tracing::warn!(
1579 "[functions] Bun runtime is not alive — respawning after {:?}",
1580 backoff
1581 );
1582 std::thread::sleep(backoff);
1583 match ops.runner.respawn() {
1584 Ok(defs) => {
1585 let count = defs.len();
1586 ops.registry.replace_all(defs);
1590 tracing::warn!("[functions] Respawned Bun runtime ({count} fn(s))");
1591 backoff = Duration::from_secs(1);
1592 }
1593 Err(e) => {
1594 tracing::warn!("[functions] Respawn failed: {e}");
1595 let backoff_str = format!("{}", backoff.as_secs());
1602 pylon_observability::report_error(&pylon_observability::ErrorEvent {
1603 level: pylon_observability::ErrorLevel::Error,
1604 code: "FN_RESPAWN_FAILED",
1605 message: &e,
1606 context: &[
1607 ("component", "bun-runtime-supervisor"),
1608 ("backoff_secs", &backoff_str),
1609 ],
1610 });
1611 backoff = (backoff * 2).min(max_backoff);
1612 }
1613 }
1614 }
1615 })
1616 .expect("failed to spawn function runtime supervisor");
1617}