1use pylon_http::{DataError, DataStore};
8
9use crate::Runtime;
10
11#[derive(Debug, Clone)]
22pub(crate) struct PendingSchedule {
23 pub fn_name: String,
24 pub args: serde_json::Value,
25 pub delay_ms: Option<u64>,
26 pub run_at: Option<u64>,
27}
28
29thread_local! {
30 pub(crate) static MUTATION_SCHEDULE_BUFFER: std::cell::RefCell<Option<std::rc::Rc<std::cell::RefCell<Vec<PendingSchedule>>>>>
39 = const { std::cell::RefCell::new(None) };
40}
41
42pub(crate) struct ScheduleBufferGuard {
47 previous: Option<std::rc::Rc<std::cell::RefCell<Vec<PendingSchedule>>>>,
48 current: std::rc::Rc<std::cell::RefCell<Vec<PendingSchedule>>>,
49}
50
51impl ScheduleBufferGuard {
52 pub(crate) fn enter() -> Self {
53 let current = std::rc::Rc::new(std::cell::RefCell::new(Vec::new()));
54 let previous = MUTATION_SCHEDULE_BUFFER.with(|cell| {
55 let mut slot = cell.borrow_mut();
56 let old = slot.take();
57 *slot = Some(current.clone());
58 old
59 });
60 Self { previous, current }
61 }
62
63 pub(crate) fn take(&self) -> Vec<PendingSchedule> {
67 std::mem::take(&mut *self.current.borrow_mut())
68 }
69}
70
71impl Drop for ScheduleBufferGuard {
72 fn drop(&mut self) {
73 MUTATION_SCHEDULE_BUFFER.with(|cell| {
74 *cell.borrow_mut() = self.previous.take();
75 });
76 }
77}
78
79thread_local! {
84 static MUTATION_DEPTH: std::cell::Cell<u32> = const { std::cell::Cell::new(0) };
96}
97
98pub(crate) struct MutationDepthGuard;
103
104impl MutationDepthGuard {
105 pub(crate) fn enter() -> Self {
106 MUTATION_DEPTH.with(|d| d.set(d.get().saturating_add(1)));
107 Self
108 }
109}
110
111impl Drop for MutationDepthGuard {
112 fn drop(&mut self) {
113 MUTATION_DEPTH.with(|d| d.set(d.get().saturating_sub(1)));
114 }
115}
116
117pub(crate) fn in_mutation_tx() -> bool {
119 MUTATION_DEPTH.with(|d| d.get() > 0)
120}
121
122impl Runtime {
127 pub(crate) fn pg_transact_with_crdt(
133 &self,
134 pg: &crate::PgBackend,
135 ops: &[serde_json::Value],
136 ) -> Result<(bool, Vec<serde_json::Value>), DataError> {
137 use pylon_storage::pg_tx_store::{tx_delete, tx_insert, tx_update};
138
139 enum Op<'a> {
142 Insert {
143 entity: &'a str,
144 data: &'a serde_json::Value,
145 },
146 Update {
147 entity: &'a str,
148 id: &'a str,
149 data: &'a serde_json::Value,
150 },
151 Delete {
152 entity: &'a str,
153 id: &'a str,
154 },
155 }
156 let mut typed: Vec<Op<'_>> = Vec::with_capacity(ops.len());
157 for op in ops {
158 let op_type = op.get("op").and_then(|v| v.as_str()).unwrap_or("");
159 let entity = op
160 .get("entity")
161 .and_then(|v| v.as_str())
162 .ok_or_else(|| DataError {
163 code: "TX_INVALID_OP".into(),
164 message: "Each transact op must have an \"entity\" field".into(),
165 })?;
166 match op_type {
167 "insert" => {
168 let data = op.get("data").ok_or_else(|| DataError {
169 code: "TX_INVALID_OP".into(),
170 message: "insert op requires \"data\"".into(),
171 })?;
172 typed.push(Op::Insert { entity, data });
173 }
174 "update" => {
175 let id = op
176 .get("id")
177 .and_then(|v| v.as_str())
178 .ok_or_else(|| DataError {
179 code: "TX_INVALID_OP".into(),
180 message: "update op requires \"id\"".into(),
181 })?;
182 let data = op.get("data").ok_or_else(|| DataError {
183 code: "TX_INVALID_OP".into(),
184 message: "update op requires \"data\"".into(),
185 })?;
186 typed.push(Op::Update { entity, id, data });
187 }
188 "delete" => {
189 let id = op
190 .get("id")
191 .and_then(|v| v.as_str())
192 .ok_or_else(|| DataError {
193 code: "TX_INVALID_OP".into(),
194 message: "delete op requires \"id\"".into(),
195 })?;
196 typed.push(Op::Delete { entity, id });
197 }
198 other => {
199 return Err(DataError {
200 code: "TX_INVALID_OP".into(),
201 message: format!("unknown op \"{other}\""),
202 });
203 }
204 }
205 }
206
207 let mut crdt_touched: Vec<(String, String)> = Vec::new();
210
211 let manifest = self.manifest.clone();
212 let result = pg.store.with_transaction_raw(|tx| -> Result<Vec<serde_json::Value>, DataError> {
213 let mut json_results: Vec<serde_json::Value> = Vec::with_capacity(typed.len());
214 for op in &typed {
215 let result = match op {
216 Op::Insert { entity, data } => {
217 let ent = manifest.entities.iter().find(|e| e.name == *entity);
218 let id = if ent.map(|e| e.crdt).unwrap_or(false) {
219 let crdt_fields = self.crdt_fields_for(ent.unwrap()).map_err(|e| {
220 DataError { code: e.code, message: e.message }
221 })?;
222 let id = crate::generate_id();
223 pg.crdt
224 .apply_patch(tx, entity, &id, &crdt_fields, data)
225 .map_err(|e| DataError {
226 code: "CRDT_APPLY_FAILED".into(),
227 message: format!("crdt write {entity}/{id}: {e}"),
228 })?;
229 let mut row = (*data).clone();
230 if let Some(obj) = row.as_object_mut() {
231 obj.insert("id".into(), serde_json::Value::String(id.clone()));
232 }
233 tx_insert(tx, &manifest, entity, &row)?;
234 crdt_touched.push((entity.to_string(), id.clone()));
235 id
236 } else {
237 tx_insert(tx, &manifest, entity, data)?
238 };
239 serde_json::json!({ "op": "insert", "id": id })
240 }
241 Op::Update { entity, id, data } => {
242 let ent = manifest.entities.iter().find(|e| e.name == *entity);
243 let updated = if ent.map(|e| e.crdt).unwrap_or(false) {
244 let crdt_fields = self.crdt_fields_for(ent.unwrap()).map_err(|e| {
245 DataError { code: e.code, message: e.message }
246 })?;
247 pg.crdt
248 .apply_patch(tx, entity, id, &crdt_fields, data)
249 .map_err(|e| DataError {
250 code: "CRDT_APPLY_FAILED".into(),
251 message: format!("crdt update {entity}/{id}: {e}"),
252 })?;
253 let updated = tx_update(tx, &manifest, entity, id, data)?;
254 if !updated {
255 return Err(DataError {
256 code: "ENTITY_NOT_FOUND".into(),
257 message: format!(
258 "Update on {entity}/{id} found no row — refusing to commit \
259 a CRDT snapshot that would orphan."
260 ),
261 });
262 }
263 crdt_touched.push((entity.to_string(), id.to_string()));
264 updated
265 } else {
266 tx_update(tx, &manifest, entity, id, data)?
267 };
268 serde_json::json!({ "op": "update", "id": id, "updated": updated })
269 }
270 Op::Delete { entity, id } => {
271 let ent = manifest.entities.iter().find(|e| e.name == *entity);
272 let deleted = if ent.map(|e| e.crdt).unwrap_or(false) {
273 tx.execute(
274 "DELETE FROM _pylon_crdt_snapshots WHERE entity = $1 AND row_id = $2",
275 &[entity, id],
276 )
277 .map_err(|e| DataError {
278 code: "CRDT_SIDECAR_DELETE_FAILED".into(),
279 message: format!(
280 "delete pg crdt snapshot {entity}/{id}: {e}"
281 ),
282 })?;
283 let deleted = tx_delete(tx, &manifest, entity, id)?;
284 crdt_touched.push((entity.to_string(), id.to_string()));
285 deleted
286 } else {
287 tx_delete(tx, &manifest, entity, id)?
288 };
289 serde_json::json!({ "op": "delete", "id": id, "deleted": deleted })
290 }
291 };
292 json_results.push(result);
293 }
294 for (entity, id) in &crdt_touched {
296 pg.crdt.cache_after_commit(tx, entity, id);
297 }
298 Ok(json_results)
299 });
300
301 match result {
302 Ok(json_results) => Ok((true, json_results)),
303 Err(e) => {
304 for (entity, id) in &crdt_touched {
305 pg.crdt.evict(entity, id);
306 }
307 Err(e)
308 }
309 }
310 }
311}
312
313impl DataStore for Runtime {
318 fn manifest(&self) -> &pylon_kernel::AppManifest {
319 Runtime::manifest(self)
320 }
321
322 fn insert(&self, entity: &str, data: &serde_json::Value) -> Result<String, DataError> {
323 Runtime::insert(self, entity, data).map_err(into_data_error)
324 }
325
326 fn get_by_id(&self, entity: &str, id: &str) -> Result<Option<serde_json::Value>, DataError> {
327 Runtime::get_by_id(self, entity, id).map_err(into_data_error)
328 }
329
330 fn list(&self, entity: &str) -> Result<Vec<serde_json::Value>, DataError> {
331 Runtime::list(self, entity).map_err(into_data_error)
332 }
333
334 fn list_after(
335 &self,
336 entity: &str,
337 after: Option<&str>,
338 limit: usize,
339 ) -> Result<Vec<serde_json::Value>, DataError> {
340 Runtime::list_after(self, entity, after, limit).map_err(into_data_error)
341 }
342
343 fn update(&self, entity: &str, id: &str, data: &serde_json::Value) -> Result<bool, DataError> {
344 Runtime::update(self, entity, id, data).map_err(into_data_error)
345 }
346
347 fn delete(&self, entity: &str, id: &str) -> Result<bool, DataError> {
348 Runtime::delete(self, entity, id).map_err(into_data_error)
349 }
350
351 fn lookup(
352 &self,
353 entity: &str,
354 field: &str,
355 value: &str,
356 ) -> Result<Option<serde_json::Value>, DataError> {
357 Runtime::lookup(self, entity, field, value).map_err(into_data_error)
358 }
359
360 fn link(
361 &self,
362 entity: &str,
363 id: &str,
364 relation: &str,
365 target_id: &str,
366 ) -> Result<bool, DataError> {
367 Runtime::link(self, entity, id, relation, target_id).map_err(into_data_error)
368 }
369
370 fn unlink(&self, entity: &str, id: &str, relation: &str) -> Result<bool, DataError> {
371 Runtime::unlink(self, entity, id, relation).map_err(into_data_error)
372 }
373
374 fn query_filtered(
375 &self,
376 entity: &str,
377 filter: &serde_json::Value,
378 ) -> Result<Vec<serde_json::Value>, DataError> {
379 Runtime::query_filtered(self, entity, filter).map_err(into_data_error)
380 }
381
382 fn query_graph(&self, query: &serde_json::Value) -> Result<serde_json::Value, DataError> {
383 Runtime::query_graph(self, query).map_err(into_data_error)
384 }
385
386 fn aggregate(
387 &self,
388 entity: &str,
389 spec: &serde_json::Value,
390 ) -> Result<serde_json::Value, DataError> {
391 Runtime::aggregate(self, entity, spec).map_err(into_data_error)
392 }
393
394 fn transact(
395 &self,
396 ops: &[serde_json::Value],
397 ) -> Result<(bool, Vec<serde_json::Value>), DataError> {
398 if let Some(pg) = self.pg_backend() {
404 return self.pg_transact_with_crdt(pg, ops);
405 }
406 let conn = self.lock_conn_pub().map_err(into_data_error)?;
407 let _ = conn.execute("BEGIN", []);
408 let mut results: Vec<serde_json::Value> = Vec::new();
409 let mut rollback = false;
410
411 for op in ops {
412 let op_type = op.get("op").and_then(|v| v.as_str()).unwrap_or("");
413 let entity = op.get("entity").and_then(|v| v.as_str()).unwrap_or("");
414
415 match op_type {
416 "insert" => {
417 let data = op.get("data").cloned().unwrap_or(serde_json::json!({}));
418 match self.insert_with_conn(&conn, entity, &data) {
419 Ok(id) => {
420 results.push(serde_json::json!({"op": "insert", "id": id}));
421 }
422 Err(e) => {
423 results.push(serde_json::json!({"op": "insert", "error": e.message}));
424 rollback = true;
425 break;
426 }
427 }
428 }
429 "update" => {
430 let id = op.get("id").and_then(|v| v.as_str()).unwrap_or("");
431 let data = op.get("data").cloned().unwrap_or(serde_json::json!({}));
432 match self.update_with_conn(&conn, entity, id, &data) {
433 Ok(_) => {
434 results.push(serde_json::json!({"op": "update", "id": id}));
435 }
436 Err(e) => {
437 results.push(serde_json::json!({"op": "update", "error": e.message}));
438 rollback = true;
439 break;
440 }
441 }
442 }
443 "delete" => {
444 let id = op.get("id").and_then(|v| v.as_str()).unwrap_or("");
445 match self.delete_with_conn(&conn, entity, id) {
446 Ok(_) => {
447 results.push(serde_json::json!({"op": "delete", "id": id}));
448 }
449 Err(e) => {
450 results.push(serde_json::json!({"op": "delete", "error": e.message}));
451 rollback = true;
452 break;
453 }
454 }
455 }
456 _ => {
457 results.push(serde_json::json!({"op": op_type, "error": "unknown operation"}));
458 }
459 }
460 }
461
462 if rollback {
463 let _ = conn.execute("ROLLBACK", []);
464 } else {
465 let _ = conn.execute("COMMIT", []);
466 }
467
468 Ok((!rollback, results))
469 }
470
471 fn search(
478 &self,
479 entity: &str,
480 query: &serde_json::Value,
481 ) -> Result<serde_json::Value, DataError> {
482 let ent = self
483 .manifest()
484 .entities
485 .iter()
486 .find(|e| e.name == entity)
487 .ok_or_else(|| DataError {
488 code: "ENTITY_NOT_FOUND".into(),
489 message: format!("Unknown entity: {entity}"),
490 })?;
491 let cfg = ent.search.as_ref().ok_or_else(|| DataError {
492 code: "SEARCH_NOT_CONFIGURED".into(),
493 message: format!("Entity {entity} has no `search:` config"),
494 })?;
495 let parsed: pylon_storage::search::SearchQuery = serde_json::from_value(query.clone())
496 .map_err(|e| DataError {
497 code: "INVALID_QUERY".into(),
498 message: format!("search query body: {e}"),
499 })?;
500
501 if self.is_postgres() {
505 let pg = self.pg_data_store().ok_or_else(|| DataError {
506 code: "PG_DATASTORE_MISSING".into(),
507 message: "is_postgres=true but pg_data_store() returned None".into(),
508 })?;
509 let result = pg.run_search(entity, cfg, &parsed).map_err(|e| DataError {
510 code: e.code,
511 message: e.message,
512 })?;
513 return serde_json::to_value(&result).map_err(|e| DataError {
514 code: "SEARCH_SERIALIZE_FAILED".into(),
515 message: e.to_string(),
516 });
517 }
518
519 let conn = self.lock_conn_pub().map_err(into_data_error)?;
520 let result =
521 pylon_storage::search_query::run_search(&conn, entity, cfg, &parsed).map_err(|e| {
522 DataError {
523 code: e.code,
524 message: e.message,
525 }
526 })?;
527 serde_json::to_value(&result).map_err(|e| DataError {
528 code: "SEARCH_SERIALIZE_FAILED".into(),
529 message: e.to_string(),
530 })
531 }
532
533 fn crdt_snapshot(&self, entity: &str, row_id: &str) -> Result<Option<Vec<u8>>, DataError> {
538 if self.is_postgres() {
543 let ent = self
544 .manifest()
545 .entities
546 .iter()
547 .find(|e| e.name == entity)
548 .ok_or_else(|| DataError {
549 code: "ENTITY_NOT_FOUND".into(),
550 message: format!("Unknown entity: {entity}"),
551 })?;
552 if !ent.crdt {
553 return Ok(None);
554 }
555 let pg_backend = match self.pg_backend() {
556 Some(pg) => pg,
557 None => return Ok(None),
558 };
559 let snap = pg_backend
563 .store
564 .with_client(|client| -> Result<Vec<u8>, DataError> {
565 pg_backend
566 .crdt
567 .snapshot(client, entity, row_id)
568 .map_err(|e| DataError {
569 code: "CRDT_SNAPSHOT_FAILED".into(),
570 message: format!("snapshot {entity}/{row_id}: {e}"),
571 })
572 })?;
573 return Ok(Some(snap));
574 }
575 let ent = self
576 .manifest()
577 .entities
578 .iter()
579 .find(|e| e.name == entity)
580 .ok_or_else(|| DataError {
581 code: "ENTITY_NOT_FOUND".into(),
582 message: format!("Unknown entity: {entity}"),
583 })?;
584 if !ent.crdt {
585 return Ok(None);
586 }
587 let conn = self.lock_conn_pub().map_err(into_data_error)?;
588 let snap = self
589 .crdt_store()
590 .snapshot(&conn, entity, row_id)
591 .map_err(|e| DataError {
592 code: "CRDT_SNAPSHOT_FAILED".into(),
593 message: format!("snapshot {entity}/{row_id}: {e}"),
594 })?;
595 Ok(Some(snap))
596 }
597
598 fn crdt_apply_update(
608 &self,
609 entity: &str,
610 row_id: &str,
611 update: &[u8],
612 ) -> Result<Vec<u8>, DataError> {
613 if self.is_postgres() {
619 let ent = self
620 .manifest()
621 .entities
622 .iter()
623 .find(|e| e.name == entity)
624 .ok_or_else(|| DataError {
625 code: "ENTITY_NOT_FOUND".into(),
626 message: format!("Unknown entity: {entity}"),
627 })?
628 .clone();
629 if !ent.crdt {
630 return Err(DataError {
631 code: "NOT_SUPPORTED".into(),
632 message: format!(
633 "CRDT update sent for entity \"{entity}\" which has crdt: false"
634 ),
635 });
636 }
637 let pg_backend = self.pg_backend().ok_or_else(|| DataError {
638 code: "PG_BACKEND_MISSING".into(),
639 message: "is_postgres=true but pg_backend() returned None".into(),
640 })?;
641 let crdt_fields = self.crdt_fields_for(&ent).map_err(into_data_error)?;
642
643 let result =
651 pg_backend
652 .store
653 .with_transaction_raw(|tx| -> Result<Vec<u8>, DataError> {
654 let projected = pg_backend
655 .crdt
656 .apply_remote_update(tx, entity, row_id, &crdt_fields, update)
657 .map_err(|e| {
658 let code = match &e {
666 crate::loro_store::LoroStoreError::Decode(_) => {
667 "CRDT_DECODE_FAILED"
668 }
669 _ => "CRDT_APPLY_FAILED",
670 };
671 DataError {
672 code: code.into(),
673 message: format!("crdt apply update {entity}/{row_id}: {e}"),
674 }
675 })?;
676 let updated = pylon_storage::pg_tx_store::tx_update(
677 tx,
678 self.manifest(),
679 entity,
680 row_id,
681 &projected,
682 )?;
683 if !updated {
684 return Err(DataError {
689 code: "ENTITY_NOT_FOUND".into(),
690 message: format!(
691 "Peer-pushed CRDT update targets {entity}/{row_id} which has \
692 no materialized row — refusing to commit an orphan snapshot."
693 ),
694 });
695 }
696 let snap = crate::pg_loro_store::PgLoroStore::read_snapshot_via_conn(
702 tx, entity, row_id,
703 )
704 .map_err(|e| DataError {
705 code: "CRDT_SNAPSHOT_FAILED".into(),
706 message: format!("post-update snapshot {entity}/{row_id}: {e}"),
707 })?;
708 pg_backend.crdt.cache_after_commit(tx, entity, row_id);
711 Ok(snap)
712 });
713 if result.is_err() {
714 pg_backend.crdt.evict(entity, row_id);
719 }
720 return result;
721 }
722 let ent = self
725 .manifest()
726 .entities
727 .iter()
728 .find(|e| e.name == entity)
729 .ok_or_else(|| DataError {
730 code: "ENTITY_NOT_FOUND".into(),
731 message: format!("Unknown entity: {entity}"),
732 })?
733 .clone();
734 if !ent.crdt {
735 return Err(DataError {
736 code: "NOT_SUPPORTED".into(),
737 message: format!("Entity {entity} has crdt: false; client push requires CRDT mode"),
738 });
739 }
740 let crdt_fields = self.crdt_fields_for(&ent).map_err(into_data_error)?;
741
742 let conn = self.lock_conn_pub().map_err(into_data_error)?;
743 crate::with_write_tx(&conn, || -> Result<Vec<u8>, crate::RuntimeError> {
744 let projected = self
748 .crdt_store()
749 .apply_remote_update(&conn, entity, row_id, &crdt_fields, update)
750 .map_err(|e| crate::RuntimeError {
751 code: "CRDT_APPLY_FAILED".into(),
752 message: format!("apply_remote_update {entity}/{row_id}: {e}"),
753 })?;
754
755 let projection = projected.as_object().ok_or_else(|| crate::RuntimeError {
759 code: "CRDT_PROJECTION_INVALID".into(),
760 message: "projected row was not a JSON object".into(),
761 })?;
762
763 let mut set_clauses = Vec::with_capacity(projection.len());
764 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
765 let mut idx = 1;
766 for (key, val) in projection {
767 if key == "id" {
768 continue;
769 }
770 set_clauses.push(format!("{} = ?{idx}", crate::quote_ident(key.as_str())));
771 values.push(crate::json_to_sql(val));
772 idx += 1;
773 }
774 if set_clauses.is_empty() {
775 } else {
780 values.push(Box::new(row_id.to_string()));
781 let sql = format!(
782 "UPDATE {} SET {} WHERE \"id\" = ?{idx}",
783 crate::quote_ident(entity),
784 set_clauses.join(", ")
785 );
786 let params: Vec<&dyn rusqlite::types::ToSql> =
787 values.iter().map(|v| v.as_ref()).collect();
788 conn.execute(&sql, params.as_slice())
789 .map_err(|e| crate::RuntimeError {
790 code: "UPDATE_FAILED".into(),
791 message: format!("post-merge UPDATE {entity}/{row_id}: {e}"),
792 })?;
793 }
794
795 let snap = self
797 .crdt_store()
798 .snapshot(&conn, entity, row_id)
799 .map_err(|e| crate::RuntimeError {
800 code: "CRDT_SNAPSHOT_FAILED".into(),
801 message: format!("post-merge snapshot {entity}/{row_id}: {e}"),
802 })?;
803 Ok(snap)
804 })
805 .map_err(into_data_error)
806 }
807}
808
809fn into_data_error(e: crate::RuntimeError) -> DataError {
810 DataError {
811 code: e.code,
812 message: e.message,
813 }
814}
815
816use crate::sse::SseHub;
821use crate::ws::WsHub;
822use std::sync::Arc;
823
824pub struct WsSseNotifier {
826 pub ws: Arc<WsHub>,
827 pub sse: Arc<SseHub>,
828}
829
830impl pylon_router::ChangeNotifier for WsSseNotifier {
831 fn notify(&self, event: &pylon_sync::ChangeEvent) {
832 self.ws.broadcast(event);
833 self.sse.broadcast(event);
834 }
835
836 fn notify_presence(&self, json: &str) {
837 self.ws.broadcast_presence(json);
838 self.sse.broadcast_message(json);
839 }
840
841 fn notify_crdt(&self, entity: &str, row_id: &str, snapshot: &[u8]) {
877 let subscribers = self.ws.subscriptions().subscribers(entity, row_id);
878 if subscribers.is_empty() {
879 return;
880 }
881 match pylon_router::encode_crdt_frame(
882 pylon_router::CRDT_FRAME_SNAPSHOT,
883 entity,
884 row_id,
885 snapshot,
886 ) {
887 Ok(frame) => self.ws.broadcast_binary_to(&subscribers, frame),
888 Err(e) => {
889 tracing::warn!("[crdt] dropping binary frame for {entity}/{row_id}: {e}");
890 }
891 }
892 }
893}
894
895fn to_json<T: serde::Serialize>(val: T) -> serde_json::Value {
897 serde_json::to_value(val).unwrap_or(serde_json::json!({}))
898}
899
900fn to_json_array<T: serde::Serialize>(val: T) -> serde_json::Value {
902 serde_json::to_value(val).unwrap_or(serde_json::json!([]))
903}
904
905use crate::rooms::RoomManager;
910
911impl pylon_router::RoomOps for RoomManager {
912 fn join(
913 &self,
914 room: &str,
915 user_id: &str,
916 data: Option<serde_json::Value>,
917 ) -> Result<(serde_json::Value, serde_json::Value), DataError> {
918 RoomManager::join(self, room, user_id, data)
919 .map(|(snapshot, join_event)| (to_json(&snapshot), to_json(&join_event)))
920 .map_err(|e| DataError {
921 code: e.code,
922 message: e.message,
923 })
924 }
925
926 fn leave(&self, room: &str, user_id: &str) -> Option<serde_json::Value> {
927 RoomManager::leave(self, room, user_id).map(|event| to_json(&event))
928 }
929
930 fn set_presence(
931 &self,
932 room: &str,
933 user_id: &str,
934 data: serde_json::Value,
935 ) -> Option<serde_json::Value> {
936 RoomManager::set_presence(self, room, user_id, data).map(|event| to_json(&event))
937 }
938
939 fn broadcast(
940 &self,
941 room: &str,
942 sender: Option<&str>,
943 topic: &str,
944 data: serde_json::Value,
945 ) -> Option<serde_json::Value> {
946 RoomManager::broadcast(self, room, sender, topic, data).map(|event| to_json(&event))
947 }
948
949 fn list_rooms(&self) -> Vec<String> {
950 RoomManager::list_rooms(self)
951 }
952
953 fn room_size(&self, name: &str) -> usize {
954 RoomManager::room_size(self, name)
955 }
956
957 fn members(&self, name: &str) -> Vec<serde_json::Value> {
958 RoomManager::members(self, name)
959 .into_iter()
960 .map(|p| to_json(p))
961 .collect()
962 }
963}
964
965use pylon_plugin::builtin::cache::CachePlugin;
970
971pub struct PluginHooksAdapter(pub Arc<pylon_plugin::PluginRegistry>);
981
982impl pylon_router::PluginHookOps for PluginHooksAdapter {
983 fn before_insert(
984 &self,
985 entity: &str,
986 data: &mut serde_json::Value,
987 auth: &pylon_auth::AuthContext,
988 ) -> Result<(), (u16, String, String)> {
989 self.0
990 .run_before_insert(entity, data, auth)
991 .map_err(|e| (e.status, e.code, e.message))
992 }
993 fn after_insert(
994 &self,
995 entity: &str,
996 id: &str,
997 data: &serde_json::Value,
998 auth: &pylon_auth::AuthContext,
999 ) {
1000 self.0.run_after_insert(entity, id, data, auth);
1001 }
1002 fn before_update(
1003 &self,
1004 entity: &str,
1005 id: &str,
1006 data: &mut serde_json::Value,
1007 auth: &pylon_auth::AuthContext,
1008 ) -> Result<(), (u16, String, String)> {
1009 self.0
1010 .run_before_update(entity, id, data, auth)
1011 .map_err(|e| (e.status, e.code, e.message))
1012 }
1013 fn after_update(
1014 &self,
1015 entity: &str,
1016 id: &str,
1017 data: &serde_json::Value,
1018 auth: &pylon_auth::AuthContext,
1019 ) {
1020 self.0.run_after_update(entity, id, data, auth);
1021 }
1022 fn before_delete(
1023 &self,
1024 entity: &str,
1025 id: &str,
1026 auth: &pylon_auth::AuthContext,
1027 ) -> Result<(), (u16, String, String)> {
1028 self.0
1029 .run_before_delete(entity, id, auth)
1030 .map_err(|e| (e.status, e.code, e.message))
1031 }
1032 fn after_delete(&self, entity: &str, id: &str, auth: &pylon_auth::AuthContext) {
1033 self.0.run_after_delete(entity, id, auth);
1034 }
1035}
1036
1037pub struct CacheAdapter(pub Arc<CachePlugin>);
1038
1039impl pylon_router::CacheOps for CacheAdapter {
1040 fn handle_command(&self, body: &str) -> (u16, String) {
1041 crate::cache_handlers::handle_cache_command(&self.0, body)
1042 }
1043
1044 fn handle_get(&self, key: &str) -> (u16, String) {
1045 crate::cache_handlers::handle_cache_get(&self.0, key)
1046 }
1047
1048 fn handle_delete(&self, key: &str) -> (u16, String) {
1049 crate::cache_handlers::handle_cache_delete(&self.0, key)
1050 }
1051}
1052
1053use crate::pubsub::PubSubBroker;
1058
1059pub struct PubSubAdapter(pub Arc<PubSubBroker>);
1060
1061impl pylon_router::PubSubOps for PubSubAdapter {
1062 fn handle_publish(&self, body: &str) -> (u16, String) {
1063 crate::cache_handlers::handle_pubsub_publish(&self.0, body)
1064 }
1065
1066 fn handle_channels(&self) -> (u16, String) {
1067 crate::cache_handlers::handle_pubsub_channels(&self.0)
1068 }
1069
1070 fn handle_history(&self, channel: &str, url: &str) -> (u16, String) {
1071 crate::cache_handlers::handle_pubsub_history(&self.0, channel, url)
1072 }
1073}
1074
1075use crate::jobs::{JobQueue, Priority};
1080
1081impl pylon_router::JobOps for JobQueue {
1082 fn enqueue(
1083 &self,
1084 name: &str,
1085 payload: serde_json::Value,
1086 priority: &str,
1087 delay_secs: u64,
1088 max_retries: u32,
1089 queue: &str,
1090 ) -> String {
1091 let pri = Priority::from_str_loose(priority);
1092 JobQueue::enqueue_with_options(self, name, payload, pri, delay_secs, max_retries, queue)
1093 }
1094
1095 fn stats(&self) -> serde_json::Value {
1096 to_json(JobQueue::stats(self))
1097 }
1098
1099 fn dead_letters(&self) -> serde_json::Value {
1100 to_json_array(JobQueue::dead_letters(self))
1101 }
1102
1103 fn retry_dead(&self, id: &str) -> bool {
1104 JobQueue::retry_dead(self, id)
1105 }
1106
1107 fn list_jobs(
1108 &self,
1109 status: Option<&str>,
1110 queue: Option<&str>,
1111 limit: usize,
1112 ) -> serde_json::Value {
1113 to_json_array(JobQueue::list_jobs(self, status, queue, limit))
1114 }
1115
1116 fn get_job(&self, id: &str) -> Option<serde_json::Value> {
1117 JobQueue::get_job(self, id).map(|j| to_json(j))
1118 }
1119}
1120
1121use crate::scheduler::Scheduler;
1126
1127impl pylon_router::SchedulerOps for Scheduler {
1128 fn list_tasks(&self) -> serde_json::Value {
1129 to_json_array(Scheduler::list_tasks(self))
1130 }
1131
1132 fn trigger(&self, name: &str) -> bool {
1133 Scheduler::trigger(self, name)
1134 }
1135}
1136
1137use crate::workflows::WorkflowEngine;
1142
1143impl pylon_router::WorkflowOps for WorkflowEngine {
1144 fn definitions(&self) -> serde_json::Value {
1145 to_json_array(WorkflowEngine::definitions(self))
1146 }
1147
1148 fn start(&self, name: &str, input: serde_json::Value) -> Result<String, String> {
1149 WorkflowEngine::start(self, name, input)
1150 }
1151
1152 fn list(&self, status_filter: Option<&str>) -> serde_json::Value {
1153 let filter = status_filter.and_then(|s| match s {
1155 "pending" => Some(crate::workflows::WorkflowStatus::Pending),
1156 "running" => Some(crate::workflows::WorkflowStatus::Running),
1157 "sleeping" => Some(crate::workflows::WorkflowStatus::Sleeping),
1158 "waiting" => Some(crate::workflows::WorkflowStatus::WaitingForEvent),
1159 "completed" => Some(crate::workflows::WorkflowStatus::Completed),
1160 "failed" => Some(crate::workflows::WorkflowStatus::Failed),
1161 "cancelled" => Some(crate::workflows::WorkflowStatus::Cancelled),
1162 _ => None,
1163 });
1164 to_json_array(WorkflowEngine::list(self, filter.as_ref()))
1165 }
1166
1167 fn get(&self, id: &str) -> Option<serde_json::Value> {
1168 WorkflowEngine::get(self, id).map(|inst| to_json(inst))
1169 }
1170
1171 fn advance(&self, id: &str) -> Result<String, String> {
1172 WorkflowEngine::advance(self, id).map(|status| format!("{:?}", status))
1173 }
1174
1175 fn send_event(&self, id: &str, event: &str, data: serde_json::Value) -> Result<(), String> {
1176 WorkflowEngine::send_event(self, id, event, data)
1177 }
1178
1179 fn cancel(&self, id: &str) -> Result<(), String> {
1180 WorkflowEngine::cancel(self, id)
1181 }
1182}
1183
1184use pylon_storage::files::{FileStorage, LocalFileStorage, Stack0FileStorage};
1189
1190pub struct FileOpsAdapter {
1192 pub storage: Arc<dyn FileStorage>,
1193}
1194
1195impl FileOpsAdapter {
1196 pub fn from_env() -> Self {
1203 let provider = std::env::var("PYLON_FILES_PROVIDER").unwrap_or_else(|_| "local".into());
1204 match provider.as_str() {
1205 "stack0" => match Stack0FileStorage::from_env() {
1206 Some(s) => Self {
1207 storage: Arc::new(s),
1208 },
1209 None => {
1210 tracing::warn!(
1211 "PYLON_FILES_PROVIDER=stack0 but PYLON_STACK0_API_KEY is not set; falling back to local storage"
1212 );
1213 Self::local_from_env()
1214 }
1215 },
1216 _ => Self::local_from_env(),
1217 }
1218 }
1219
1220 fn local_from_env() -> Self {
1221 let dir = std::env::var("PYLON_FILES_DIR").unwrap_or_else(|_| "uploads".into());
1222 let url_prefix =
1223 std::env::var("PYLON_FILES_URL_PREFIX").unwrap_or_else(|_| "/api/files".into());
1224 Self {
1225 storage: Arc::new(LocalFileStorage::new(&dir, &url_prefix)),
1226 }
1227 }
1228}
1229
1230impl pylon_router::FileOps for FileOpsAdapter {
1231 fn upload(&self, _body: &str) -> (u16, String) {
1232 (
1238 400,
1239 pylon_router::json_error(
1240 "UPLOAD_NEEDS_BINARY",
1241 "File uploads must use multipart/form-data or raw binary with X-Filename; this platform does not support string-body uploads",
1242 ),
1243 )
1244 }
1245
1246 fn get_file(&self, id: &str) -> (u16, String) {
1247 match self.storage.get(id) {
1248 Ok(content) => (200, String::from_utf8_lossy(&content).into_owned()),
1249 Err(e) if e.code == "NOT_FOUND" => {
1250 (404, pylon_router::json_error("FILE_NOT_FOUND", &e.message))
1251 }
1252 Err(e) => (400, pylon_router::json_error(&e.code, &e.message)),
1253 }
1254 }
1255}
1256
1257pub type LocalFileOps = FileOpsAdapter;
1259
1260impl LocalFileOps {
1261 pub fn new_default() -> Self {
1263 Self::from_env()
1264 }
1265}
1266
1267use pylon_auth::email::{ConsoleTransport, EmailTransport, HttpEmailTransport};
1272
1273pub struct EmailAdapter {
1276 transport: Box<dyn EmailTransport>,
1277}
1278
1279impl EmailAdapter {
1280 pub fn from_env() -> Self {
1281 if let Some(http) = HttpEmailTransport::from_env() {
1282 Self {
1283 transport: Box::new(http),
1284 }
1285 } else {
1286 Self {
1287 transport: Box::new(ConsoleTransport),
1288 }
1289 }
1290 }
1291}
1292
1293impl pylon_router::EmailSender for EmailAdapter {
1294 fn send(&self, to: &str, subject: &str, body: &str) -> Result<(), String> {
1295 self.transport
1296 .send(to, subject, body)
1297 .map_err(|e| e.message)
1298 }
1299}
1300
1301pub struct RuntimeOpenApiGenerator<'a> {
1306 pub manifest: &'a pylon_kernel::AppManifest,
1307}
1308
1309impl<'a> pylon_router::OpenApiGenerator for RuntimeOpenApiGenerator<'a> {
1310 fn generate(&self, base_url: &str) -> String {
1311 let spec = crate::openapi::generate_openapi(self.manifest, base_url);
1312 serde_json::to_string(&spec).unwrap_or_else(|_| "{}".into())
1313 }
1314}
1315
1316pub struct ShardOpsAdapter {
1323 pub registry: Arc<dyn pylon_realtime::DynShardRegistry>,
1324}
1325
1326impl pylon_router::ShardOps for ShardOpsAdapter {
1327 fn get_shard(&self, id: &str) -> Option<Arc<dyn pylon_realtime::DynShard>> {
1328 self.registry.get(id)
1329 }
1330
1331 fn list_shards(&self) -> Vec<String> {
1332 self.registry.ids()
1333 }
1334
1335 fn shard_count(&self) -> usize {
1336 self.registry.len()
1337 }
1338}
1339
1340#[cfg(test)]
1341mod find_runtime_tests {
1342 use super::*;
1343
1344 #[test]
1345 fn env_override_takes_precedence() {
1346 let dir = std::env::temp_dir().join(format!("pylon_rt_{}", std::process::id()));
1347 let _ = std::fs::create_dir_all(&dir);
1348 let path = dir.join("custom_runtime.ts");
1349 std::fs::write(&path, "// test").unwrap();
1350
1351 std::env::set_var("PYLON_FUNCTIONS_RUNTIME", path.to_str().unwrap());
1352 let found = find_functions_runtime();
1353 std::env::remove_var("PYLON_FUNCTIONS_RUNTIME");
1354
1355 assert_eq!(found.as_deref(), path.to_str());
1356
1357 let _ = std::fs::remove_dir_all(&dir);
1358 }
1359
1360 #[test]
1361 fn returns_none_when_env_path_missing() {
1362 std::env::set_var(
1363 "PYLON_FUNCTIONS_RUNTIME",
1364 "/tmp/definitely-does-not-exist-42.ts",
1365 );
1366 let found = find_functions_runtime();
1369 std::env::remove_var("PYLON_FUNCTIONS_RUNTIME");
1370 assert_ne!(
1371 found.as_deref(),
1372 Some("/tmp/definitely-does-not-exist-42.ts")
1373 );
1374 }
1375}
1376
1377pub struct TxStore<'a> {
1416 runtime: &'a Runtime,
1417 conn: &'a rusqlite::Connection,
1418 pending: std::cell::RefCell<Vec<pylon_sync::ChangeEvent>>,
1423}
1424
1425impl<'a> TxStore<'a> {
1426 pub fn new(runtime: &'a Runtime, conn: &'a rusqlite::Connection) -> Self {
1427 Self {
1428 runtime,
1429 conn,
1430 pending: std::cell::RefCell::new(Vec::new()),
1431 }
1432 }
1433
1434 pub fn take_pending(&self) -> Vec<pylon_sync::ChangeEvent> {
1439 std::mem::take(&mut *self.pending.borrow_mut())
1440 }
1441
1442 fn record(
1443 &self,
1444 entity: &str,
1445 row_id: &str,
1446 kind: pylon_sync::ChangeKind,
1447 data: Option<&serde_json::Value>,
1448 ) {
1449 self.pending.borrow_mut().push(pylon_sync::ChangeEvent {
1450 seq: 0, entity: entity.to_string(),
1452 row_id: row_id.to_string(),
1453 kind,
1454 data: data.cloned(),
1455 timestamp: String::new(),
1456 });
1457 }
1458}
1459
1460unsafe impl<'a> Sync for TxStore<'a> {}
1462unsafe impl<'a> Send for TxStore<'a> {}
1463
1464impl<'a> DataStore for TxStore<'a> {
1465 fn manifest(&self) -> &pylon_kernel::AppManifest {
1466 self.runtime.manifest()
1467 }
1468
1469 fn insert(&self, entity: &str, data: &serde_json::Value) -> Result<String, DataError> {
1470 let id = self
1471 .runtime
1472 .insert_with_conn(self.conn, entity, data)
1473 .map_err(into_data_error)?;
1474 self.record(entity, &id, pylon_sync::ChangeKind::Insert, Some(data));
1478 Ok(id)
1479 }
1480
1481 fn get_by_id(&self, entity: &str, id: &str) -> Result<Option<serde_json::Value>, DataError> {
1482 self.runtime
1483 .get_by_id_with_conn(self.conn, entity, id)
1484 .map_err(into_data_error)
1485 }
1486
1487 fn list(&self, entity: &str) -> Result<Vec<serde_json::Value>, DataError> {
1488 self.runtime
1489 .list_with_conn(self.conn, entity)
1490 .map_err(into_data_error)
1491 }
1492
1493 fn list_after(
1494 &self,
1495 entity: &str,
1496 after: Option<&str>,
1497 limit: usize,
1498 ) -> Result<Vec<serde_json::Value>, DataError> {
1499 self.runtime
1500 .list_after_with_conn(self.conn, entity, after, limit)
1501 .map_err(into_data_error)
1502 }
1503
1504 fn update(&self, entity: &str, id: &str, data: &serde_json::Value) -> Result<bool, DataError> {
1505 let updated = self
1506 .runtime
1507 .update_with_conn(self.conn, entity, id, data)
1508 .map_err(into_data_error)?;
1509 if updated {
1510 self.record(entity, id, pylon_sync::ChangeKind::Update, Some(data));
1511 }
1512 Ok(updated)
1513 }
1514
1515 fn delete(&self, entity: &str, id: &str) -> Result<bool, DataError> {
1516 let deleted = self
1517 .runtime
1518 .delete_with_conn(self.conn, entity, id)
1519 .map_err(into_data_error)?;
1520 if deleted {
1521 self.record(entity, id, pylon_sync::ChangeKind::Delete, None);
1522 }
1523 Ok(deleted)
1524 }
1525
1526 fn lookup(
1527 &self,
1528 entity: &str,
1529 field: &str,
1530 value: &str,
1531 ) -> Result<Option<serde_json::Value>, DataError> {
1532 self.runtime
1533 .lookup_with_conn(self.conn, entity, field, value)
1534 .map_err(into_data_error)
1535 }
1536
1537 fn link(
1538 &self,
1539 entity: &str,
1540 id: &str,
1541 relation: &str,
1542 target_id: &str,
1543 ) -> Result<bool, DataError> {
1544 self.runtime
1545 .link_with_conn(self.conn, entity, id, relation, target_id)
1546 .map_err(into_data_error)
1547 }
1548
1549 fn unlink(&self, entity: &str, id: &str, relation: &str) -> Result<bool, DataError> {
1550 self.runtime
1551 .unlink_with_conn(self.conn, entity, id, relation)
1552 .map_err(into_data_error)
1553 }
1554
1555 fn query_filtered(
1556 &self,
1557 entity: &str,
1558 filter: &serde_json::Value,
1559 ) -> Result<Vec<serde_json::Value>, DataError> {
1560 self.runtime
1561 .query_filtered_with_conn(self.conn, entity, filter)
1562 .map_err(into_data_error)
1563 }
1564
1565 fn query_graph(&self, query: &serde_json::Value) -> Result<serde_json::Value, DataError> {
1566 self.runtime
1567 .query_graph_with_conn(self.conn, query)
1568 .map_err(into_data_error)
1569 }
1570
1571 fn aggregate(
1572 &self,
1573 entity: &str,
1574 spec: &serde_json::Value,
1575 ) -> Result<serde_json::Value, DataError> {
1576 Runtime::aggregate(self.runtime, entity, spec).map_err(into_data_error)
1580 }
1581
1582 fn transact(
1583 &self,
1584 _ops: &[serde_json::Value],
1585 ) -> Result<(bool, Vec<serde_json::Value>), DataError> {
1586 Err(DataError {
1589 code: "NESTED_TRANSACTION".into(),
1590 message: "ctx.db.transact() is not allowed inside a mutation handler (the handler itself is transactional)".into(),
1591 })
1592 }
1593
1594 fn search(
1595 &self,
1596 entity: &str,
1597 query: &serde_json::Value,
1598 ) -> Result<serde_json::Value, DataError> {
1599 <Runtime as DataStore>::search(self.runtime, entity, query)
1604 }
1605}
1606
1607struct PgBufferedTxStore<'a> {
1618 inner: &'a dyn DataStore,
1619 pending: std::sync::Mutex<Vec<pylon_sync::ChangeEvent>>,
1620}
1621
1622impl<'a> PgBufferedTxStore<'a> {
1623 fn new(inner: &'a dyn DataStore) -> Self {
1624 Self {
1625 inner,
1626 pending: std::sync::Mutex::new(Vec::new()),
1627 }
1628 }
1629
1630 fn record(
1631 &self,
1632 entity: &str,
1633 row_id: &str,
1634 kind: pylon_sync::ChangeKind,
1635 data: Option<&serde_json::Value>,
1636 ) {
1637 if let Ok(mut p) = self.pending.lock() {
1638 p.push(pylon_sync::ChangeEvent {
1639 seq: 0,
1640 entity: entity.to_string(),
1641 row_id: row_id.to_string(),
1642 kind,
1643 data: data.cloned(),
1644 timestamp: String::new(),
1645 });
1646 }
1647 }
1648
1649 fn take_pending(self) -> Vec<pylon_sync::ChangeEvent> {
1650 self.pending.into_inner().unwrap_or_default()
1651 }
1652}
1653
1654impl<'a> DataStore for PgBufferedTxStore<'a> {
1655 fn manifest(&self) -> &pylon_kernel::AppManifest {
1656 self.inner.manifest()
1657 }
1658
1659 fn insert(&self, entity: &str, data: &serde_json::Value) -> Result<String, DataError> {
1660 let id = self.inner.insert(entity, data)?;
1661 self.record(entity, &id, pylon_sync::ChangeKind::Insert, Some(data));
1662 Ok(id)
1663 }
1664
1665 fn get_by_id(&self, entity: &str, id: &str) -> Result<Option<serde_json::Value>, DataError> {
1666 self.inner.get_by_id(entity, id)
1667 }
1668
1669 fn list(&self, entity: &str) -> Result<Vec<serde_json::Value>, DataError> {
1670 self.inner.list(entity)
1671 }
1672
1673 fn list_after(
1674 &self,
1675 entity: &str,
1676 after: Option<&str>,
1677 limit: usize,
1678 ) -> Result<Vec<serde_json::Value>, DataError> {
1679 self.inner.list_after(entity, after, limit)
1680 }
1681
1682 fn update(&self, entity: &str, id: &str, data: &serde_json::Value) -> Result<bool, DataError> {
1683 let updated = self.inner.update(entity, id, data)?;
1684 if updated {
1685 self.record(entity, id, pylon_sync::ChangeKind::Update, Some(data));
1686 }
1687 Ok(updated)
1688 }
1689
1690 fn delete(&self, entity: &str, id: &str) -> Result<bool, DataError> {
1691 let deleted = self.inner.delete(entity, id)?;
1692 if deleted {
1693 self.record(entity, id, pylon_sync::ChangeKind::Delete, None);
1694 }
1695 Ok(deleted)
1696 }
1697
1698 fn lookup(
1699 &self,
1700 entity: &str,
1701 field: &str,
1702 value: &str,
1703 ) -> Result<Option<serde_json::Value>, DataError> {
1704 self.inner.lookup(entity, field, value)
1705 }
1706
1707 fn link(
1708 &self,
1709 entity: &str,
1710 id: &str,
1711 relation: &str,
1712 target_id: &str,
1713 ) -> Result<bool, DataError> {
1714 let linked = self.inner.link(entity, id, relation, target_id)?;
1715 if linked {
1716 let data = serde_json::json!({ relation: target_id });
1720 self.record(entity, id, pylon_sync::ChangeKind::Update, Some(&data));
1721 }
1722 Ok(linked)
1723 }
1724
1725 fn unlink(&self, entity: &str, id: &str, relation: &str) -> Result<bool, DataError> {
1726 let unlinked = self.inner.unlink(entity, id, relation)?;
1727 if unlinked {
1728 let data = serde_json::json!({ relation: serde_json::Value::Null });
1729 self.record(entity, id, pylon_sync::ChangeKind::Update, Some(&data));
1730 }
1731 Ok(unlinked)
1732 }
1733
1734 fn query_filtered(
1735 &self,
1736 entity: &str,
1737 filter: &serde_json::Value,
1738 ) -> Result<Vec<serde_json::Value>, DataError> {
1739 self.inner.query_filtered(entity, filter)
1740 }
1741
1742 fn query_graph(&self, query: &serde_json::Value) -> Result<serde_json::Value, DataError> {
1743 self.inner.query_graph(query)
1744 }
1745
1746 fn aggregate(
1747 &self,
1748 entity: &str,
1749 spec: &serde_json::Value,
1750 ) -> Result<serde_json::Value, DataError> {
1751 self.inner.aggregate(entity, spec)
1752 }
1753
1754 fn transact(
1755 &self,
1756 ops: &[serde_json::Value],
1757 ) -> Result<(bool, Vec<serde_json::Value>), DataError> {
1758 self.inner.transact(ops)
1763 }
1764
1765 fn search(
1766 &self,
1767 entity: &str,
1768 query: &serde_json::Value,
1769 ) -> Result<serde_json::Value, DataError> {
1770 self.inner.search(entity, query)
1776 }
1777}
1778
1779use pylon_functions::protocol::{AuthInfo as FnAuth, FnType};
1784use pylon_functions::registry::{FnDef, FnRegistry};
1785use pylon_functions::runner::{FnCallError, FnRunner};
1786use pylon_functions::trace::FnTrace;
1787
1788pub struct FnOpsImpl {
1793 pub runner: Arc<FnRunner>,
1794 pub registry: Arc<FnRegistry>,
1795 pub runtime: Arc<Runtime>,
1796 pub fn_rate_limiter: Arc<crate::rate_limit::RateLimiter>,
1800 pub change_log: Arc<pylon_sync::ChangeLog>,
1806 pub notifier: Arc<dyn pylon_router::ChangeNotifier>,
1808 pub job_queue: Arc<crate::jobs::JobQueue>,
1813}
1814
1815impl FnOpsImpl {
1816 fn flush_pending_schedules(&self, pending: Vec<PendingSchedule>) {
1820 for sched in pending {
1821 let delay_secs = match (sched.delay_ms, sched.run_at) {
1822 (Some(ms), _) => ms / 1000,
1823 (None, Some(ts)) => {
1824 let now = std::time::SystemTime::now()
1825 .duration_since(std::time::UNIX_EPOCH)
1826 .unwrap_or_default()
1827 .as_millis() as u64;
1828 if ts > now {
1829 (ts - now) / 1000
1830 } else {
1831 0
1832 }
1833 }
1834 _ => 0,
1835 };
1836 if let Err(e) = self.job_queue.try_enqueue_with_options(
1837 &sched.fn_name,
1838 sched.args,
1839 crate::jobs::Priority::Normal,
1840 delay_secs,
1841 3,
1842 "functions",
1843 ) {
1844 tracing::warn!(
1848 "[functions] post-COMMIT enqueue failed for \"{}\": {e}",
1849 sched.fn_name
1850 );
1851 }
1852 }
1853 }
1854}
1855
1856impl pylon_router::FnOps for FnOpsImpl {
1857 fn get_fn(&self, name: &str) -> Option<FnDef> {
1858 self.registry.get(name)
1859 }
1860
1861 fn list_fns(&self) -> Vec<FnDef> {
1862 self.registry.list()
1863 }
1864
1865 fn call(
1866 &self,
1867 fn_name: &str,
1868 args: serde_json::Value,
1869 auth: FnAuth,
1870 on_stream: Option<Box<dyn FnMut(&str) + Send>>,
1871 request: Option<pylon_functions::protocol::RequestInfo>,
1872 ) -> Result<(serde_json::Value, FnTrace), FnCallError> {
1873 let def = self.registry.get(fn_name).ok_or_else(|| FnCallError {
1874 code: "FN_NOT_FOUND".into(),
1875 message: format!("Function \"{fn_name}\" is not registered"),
1876 })?;
1877
1878 match def.fn_type {
1879 FnType::Mutation => {
1880 if self.runtime.is_postgres() {
1887 let pg_backend = self.runtime.pg_backend().ok_or_else(|| FnCallError {
1888 code: "PG_BACKEND_MISSING".into(),
1889 message:
1890 "Postgres backend reported is_postgres=true but pg_backend() returned None"
1891 .into(),
1892 })?;
1893
1894 let runner = self.runner.clone();
1899 let fn_type = def.fn_type;
1900 let fn_name_owned = fn_name.to_string();
1901
1902 let sched_guard = ScheduleBufferGuard::enter();
1906 let _depth_guard = MutationDepthGuard::enter();
1910
1911 let crdt_hook: std::sync::Arc<dyn pylon_storage::pg_tx_store::PgCrdtHook> =
1917 std::sync::Arc::new(crate::pg_loro_store::PgCrdtHookImpl {
1918 crdt: std::sync::Arc::clone(&pg_backend.crdt),
1919 manifest: std::sync::Arc::new(self.runtime.manifest().clone()),
1920 });
1921
1922 let pg = &pg_backend.store;
1923 let tx_result: Result<
1924 (serde_json::Value, FnTrace, Vec<pylon_sync::ChangeEvent>),
1925 FnCallError,
1926 > = pg.with_transaction_crdt(crdt_hook, move |inner_store: &dyn DataStore| {
1927 let buffered = PgBufferedTxStore::new(inner_store);
1928 let (value, trace) = runner.call(
1929 &buffered,
1930 &fn_name_owned,
1931 fn_type,
1932 args,
1933 auth,
1934 on_stream,
1935 request,
1936 )?;
1937 Ok((value, trace, buffered.take_pending()))
1938 });
1939
1940 return match tx_result {
1941 Ok((value, trace, pending)) => {
1942 for ev in pending {
1946 let seq = self.change_log.append(
1947 &ev.entity,
1948 &ev.row_id,
1949 ev.kind.clone(),
1950 ev.data.clone(),
1951 );
1952 let event = pylon_sync::ChangeEvent { seq, ..ev };
1953 self.notifier.notify(&event);
1954 }
1955 self.flush_pending_schedules(sched_guard.take());
1959 drop(sched_guard);
1960 Ok((value, trace))
1961 }
1962 Err(e) => {
1963 drop(sched_guard);
1964 Err(e)
1965 }
1966 };
1967 }
1968 let conn_guard = self.runtime.lock_conn_pub().map_err(|e| FnCallError {
1977 code: e.code,
1978 message: e.message,
1979 })?;
1980
1981 if let Err(e) = conn_guard.execute("BEGIN", []) {
1982 return Err(FnCallError {
1983 code: "BEGIN_FAILED".into(),
1984 message: format!("Failed to start transaction: {e}"),
1985 });
1986 }
1987
1988 let sched_guard = ScheduleBufferGuard::enter();
1991 let _depth_guard = MutationDepthGuard::enter();
1993
1994 let tx_store = TxStore::new(&self.runtime, &conn_guard);
1995 let result = self.runner.call(
1996 &tx_store,
1997 fn_name,
1998 def.fn_type,
1999 args,
2000 auth,
2001 on_stream,
2002 request,
2003 );
2004
2005 let result = match result {
2010 Ok(value) => match conn_guard.execute("COMMIT", []) {
2011 Ok(_) => {
2012 for ev in tx_store.take_pending() {
2021 let seq = self.change_log.append(
2022 &ev.entity,
2023 &ev.row_id,
2024 ev.kind.clone(),
2025 ev.data.clone(),
2026 );
2027 let event = pylon_sync::ChangeEvent { seq, ..ev };
2028 self.notifier.notify(&event);
2029 }
2030 self.flush_pending_schedules(sched_guard.take());
2035 drop(sched_guard);
2036 Ok(value)
2037 }
2038 Err(commit_err) => {
2039 if let Err(rollback_err) = conn_guard.execute("ROLLBACK", []) {
2043 tracing::warn!(
2044 "[functions] ROLLBACK after COMMIT failure also failed: {rollback_err}"
2045 );
2046 }
2047 Err(FnCallError {
2048 code: "COMMIT_FAILED".into(),
2049 message: format!(
2050 "Function \"{fn_name}\" succeeded but COMMIT failed: {commit_err}"
2051 ),
2052 })
2053 }
2054 },
2055 Err(handler_err) => {
2056 if let Err(rollback_err) = conn_guard.execute("ROLLBACK", []) {
2057 tracing::warn!(
2060 "[functions] ROLLBACK after handler error failed: {rollback_err}"
2061 );
2062 }
2063 Err(handler_err)
2064 }
2065 };
2066 result
2068 }
2069 _ => self.runner.call(
2070 &*self.runtime,
2071 fn_name,
2072 def.fn_type,
2073 args,
2074 auth,
2075 on_stream,
2076 request,
2077 ),
2078 }
2079 }
2080
2081 fn recent_traces(&self, limit: usize) -> Vec<FnTrace> {
2082 self.runner.trace_log.recent(limit)
2083 }
2084
2085 fn check_rate_limit(&self, fn_name: &str, identity: &str) -> Result<(), u64> {
2086 let key = format!("{fn_name}::{identity}");
2087 self.fn_rate_limiter.check(&key)
2088 }
2089}
2090
2091pub fn find_functions_runtime() -> Option<String> {
2107 if let Ok(env_path) = std::env::var("PYLON_FUNCTIONS_RUNTIME") {
2108 if std::path::Path::new(&env_path).exists() {
2109 return Some(env_path);
2110 }
2111 }
2112
2113 let cwd = std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));
2119 let home = std::env::var("HOME").unwrap_or_else(|_| ".".into());
2120 let relative_candidates = [
2121 "node_modules/@pylonsync/functions/src/runtime.ts",
2122 "node_modules/@pylonsync/functions/dist/runtime.js",
2123 "node_modules/@pylon/functions/src/runtime.ts",
2125 "node_modules/@pylon/functions/dist/runtime.js",
2126 "packages/functions/src/runtime.ts",
2128 ];
2129
2130 let mut dir: Option<&std::path::Path> = Some(cwd.as_path());
2131 while let Some(current) = dir {
2132 for rel in &relative_candidates {
2133 let candidate = current.join(rel);
2134 if candidate.exists() {
2135 return candidate.to_str().map(|s| s.to_string());
2136 }
2137 }
2138 dir = current.parent();
2139 }
2140
2141 let user_path = format!("{home}/.pylon/runtime.ts");
2143 if std::path::Path::new(&user_path).exists() {
2144 return Some(user_path);
2145 }
2146 None
2147}
2148
2149pub fn try_spawn_functions(
2150 runtime: Arc<Runtime>,
2151 job_queue: Arc<crate::jobs::JobQueue>,
2152 fn_rate_limiter: Arc<crate::rate_limit::RateLimiter>,
2153 change_log: Arc<pylon_sync::ChangeLog>,
2154 notifier: Arc<dyn pylon_router::ChangeNotifier>,
2155) -> Option<Arc<FnOpsImpl>> {
2156 let fn_dir = std::env::var("PYLON_FUNCTIONS_DIR").unwrap_or_else(|_| "functions".into());
2157 if !std::path::Path::new(&fn_dir).exists() {
2158 return None;
2159 }
2160
2161 let runtime_script = match find_functions_runtime() {
2162 Some(p) => p,
2163 None => {
2164 tracing::warn!(
2165 "[functions] No TypeScript runtime script found. TypeScript functions will be unavailable."
2166 );
2167 tracing::warn!(
2168 "[functions] Tried: $PYLON_FUNCTIONS_RUNTIME, node_modules/@pylon/functions/src/runtime.ts, ~/.pylon/runtime.ts, packages/functions/src/runtime.ts"
2169 );
2170 return None;
2171 }
2172 };
2173
2174 let runner = Arc::new(FnRunner::new(1000));
2175
2176 let defs = match runner.start("bun", &["run", &runtime_script, &fn_dir]) {
2180 Ok(defs) => defs,
2181 Err(e) => {
2182 tracing::warn!("[functions] Failed to start Bun runtime: {e}");
2183 tracing::warn!(
2184 "[functions] Install Bun from https://bun.sh — TypeScript functions will be unavailable."
2185 );
2186 return None;
2187 }
2188 };
2189
2190 let job_queue_for_handlers = Arc::clone(&job_queue);
2194
2195 runner.set_schedule_hook(Box::new(move |fn_name, args, delay_ms, run_at| {
2207 let buffered = MUTATION_SCHEDULE_BUFFER.with(|cell| {
2210 let slot = cell.borrow();
2211 slot.as_ref()
2212 .map(|b| {
2213 b.borrow_mut().push(PendingSchedule {
2214 fn_name: fn_name.to_string(),
2215 args: args.clone(),
2216 delay_ms,
2217 run_at,
2218 });
2219 })
2220 .is_some()
2221 });
2222 if buffered {
2223 return Ok(format!("pending:{fn_name}"));
2229 }
2230
2231 let delay_secs = match (delay_ms, run_at) {
2232 (Some(ms), _) => ms / 1000,
2233 (None, Some(ts)) => {
2234 let now = std::time::SystemTime::now()
2235 .duration_since(std::time::UNIX_EPOCH)
2236 .unwrap_or_default()
2237 .as_millis() as u64;
2238 if ts > now {
2239 (ts - now) / 1000
2240 } else {
2241 0
2242 }
2243 }
2244 _ => 0,
2245 };
2246 job_queue.try_enqueue_with_options(
2247 fn_name,
2248 args,
2249 crate::jobs::Priority::Normal,
2250 delay_secs,
2251 3,
2252 "functions",
2253 )
2254 }));
2255
2256 let registry = Arc::new(FnRegistry::new());
2257 let count = defs.len();
2258 registry.replace_all(defs);
2259 tracing::warn!("[functions] Loaded {count} function(s) from {fn_dir}");
2260
2261 let ops = Arc::new(FnOpsImpl {
2262 runner,
2263 registry,
2264 runtime,
2265 fn_rate_limiter,
2266 change_log,
2267 notifier,
2268 job_queue: Arc::clone(&job_queue_for_handlers),
2269 });
2270
2271 install_nested_call_hook(&ops);
2272 register_function_job_handlers(&ops, &job_queue_for_handlers);
2273 spawn_runtime_supervisor(Arc::clone(&ops));
2274 Some(ops)
2275}
2276
2277fn register_function_job_handlers(ops: &Arc<FnOpsImpl>, job_queue: &Arc<crate::jobs::JobQueue>) {
2290 use pylon_router::FnOps as _;
2291
2292 let fn_names: Vec<String> = ops.registry.list().into_iter().map(|d| d.name).collect();
2293
2294 for name in fn_names {
2295 let weak = Arc::downgrade(ops);
2296 let fn_name = name.clone();
2297 job_queue.register(
2298 &name,
2299 Arc::new(move |job: &crate::jobs::Job| {
2300 let ops = match weak.upgrade() {
2301 Some(o) => o,
2302 None => {
2303 return crate::jobs::JobResult::Failure(
2304 "RUNTIME_GONE: function ops dropped".into(),
2305 )
2306 }
2307 };
2308 let auth = FnAuth {
2309 user_id: None,
2310 is_admin: false,
2311 tenant_id: None,
2312 };
2313 match ops.call(&fn_name, job.payload.clone(), auth, None, None) {
2314 Ok(_) => crate::jobs::JobResult::Success,
2315 Err(e) => crate::jobs::JobResult::Retry(format!("{}: {}", e.code, e.message)),
2316 }
2317 }),
2318 );
2319 }
2320}
2321
2322fn install_nested_call_hook(ops: &Arc<FnOpsImpl>) {
2329 use pylon_functions::protocol::{AuthInfo, FnType};
2330
2331 let weak = Arc::downgrade(ops);
2332 ops.runner.set_nested_call_hook(Box::new(
2333 move |fn_name: &str,
2334 fn_type: FnType,
2335 args: serde_json::Value,
2336 auth: AuthInfo|
2337 -> Result<serde_json::Value, (String, String)> {
2338 let ops = match weak.upgrade() {
2339 Some(o) => o,
2340 None => {
2341 return Err((
2342 "RUNTIME_GONE".into(),
2343 "pylon runtime is shutting down".into(),
2344 ))
2345 }
2346 };
2347
2348 match fn_type {
2349 FnType::Mutation => {
2350 if in_mutation_tx() {
2359 return Err((
2360 "NESTED_MUTATION".into(),
2361 format!(
2362 "ctx.runMutation(\"{fn_name}\") is not allowed from inside \
2363 another mutation handler — the mutation handler IS the \
2364 transaction, and the connection mutex is non-reentrant. \
2365 Restructure the shared logic into a regular function (not \
2366 a registered mutation), or call from an action handler."
2367 ),
2368 ));
2369 }
2370
2371 if ops.runtime.is_postgres() {
2377 let pg_backend = ops.runtime.pg_backend().ok_or_else(|| {
2378 (
2379 "PG_BACKEND_MISSING".into(),
2380 "Postgres backend reported is_postgres=true but pg_backend() returned None".into(),
2381 )
2382 })?;
2383 let pg = &pg_backend.store;
2384 let runner = ops.runner.clone();
2385 let fn_name_owned = fn_name.to_string();
2386 let sched_guard = ScheduleBufferGuard::enter();
2387 let _depth_guard = MutationDepthGuard::enter();
2388 let crdt_hook: std::sync::Arc<
2392 dyn pylon_storage::pg_tx_store::PgCrdtHook,
2393 > = std::sync::Arc::new(crate::pg_loro_store::PgCrdtHookImpl {
2394 crdt: std::sync::Arc::clone(&pg_backend.crdt),
2395 manifest: std::sync::Arc::new(ops.runtime.manifest().clone()),
2396 });
2397 let tx_result: Result<
2398 (serde_json::Value, Vec<pylon_sync::ChangeEvent>),
2399 FnCallError,
2400 > = pg.with_transaction_crdt(crdt_hook, move |inner_store: &dyn DataStore| {
2401 let buffered = PgBufferedTxStore::new(inner_store);
2402 let (value, _trace) = runner.call_inner(
2403 &buffered,
2404 &fn_name_owned,
2405 fn_type,
2406 args,
2407 auth,
2408 None,
2409 None,
2410 )?;
2411 Ok((value, buffered.take_pending()))
2412 });
2413 return match tx_result {
2414 Ok((value, pending)) => {
2415 for ev in pending {
2416 let seq = ops.change_log.append(
2417 &ev.entity,
2418 &ev.row_id,
2419 ev.kind.clone(),
2420 ev.data.clone(),
2421 );
2422 let event = pylon_sync::ChangeEvent { seq, ..ev };
2423 ops.notifier.notify(&event);
2424 }
2425 ops.flush_pending_schedules(sched_guard.take());
2426 drop(sched_guard);
2427 Ok(value)
2428 }
2429 Err(e) => {
2430 drop(sched_guard);
2431 Err((e.code, e.message))
2432 }
2433 };
2434 }
2435
2436 let conn_guard = ops
2439 .runtime
2440 .lock_conn_pub()
2441 .map_err(|e| (e.code, e.message))?;
2442 if let Err(e) = conn_guard.execute("BEGIN", []) {
2443 return Err(("BEGIN_FAILED".into(), e.to_string()));
2444 }
2445 let sched_guard = ScheduleBufferGuard::enter();
2446 let _depth_guard = MutationDepthGuard::enter();
2447 let tx_store = TxStore::new(&ops.runtime, &conn_guard);
2448 let result = ops
2453 .runner
2454 .call_inner(&tx_store, fn_name, fn_type, args, auth, None, None);
2455 match result {
2456 Ok((value, _trace)) => {
2457 if let Err(e) = conn_guard.execute("COMMIT", []) {
2458 let _ = conn_guard.execute("ROLLBACK", []);
2459 return Err(("COMMIT_FAILED".into(), e.to_string()));
2460 }
2461 for ev in tx_store.take_pending() {
2468 let seq = ops.change_log.append(
2469 &ev.entity,
2470 &ev.row_id,
2471 ev.kind.clone(),
2472 ev.data.clone(),
2473 );
2474 let event = pylon_sync::ChangeEvent { seq, ..ev };
2475 ops.notifier.notify(&event);
2476 }
2477 ops.flush_pending_schedules(sched_guard.take());
2478 drop(sched_guard);
2479 Ok(value)
2480 }
2481 Err(e) => {
2482 let _ = conn_guard.execute("ROLLBACK", []);
2483 drop(sched_guard);
2484 Err((e.code, e.message))
2485 }
2486 }
2487 }
2488 _ => {
2489 let result = ops.runner.call_inner(
2493 &*ops.runtime,
2494 fn_name,
2495 fn_type,
2496 args,
2497 auth,
2498 None,
2499 None,
2500 );
2501 result.map(|(v, _)| v).map_err(|e| (e.code, e.message))
2502 }
2503 }
2504 },
2505 ));
2506}
2507
2508fn spawn_runtime_supervisor(ops: Arc<FnOpsImpl>) {
2516 use std::time::Duration;
2517
2518 std::thread::Builder::new()
2519 .name("pylon-fn-supervisor".into())
2520 .spawn(move || {
2521 let mut backoff = Duration::from_secs(1);
2522 let max_backoff = Duration::from_secs(30);
2523 loop {
2524 std::thread::sleep(Duration::from_secs(2));
2525 if ops.runner.is_alive() {
2526 backoff = Duration::from_secs(1);
2527 continue;
2528 }
2529 tracing::warn!(
2530 "[functions] Bun runtime is not alive — respawning after {:?}",
2531 backoff
2532 );
2533 std::thread::sleep(backoff);
2534 match ops.runner.respawn() {
2535 Ok(defs) => {
2536 let count = defs.len();
2537 ops.registry.replace_all(defs);
2541 tracing::warn!("[functions] Respawned Bun runtime ({count} fn(s))");
2542 backoff = Duration::from_secs(1);
2543 }
2544 Err(e) => {
2545 tracing::warn!("[functions] Respawn failed: {e}");
2546 let backoff_str = format!("{}", backoff.as_secs());
2553 pylon_observability::report_error(&pylon_observability::ErrorEvent {
2554 level: pylon_observability::ErrorLevel::Error,
2555 code: "FN_RESPAWN_FAILED",
2556 message: &e,
2557 context: &[
2558 ("component", "bun-runtime-supervisor"),
2559 ("backoff_secs", &backoff_str),
2560 ],
2561 });
2562 backoff = (backoff * 2).min(max_backoff);
2563 }
2564 }
2565 }
2566 })
2567 .expect("failed to spawn function runtime supervisor");
2568}