1use pylon_http::{DataError, DataStore};
8
9use crate::Runtime;
10
/// One scheduling request held in a schedule buffer (see
/// `MUTATION_SCHEDULE_BUFFER`, which stores a `Vec` of these).
///
/// NOTE(review): presumably `delay_ms` and `run_at` are two alternative ways of
/// saying when `fn_name` should run — confirm against the code that drains the
/// buffer, which is not visible here.
#[derive(Debug, Clone)]
pub(crate) struct PendingSchedule {
    /// Name of the function to schedule.
    pub fn_name: String,
    /// JSON arguments carried along with the request.
    pub args: serde_json::Value,
    /// Optional delay; milliseconds judging by the field name — TODO confirm.
    pub delay_ms: Option<u64>,
    /// Optional run time; unit and epoch are not visible here — TODO confirm.
    pub run_at: Option<u64>,
}
28
thread_local! {
    /// Per-thread slot holding the currently installed schedule buffer, if any.
    ///
    /// Starts out as `None`. `ScheduleBufferGuard::enter` installs a fresh shared
    /// `Vec<PendingSchedule>` here, and dropping that guard puts back whatever
    /// was installed before it.
    pub(crate) static MUTATION_SCHEDULE_BUFFER: std::cell::RefCell<Option<std::rc::Rc<std::cell::RefCell<Vec<PendingSchedule>>>>>
        = const { std::cell::RefCell::new(None) };
}
41
42pub(crate) struct ScheduleBufferGuard {
47 previous: Option<std::rc::Rc<std::cell::RefCell<Vec<PendingSchedule>>>>,
48 current: std::rc::Rc<std::cell::RefCell<Vec<PendingSchedule>>>,
49}
50
51impl ScheduleBufferGuard {
52 pub(crate) fn enter() -> Self {
53 let current = std::rc::Rc::new(std::cell::RefCell::new(Vec::new()));
54 let previous = MUTATION_SCHEDULE_BUFFER.with(|cell| {
55 let mut slot = cell.borrow_mut();
56 let old = slot.take();
57 *slot = Some(current.clone());
58 old
59 });
60 Self { previous, current }
61 }
62
63 pub(crate) fn take(&self) -> Vec<PendingSchedule> {
67 std::mem::take(&mut *self.current.borrow_mut())
68 }
69}
70
71impl Drop for ScheduleBufferGuard {
72 fn drop(&mut self) {
73 MUTATION_SCHEDULE_BUFFER.with(|cell| {
74 *cell.borrow_mut() = self.previous.take();
75 });
76 }
77}
78
thread_local! {
    /// Number of `MutationDepthGuard`s currently alive on this thread.
    static MUTATION_DEPTH: std::cell::Cell<u32> = const { std::cell::Cell::new(0) };
}

/// Scope guard that bumps the per-thread mutation depth while it is alive.
pub(crate) struct MutationDepthGuard;

impl MutationDepthGuard {
    /// Increments the thread's depth counter (saturating) and returns the guard
    /// whose drop undoes the increment.
    pub(crate) fn enter() -> Self {
        MUTATION_DEPTH.with(|depth| {
            let bumped = depth.get().saturating_add(1);
            depth.set(bumped);
        });
        MutationDepthGuard
    }
}

impl Drop for MutationDepthGuard {
    /// Decrements the thread's depth counter, never going below zero.
    fn drop(&mut self) {
        MUTATION_DEPTH.with(|depth| {
            let lowered = depth.get().saturating_sub(1);
            depth.set(lowered);
        });
    }
}

/// Returns `true` while at least one `MutationDepthGuard` is alive on the
/// calling thread.
pub(crate) fn in_mutation_tx() -> bool {
    MUTATION_DEPTH.with(|depth| depth.get() != 0)
}
121
122impl Runtime {
127 pub(crate) fn pg_transact_with_crdt(
133 &self,
134 pg: &crate::PgBackend,
135 ops: &[serde_json::Value],
136 ) -> Result<(bool, Vec<serde_json::Value>), DataError> {
137 use pylon_storage::pg_tx_store::{tx_delete, tx_insert, tx_update};
138
139 enum Op<'a> {
142 Insert {
143 entity: &'a str,
144 data: &'a serde_json::Value,
145 },
146 Update {
147 entity: &'a str,
148 id: &'a str,
149 data: &'a serde_json::Value,
150 },
151 Delete {
152 entity: &'a str,
153 id: &'a str,
154 },
155 }
156 let mut typed: Vec<Op<'_>> = Vec::with_capacity(ops.len());
157 for op in ops {
158 let op_type = op.get("op").and_then(|v| v.as_str()).unwrap_or("");
159 let entity = op
160 .get("entity")
161 .and_then(|v| v.as_str())
162 .ok_or_else(|| DataError {
163 code: "TX_INVALID_OP".into(),
164 message: "Each transact op must have an \"entity\" field".into(),
165 })?;
166 match op_type {
167 "insert" => {
168 let data = op.get("data").ok_or_else(|| DataError {
169 code: "TX_INVALID_OP".into(),
170 message: "insert op requires \"data\"".into(),
171 })?;
172 typed.push(Op::Insert { entity, data });
173 }
174 "update" => {
175 let id = op
176 .get("id")
177 .and_then(|v| v.as_str())
178 .ok_or_else(|| DataError {
179 code: "TX_INVALID_OP".into(),
180 message: "update op requires \"id\"".into(),
181 })?;
182 let data = op.get("data").ok_or_else(|| DataError {
183 code: "TX_INVALID_OP".into(),
184 message: "update op requires \"data\"".into(),
185 })?;
186 typed.push(Op::Update { entity, id, data });
187 }
188 "delete" => {
189 let id = op
190 .get("id")
191 .and_then(|v| v.as_str())
192 .ok_or_else(|| DataError {
193 code: "TX_INVALID_OP".into(),
194 message: "delete op requires \"id\"".into(),
195 })?;
196 typed.push(Op::Delete { entity, id });
197 }
198 other => {
199 return Err(DataError {
200 code: "TX_INVALID_OP".into(),
201 message: format!("unknown op \"{other}\""),
202 });
203 }
204 }
205 }
206
207 let mut crdt_touched: Vec<(String, String)> = Vec::new();
210
211 let manifest = self.manifest.clone();
212 let result = pg.store.with_transaction_raw(|tx| -> Result<Vec<serde_json::Value>, DataError> {
213 let mut json_results: Vec<serde_json::Value> = Vec::with_capacity(typed.len());
214 for op in &typed {
215 let result = match op {
216 Op::Insert { entity, data } => {
217 let ent = manifest.entities.iter().find(|e| e.name == *entity);
218 let id = if ent.map(|e| e.crdt).unwrap_or(false) {
219 let crdt_fields = self.crdt_fields_for(ent.unwrap()).map_err(|e| {
220 DataError { code: e.code, message: e.message }
221 })?;
222 let id = crate::generate_id();
223 pg.crdt
224 .apply_patch(tx, entity, &id, &crdt_fields, data)
225 .map_err(|e| DataError {
226 code: "CRDT_APPLY_FAILED".into(),
227 message: format!("crdt write {entity}/{id}: {e}"),
228 })?;
229 let mut row = (*data).clone();
230 if let Some(obj) = row.as_object_mut() {
231 obj.insert("id".into(), serde_json::Value::String(id.clone()));
232 }
233 tx_insert(tx, &manifest, entity, &row)?;
234 crdt_touched.push((entity.to_string(), id.clone()));
235 id
236 } else {
237 tx_insert(tx, &manifest, entity, data)?
238 };
239 serde_json::json!({ "op": "insert", "id": id })
240 }
241 Op::Update { entity, id, data } => {
242 let ent = manifest.entities.iter().find(|e| e.name == *entity);
243 let updated = if ent.map(|e| e.crdt).unwrap_or(false) {
244 let crdt_fields = self.crdt_fields_for(ent.unwrap()).map_err(|e| {
245 DataError { code: e.code, message: e.message }
246 })?;
247 pg.crdt
248 .apply_patch(tx, entity, id, &crdt_fields, data)
249 .map_err(|e| DataError {
250 code: "CRDT_APPLY_FAILED".into(),
251 message: format!("crdt update {entity}/{id}: {e}"),
252 })?;
253 let updated = tx_update(tx, &manifest, entity, id, data)?;
254 if !updated {
255 return Err(DataError {
256 code: "ENTITY_NOT_FOUND".into(),
257 message: format!(
258 "Update on {entity}/{id} found no row — refusing to commit \
259 a CRDT snapshot that would orphan."
260 ),
261 });
262 }
263 crdt_touched.push((entity.to_string(), id.to_string()));
264 updated
265 } else {
266 tx_update(tx, &manifest, entity, id, data)?
267 };
268 serde_json::json!({ "op": "update", "id": id, "updated": updated })
269 }
270 Op::Delete { entity, id } => {
271 let ent = manifest.entities.iter().find(|e| e.name == *entity);
272 let deleted = if ent.map(|e| e.crdt).unwrap_or(false) {
273 tx.execute(
274 "DELETE FROM _pylon_crdt_snapshots WHERE entity = $1 AND row_id = $2",
275 &[entity, id],
276 )
277 .map_err(|e| DataError {
278 code: "CRDT_SIDECAR_DELETE_FAILED".into(),
279 message: format!(
280 "delete pg crdt snapshot {entity}/{id}: {e}"
281 ),
282 })?;
283 let deleted = tx_delete(tx, &manifest, entity, id)?;
284 crdt_touched.push((entity.to_string(), id.to_string()));
285 deleted
286 } else {
287 tx_delete(tx, &manifest, entity, id)?
288 };
289 serde_json::json!({ "op": "delete", "id": id, "deleted": deleted })
290 }
291 };
292 json_results.push(result);
293 }
294 for (entity, id) in &crdt_touched {
296 pg.crdt.cache_after_commit(tx, entity, id);
297 }
298 Ok(json_results)
299 });
300
301 match result {
302 Ok(json_results) => Ok((true, json_results)),
303 Err(e) => {
304 for (entity, id) in &crdt_touched {
305 pg.crdt.evict(entity, id);
306 }
307 Err(e)
308 }
309 }
310 }
311}
312
313impl DataStore for Runtime {
318 fn manifest(&self) -> &pylon_kernel::AppManifest {
319 Runtime::manifest(self)
320 }
321
322 fn insert(&self, entity: &str, data: &serde_json::Value) -> Result<String, DataError> {
323 Runtime::insert(self, entity, data).map_err(into_data_error)
324 }
325
326 fn get_by_id(&self, entity: &str, id: &str) -> Result<Option<serde_json::Value>, DataError> {
327 Runtime::get_by_id(self, entity, id).map_err(into_data_error)
328 }
329
330 fn list(&self, entity: &str) -> Result<Vec<serde_json::Value>, DataError> {
331 Runtime::list(self, entity).map_err(into_data_error)
332 }
333
334 fn list_after(
335 &self,
336 entity: &str,
337 after: Option<&str>,
338 limit: usize,
339 ) -> Result<Vec<serde_json::Value>, DataError> {
340 Runtime::list_after(self, entity, after, limit).map_err(into_data_error)
341 }
342
343 fn update(&self, entity: &str, id: &str, data: &serde_json::Value) -> Result<bool, DataError> {
344 Runtime::update(self, entity, id, data).map_err(into_data_error)
345 }
346
347 fn delete(&self, entity: &str, id: &str) -> Result<bool, DataError> {
348 Runtime::delete(self, entity, id).map_err(into_data_error)
349 }
350
351 fn lookup(
352 &self,
353 entity: &str,
354 field: &str,
355 value: &str,
356 ) -> Result<Option<serde_json::Value>, DataError> {
357 Runtime::lookup(self, entity, field, value).map_err(into_data_error)
358 }
359
360 fn link(
361 &self,
362 entity: &str,
363 id: &str,
364 relation: &str,
365 target_id: &str,
366 ) -> Result<bool, DataError> {
367 Runtime::link(self, entity, id, relation, target_id).map_err(into_data_error)
368 }
369
370 fn unlink(&self, entity: &str, id: &str, relation: &str) -> Result<bool, DataError> {
371 Runtime::unlink(self, entity, id, relation).map_err(into_data_error)
372 }
373
374 fn query_filtered(
375 &self,
376 entity: &str,
377 filter: &serde_json::Value,
378 ) -> Result<Vec<serde_json::Value>, DataError> {
379 Runtime::query_filtered(self, entity, filter).map_err(into_data_error)
380 }
381
382 fn query_graph(&self, query: &serde_json::Value) -> Result<serde_json::Value, DataError> {
383 Runtime::query_graph(self, query).map_err(into_data_error)
384 }
385
386 fn aggregate(
387 &self,
388 entity: &str,
389 spec: &serde_json::Value,
390 ) -> Result<serde_json::Value, DataError> {
391 Runtime::aggregate(self, entity, spec).map_err(into_data_error)
392 }
393
394 fn transact(
395 &self,
396 ops: &[serde_json::Value],
397 ) -> Result<(bool, Vec<serde_json::Value>), DataError> {
398 if let Some(pg) = self.pg_backend() {
404 return self.pg_transact_with_crdt(pg, ops);
405 }
406 let conn = self.lock_conn_pub().map_err(into_data_error)?;
407 let _ = conn.execute("BEGIN", []);
408 let mut results: Vec<serde_json::Value> = Vec::new();
409 let mut rollback = false;
410
411 for op in ops {
412 let op_type = op.get("op").and_then(|v| v.as_str()).unwrap_or("");
413 let entity = op.get("entity").and_then(|v| v.as_str()).unwrap_or("");
414
415 match op_type {
416 "insert" => {
417 let data = op.get("data").cloned().unwrap_or(serde_json::json!({}));
418 match self.insert_with_conn(&conn, entity, &data) {
419 Ok(id) => {
420 results.push(serde_json::json!({"op": "insert", "id": id}));
421 }
422 Err(e) => {
423 results.push(serde_json::json!({"op": "insert", "error": e.message}));
424 rollback = true;
425 break;
426 }
427 }
428 }
429 "update" => {
430 let id = op.get("id").and_then(|v| v.as_str()).unwrap_or("");
431 let data = op.get("data").cloned().unwrap_or(serde_json::json!({}));
432 match self.update_with_conn(&conn, entity, id, &data) {
433 Ok(_) => {
434 results.push(serde_json::json!({"op": "update", "id": id}));
435 }
436 Err(e) => {
437 results.push(serde_json::json!({"op": "update", "error": e.message}));
438 rollback = true;
439 break;
440 }
441 }
442 }
443 "delete" => {
444 let id = op.get("id").and_then(|v| v.as_str()).unwrap_or("");
445 match self.delete_with_conn(&conn, entity, id) {
446 Ok(_) => {
447 results.push(serde_json::json!({"op": "delete", "id": id}));
448 }
449 Err(e) => {
450 results.push(serde_json::json!({"op": "delete", "error": e.message}));
451 rollback = true;
452 break;
453 }
454 }
455 }
456 _ => {
457 results.push(serde_json::json!({"op": op_type, "error": "unknown operation"}));
458 }
459 }
460 }
461
462 if rollback {
463 let _ = conn.execute("ROLLBACK", []);
464 } else {
465 let _ = conn.execute("COMMIT", []);
466 }
467
468 Ok((!rollback, results))
469 }
470
471 fn search(
478 &self,
479 entity: &str,
480 query: &serde_json::Value,
481 ) -> Result<serde_json::Value, DataError> {
482 let ent = self
483 .manifest()
484 .entities
485 .iter()
486 .find(|e| e.name == entity)
487 .ok_or_else(|| DataError {
488 code: "ENTITY_NOT_FOUND".into(),
489 message: format!("Unknown entity: {entity}"),
490 })?;
491 let cfg = ent.search.as_ref().ok_or_else(|| DataError {
492 code: "SEARCH_NOT_CONFIGURED".into(),
493 message: format!("Entity {entity} has no `search:` config"),
494 })?;
495 let parsed: pylon_storage::search::SearchQuery = serde_json::from_value(query.clone())
496 .map_err(|e| DataError {
497 code: "INVALID_QUERY".into(),
498 message: format!("search query body: {e}"),
499 })?;
500
501 if self.is_postgres() {
505 let pg = self.pg_data_store().ok_or_else(|| DataError {
506 code: "PG_DATASTORE_MISSING".into(),
507 message: "is_postgres=true but pg_data_store() returned None".into(),
508 })?;
509 let result = pg.run_search(entity, cfg, &parsed).map_err(|e| DataError {
510 code: e.code,
511 message: e.message,
512 })?;
513 return serde_json::to_value(&result).map_err(|e| DataError {
514 code: "SEARCH_SERIALIZE_FAILED".into(),
515 message: e.to_string(),
516 });
517 }
518
519 let conn = self.lock_conn_pub().map_err(into_data_error)?;
520 let result =
521 pylon_storage::search_query::run_search(&conn, entity, cfg, &parsed).map_err(|e| {
522 DataError {
523 code: e.code,
524 message: e.message,
525 }
526 })?;
527 serde_json::to_value(&result).map_err(|e| DataError {
528 code: "SEARCH_SERIALIZE_FAILED".into(),
529 message: e.to_string(),
530 })
531 }
532
533 fn crdt_snapshot(&self, entity: &str, row_id: &str) -> Result<Option<Vec<u8>>, DataError> {
538 if self.is_postgres() {
543 let ent = self
544 .manifest()
545 .entities
546 .iter()
547 .find(|e| e.name == entity)
548 .ok_or_else(|| DataError {
549 code: "ENTITY_NOT_FOUND".into(),
550 message: format!("Unknown entity: {entity}"),
551 })?;
552 if !ent.crdt {
553 return Ok(None);
554 }
555 let pg_backend = match self.pg_backend() {
556 Some(pg) => pg,
557 None => return Ok(None),
558 };
559 let snap = pg_backend.store.with_client(|client| -> Result<Vec<u8>, DataError> {
563 pg_backend
564 .crdt
565 .snapshot(client, entity, row_id)
566 .map_err(|e| DataError {
567 code: "CRDT_SNAPSHOT_FAILED".into(),
568 message: format!("snapshot {entity}/{row_id}: {e}"),
569 })
570 })?;
571 return Ok(Some(snap));
572 }
573 let ent = self
574 .manifest()
575 .entities
576 .iter()
577 .find(|e| e.name == entity)
578 .ok_or_else(|| DataError {
579 code: "ENTITY_NOT_FOUND".into(),
580 message: format!("Unknown entity: {entity}"),
581 })?;
582 if !ent.crdt {
583 return Ok(None);
584 }
585 let conn = self.lock_conn_pub().map_err(into_data_error)?;
586 let snap = self
587 .crdt_store()
588 .snapshot(&conn, entity, row_id)
589 .map_err(|e| DataError {
590 code: "CRDT_SNAPSHOT_FAILED".into(),
591 message: format!("snapshot {entity}/{row_id}: {e}"),
592 })?;
593 Ok(Some(snap))
594 }
595
596 fn crdt_apply_update(
606 &self,
607 entity: &str,
608 row_id: &str,
609 update: &[u8],
610 ) -> Result<Vec<u8>, DataError> {
611 if self.is_postgres() {
617 let ent = self
618 .manifest()
619 .entities
620 .iter()
621 .find(|e| e.name == entity)
622 .ok_or_else(|| DataError {
623 code: "ENTITY_NOT_FOUND".into(),
624 message: format!("Unknown entity: {entity}"),
625 })?
626 .clone();
627 if !ent.crdt {
628 return Err(DataError {
629 code: "NOT_SUPPORTED".into(),
630 message: format!(
631 "CRDT update sent for entity \"{entity}\" which has crdt: false"
632 ),
633 });
634 }
635 let pg_backend = self.pg_backend().ok_or_else(|| DataError {
636 code: "PG_BACKEND_MISSING".into(),
637 message: "is_postgres=true but pg_backend() returned None".into(),
638 })?;
639 let crdt_fields = self.crdt_fields_for(&ent).map_err(into_data_error)?;
640
641 let result = pg_backend.store.with_transaction_raw(|tx| -> Result<Vec<u8>, DataError> {
649 let projected = pg_backend
650 .crdt
651 .apply_remote_update(tx, entity, row_id, &crdt_fields, update)
652 .map_err(|e| {
653 let code = match &e {
661 crate::loro_store::LoroStoreError::Decode(_) => "CRDT_DECODE_FAILED",
662 _ => "CRDT_APPLY_FAILED",
663 };
664 DataError {
665 code: code.into(),
666 message: format!("crdt apply update {entity}/{row_id}: {e}"),
667 }
668 })?;
669 let updated = pylon_storage::pg_tx_store::tx_update(
670 tx,
671 self.manifest(),
672 entity,
673 row_id,
674 &projected,
675 )?;
676 if !updated {
677 return Err(DataError {
682 code: "ENTITY_NOT_FOUND".into(),
683 message: format!(
684 "Peer-pushed CRDT update targets {entity}/{row_id} which has \
685 no materialized row — refusing to commit an orphan snapshot."
686 ),
687 });
688 }
689 let snap = crate::pg_loro_store::PgLoroStore::read_snapshot_via_conn(tx, entity, row_id)
695 .map_err(|e| DataError {
696 code: "CRDT_SNAPSHOT_FAILED".into(),
697 message: format!(
698 "post-update snapshot {entity}/{row_id}: {e}"
699 ),
700 })?;
701 pg_backend.crdt.cache_after_commit(tx, entity, row_id);
704 Ok(snap)
705 });
706 if result.is_err() {
707 pg_backend.crdt.evict(entity, row_id);
712 }
713 return result;
714 }
715 let ent = self
718 .manifest()
719 .entities
720 .iter()
721 .find(|e| e.name == entity)
722 .ok_or_else(|| DataError {
723 code: "ENTITY_NOT_FOUND".into(),
724 message: format!("Unknown entity: {entity}"),
725 })?
726 .clone();
727 if !ent.crdt {
728 return Err(DataError {
729 code: "NOT_SUPPORTED".into(),
730 message: format!("Entity {entity} has crdt: false; client push requires CRDT mode"),
731 });
732 }
733 let crdt_fields = self.crdt_fields_for(&ent).map_err(into_data_error)?;
734
735 let conn = self.lock_conn_pub().map_err(into_data_error)?;
736 crate::with_write_tx(&conn, || -> Result<Vec<u8>, crate::RuntimeError> {
737 let projected = self
741 .crdt_store()
742 .apply_remote_update(&conn, entity, row_id, &crdt_fields, update)
743 .map_err(|e| crate::RuntimeError {
744 code: "CRDT_APPLY_FAILED".into(),
745 message: format!("apply_remote_update {entity}/{row_id}: {e}"),
746 })?;
747
748 let projection = projected.as_object().ok_or_else(|| crate::RuntimeError {
752 code: "CRDT_PROJECTION_INVALID".into(),
753 message: "projected row was not a JSON object".into(),
754 })?;
755
756 let mut set_clauses = Vec::with_capacity(projection.len());
757 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
758 let mut idx = 1;
759 for (key, val) in projection {
760 if key == "id" {
761 continue;
762 }
763 set_clauses.push(format!("{} = ?{idx}", crate::quote_ident(key.as_str())));
764 values.push(crate::json_to_sql(val));
765 idx += 1;
766 }
767 if set_clauses.is_empty() {
768 } else {
773 values.push(Box::new(row_id.to_string()));
774 let sql = format!(
775 "UPDATE {} SET {} WHERE \"id\" = ?{idx}",
776 crate::quote_ident(entity),
777 set_clauses.join(", ")
778 );
779 let params: Vec<&dyn rusqlite::types::ToSql> =
780 values.iter().map(|v| v.as_ref()).collect();
781 conn.execute(&sql, params.as_slice())
782 .map_err(|e| crate::RuntimeError {
783 code: "UPDATE_FAILED".into(),
784 message: format!("post-merge UPDATE {entity}/{row_id}: {e}"),
785 })?;
786 }
787
788 let snap = self
790 .crdt_store()
791 .snapshot(&conn, entity, row_id)
792 .map_err(|e| crate::RuntimeError {
793 code: "CRDT_SNAPSHOT_FAILED".into(),
794 message: format!("post-merge snapshot {entity}/{row_id}: {e}"),
795 })?;
796 Ok(snap)
797 })
798 .map_err(into_data_error)
799 }
800}
801
802fn into_data_error(e: crate::RuntimeError) -> DataError {
803 DataError {
804 code: e.code,
805 message: e.message,
806 }
807}
808
809use crate::sse::SseHub;
814use crate::ws::WsHub;
815use std::sync::Arc;
816
817pub struct WsSseNotifier {
819 pub ws: Arc<WsHub>,
820 pub sse: Arc<SseHub>,
821}
822
823impl pylon_router::ChangeNotifier for WsSseNotifier {
824 fn notify(&self, event: &pylon_sync::ChangeEvent) {
825 self.ws.broadcast(event);
826 self.sse.broadcast(event);
827 }
828
829 fn notify_presence(&self, json: &str) {
830 self.ws.broadcast_presence(json);
831 self.sse.broadcast_message(json);
832 }
833
834 fn notify_crdt(&self, entity: &str, row_id: &str, snapshot: &[u8]) {
870 let subscribers = self.ws.subscriptions().subscribers(entity, row_id);
871 if subscribers.is_empty() {
872 return;
873 }
874 match pylon_router::encode_crdt_frame(
875 pylon_router::CRDT_FRAME_SNAPSHOT,
876 entity,
877 row_id,
878 snapshot,
879 ) {
880 Ok(frame) => self.ws.broadcast_binary_to(&subscribers, frame),
881 Err(e) => {
882 tracing::warn!("[crdt] dropping binary frame for {entity}/{row_id}: {e}");
883 }
884 }
885 }
886}
887
888fn to_json<T: serde::Serialize>(val: T) -> serde_json::Value {
890 serde_json::to_value(val).unwrap_or(serde_json::json!({}))
891}
892
893fn to_json_array<T: serde::Serialize>(val: T) -> serde_json::Value {
895 serde_json::to_value(val).unwrap_or(serde_json::json!([]))
896}
897
898use crate::rooms::RoomManager;
903
904impl pylon_router::RoomOps for RoomManager {
905 fn join(
906 &self,
907 room: &str,
908 user_id: &str,
909 data: Option<serde_json::Value>,
910 ) -> Result<(serde_json::Value, serde_json::Value), DataError> {
911 RoomManager::join(self, room, user_id, data)
912 .map(|(snapshot, join_event)| (to_json(&snapshot), to_json(&join_event)))
913 .map_err(|e| DataError {
914 code: e.code,
915 message: e.message,
916 })
917 }
918
919 fn leave(&self, room: &str, user_id: &str) -> Option<serde_json::Value> {
920 RoomManager::leave(self, room, user_id).map(|event| to_json(&event))
921 }
922
923 fn set_presence(
924 &self,
925 room: &str,
926 user_id: &str,
927 data: serde_json::Value,
928 ) -> Option<serde_json::Value> {
929 RoomManager::set_presence(self, room, user_id, data).map(|event| to_json(&event))
930 }
931
932 fn broadcast(
933 &self,
934 room: &str,
935 sender: Option<&str>,
936 topic: &str,
937 data: serde_json::Value,
938 ) -> Option<serde_json::Value> {
939 RoomManager::broadcast(self, room, sender, topic, data).map(|event| to_json(&event))
940 }
941
942 fn list_rooms(&self) -> Vec<String> {
943 RoomManager::list_rooms(self)
944 }
945
946 fn room_size(&self, name: &str) -> usize {
947 RoomManager::room_size(self, name)
948 }
949
950 fn members(&self, name: &str) -> Vec<serde_json::Value> {
951 RoomManager::members(self, name)
952 .into_iter()
953 .map(|p| to_json(p))
954 .collect()
955 }
956}
957
958use pylon_plugin::builtin::cache::CachePlugin;
963
964pub struct PluginHooksAdapter(pub Arc<pylon_plugin::PluginRegistry>);
974
975impl pylon_router::PluginHookOps for PluginHooksAdapter {
976 fn before_insert(
977 &self,
978 entity: &str,
979 data: &mut serde_json::Value,
980 auth: &pylon_auth::AuthContext,
981 ) -> Result<(), (u16, String, String)> {
982 self.0
983 .run_before_insert(entity, data, auth)
984 .map_err(|e| (e.status, e.code, e.message))
985 }
986 fn after_insert(
987 &self,
988 entity: &str,
989 id: &str,
990 data: &serde_json::Value,
991 auth: &pylon_auth::AuthContext,
992 ) {
993 self.0.run_after_insert(entity, id, data, auth);
994 }
995 fn before_update(
996 &self,
997 entity: &str,
998 id: &str,
999 data: &mut serde_json::Value,
1000 auth: &pylon_auth::AuthContext,
1001 ) -> Result<(), (u16, String, String)> {
1002 self.0
1003 .run_before_update(entity, id, data, auth)
1004 .map_err(|e| (e.status, e.code, e.message))
1005 }
1006 fn after_update(
1007 &self,
1008 entity: &str,
1009 id: &str,
1010 data: &serde_json::Value,
1011 auth: &pylon_auth::AuthContext,
1012 ) {
1013 self.0.run_after_update(entity, id, data, auth);
1014 }
1015 fn before_delete(
1016 &self,
1017 entity: &str,
1018 id: &str,
1019 auth: &pylon_auth::AuthContext,
1020 ) -> Result<(), (u16, String, String)> {
1021 self.0
1022 .run_before_delete(entity, id, auth)
1023 .map_err(|e| (e.status, e.code, e.message))
1024 }
1025 fn after_delete(&self, entity: &str, id: &str, auth: &pylon_auth::AuthContext) {
1026 self.0.run_after_delete(entity, id, auth);
1027 }
1028}
1029
1030pub struct CacheAdapter(pub Arc<CachePlugin>);
1031
1032impl pylon_router::CacheOps for CacheAdapter {
1033 fn handle_command(&self, body: &str) -> (u16, String) {
1034 crate::cache_handlers::handle_cache_command(&self.0, body)
1035 }
1036
1037 fn handle_get(&self, key: &str) -> (u16, String) {
1038 crate::cache_handlers::handle_cache_get(&self.0, key)
1039 }
1040
1041 fn handle_delete(&self, key: &str) -> (u16, String) {
1042 crate::cache_handlers::handle_cache_delete(&self.0, key)
1043 }
1044}
1045
1046use crate::pubsub::PubSubBroker;
1051
1052pub struct PubSubAdapter(pub Arc<PubSubBroker>);
1053
1054impl pylon_router::PubSubOps for PubSubAdapter {
1055 fn handle_publish(&self, body: &str) -> (u16, String) {
1056 crate::cache_handlers::handle_pubsub_publish(&self.0, body)
1057 }
1058
1059 fn handle_channels(&self) -> (u16, String) {
1060 crate::cache_handlers::handle_pubsub_channels(&self.0)
1061 }
1062
1063 fn handle_history(&self, channel: &str, url: &str) -> (u16, String) {
1064 crate::cache_handlers::handle_pubsub_history(&self.0, channel, url)
1065 }
1066}
1067
1068use crate::jobs::{JobQueue, Priority};
1073
1074impl pylon_router::JobOps for JobQueue {
1075 fn enqueue(
1076 &self,
1077 name: &str,
1078 payload: serde_json::Value,
1079 priority: &str,
1080 delay_secs: u64,
1081 max_retries: u32,
1082 queue: &str,
1083 ) -> String {
1084 let pri = Priority::from_str_loose(priority);
1085 JobQueue::enqueue_with_options(self, name, payload, pri, delay_secs, max_retries, queue)
1086 }
1087
1088 fn stats(&self) -> serde_json::Value {
1089 to_json(JobQueue::stats(self))
1090 }
1091
1092 fn dead_letters(&self) -> serde_json::Value {
1093 to_json_array(JobQueue::dead_letters(self))
1094 }
1095
1096 fn retry_dead(&self, id: &str) -> bool {
1097 JobQueue::retry_dead(self, id)
1098 }
1099
1100 fn list_jobs(
1101 &self,
1102 status: Option<&str>,
1103 queue: Option<&str>,
1104 limit: usize,
1105 ) -> serde_json::Value {
1106 to_json_array(JobQueue::list_jobs(self, status, queue, limit))
1107 }
1108
1109 fn get_job(&self, id: &str) -> Option<serde_json::Value> {
1110 JobQueue::get_job(self, id).map(|j| to_json(j))
1111 }
1112}
1113
1114use crate::scheduler::Scheduler;
1119
1120impl pylon_router::SchedulerOps for Scheduler {
1121 fn list_tasks(&self) -> serde_json::Value {
1122 to_json_array(Scheduler::list_tasks(self))
1123 }
1124
1125 fn trigger(&self, name: &str) -> bool {
1126 Scheduler::trigger(self, name)
1127 }
1128}
1129
1130use crate::workflows::WorkflowEngine;
1135
1136impl pylon_router::WorkflowOps for WorkflowEngine {
1137 fn definitions(&self) -> serde_json::Value {
1138 to_json_array(WorkflowEngine::definitions(self))
1139 }
1140
1141 fn start(&self, name: &str, input: serde_json::Value) -> Result<String, String> {
1142 WorkflowEngine::start(self, name, input)
1143 }
1144
1145 fn list(&self, status_filter: Option<&str>) -> serde_json::Value {
1146 let filter = status_filter.and_then(|s| match s {
1148 "pending" => Some(crate::workflows::WorkflowStatus::Pending),
1149 "running" => Some(crate::workflows::WorkflowStatus::Running),
1150 "sleeping" => Some(crate::workflows::WorkflowStatus::Sleeping),
1151 "waiting" => Some(crate::workflows::WorkflowStatus::WaitingForEvent),
1152 "completed" => Some(crate::workflows::WorkflowStatus::Completed),
1153 "failed" => Some(crate::workflows::WorkflowStatus::Failed),
1154 "cancelled" => Some(crate::workflows::WorkflowStatus::Cancelled),
1155 _ => None,
1156 });
1157 to_json_array(WorkflowEngine::list(self, filter.as_ref()))
1158 }
1159
1160 fn get(&self, id: &str) -> Option<serde_json::Value> {
1161 WorkflowEngine::get(self, id).map(|inst| to_json(inst))
1162 }
1163
1164 fn advance(&self, id: &str) -> Result<String, String> {
1165 WorkflowEngine::advance(self, id).map(|status| format!("{:?}", status))
1166 }
1167
1168 fn send_event(&self, id: &str, event: &str, data: serde_json::Value) -> Result<(), String> {
1169 WorkflowEngine::send_event(self, id, event, data)
1170 }
1171
1172 fn cancel(&self, id: &str) -> Result<(), String> {
1173 WorkflowEngine::cancel(self, id)
1174 }
1175}
1176
1177use pylon_storage::files::{FileStorage, LocalFileStorage, Stack0FileStorage};
1182
1183pub struct FileOpsAdapter {
1185 pub storage: Arc<dyn FileStorage>,
1186}
1187
1188impl FileOpsAdapter {
1189 pub fn from_env() -> Self {
1196 let provider = std::env::var("PYLON_FILES_PROVIDER").unwrap_or_else(|_| "local".into());
1197 match provider.as_str() {
1198 "stack0" => match Stack0FileStorage::from_env() {
1199 Some(s) => Self {
1200 storage: Arc::new(s),
1201 },
1202 None => {
1203 tracing::warn!(
1204 "PYLON_FILES_PROVIDER=stack0 but PYLON_STACK0_API_KEY is not set; falling back to local storage"
1205 );
1206 Self::local_from_env()
1207 }
1208 },
1209 _ => Self::local_from_env(),
1210 }
1211 }
1212
1213 fn local_from_env() -> Self {
1214 let dir = std::env::var("PYLON_FILES_DIR").unwrap_or_else(|_| "uploads".into());
1215 let url_prefix =
1216 std::env::var("PYLON_FILES_URL_PREFIX").unwrap_or_else(|_| "/api/files".into());
1217 Self {
1218 storage: Arc::new(LocalFileStorage::new(&dir, &url_prefix)),
1219 }
1220 }
1221}
1222
1223impl pylon_router::FileOps for FileOpsAdapter {
1224 fn upload(&self, _body: &str) -> (u16, String) {
1225 (
1231 400,
1232 pylon_router::json_error(
1233 "UPLOAD_NEEDS_BINARY",
1234 "File uploads must use multipart/form-data or raw binary with X-Filename; this platform does not support string-body uploads",
1235 ),
1236 )
1237 }
1238
1239 fn get_file(&self, id: &str) -> (u16, String) {
1240 match self.storage.get(id) {
1241 Ok(content) => (200, String::from_utf8_lossy(&content).into_owned()),
1242 Err(e) if e.code == "NOT_FOUND" => {
1243 (404, pylon_router::json_error("FILE_NOT_FOUND", &e.message))
1244 }
1245 Err(e) => (400, pylon_router::json_error(&e.code, &e.message)),
1246 }
1247 }
1248}
1249
1250pub type LocalFileOps = FileOpsAdapter;
1252
1253impl LocalFileOps {
1254 pub fn new_default() -> Self {
1256 Self::from_env()
1257 }
1258}
1259
1260use pylon_auth::email::{ConsoleTransport, EmailTransport, HttpEmailTransport};
1265
1266pub struct EmailAdapter {
1269 transport: Box<dyn EmailTransport>,
1270}
1271
1272impl EmailAdapter {
1273 pub fn from_env() -> Self {
1274 if let Some(http) = HttpEmailTransport::from_env() {
1275 Self {
1276 transport: Box::new(http),
1277 }
1278 } else {
1279 Self {
1280 transport: Box::new(ConsoleTransport),
1281 }
1282 }
1283 }
1284}
1285
1286impl pylon_router::EmailSender for EmailAdapter {
1287 fn send(&self, to: &str, subject: &str, body: &str) -> Result<(), String> {
1288 self.transport
1289 .send(to, subject, body)
1290 .map_err(|e| e.message)
1291 }
1292}
1293
1294pub struct RuntimeOpenApiGenerator<'a> {
1299 pub manifest: &'a pylon_kernel::AppManifest,
1300}
1301
1302impl<'a> pylon_router::OpenApiGenerator for RuntimeOpenApiGenerator<'a> {
1303 fn generate(&self, base_url: &str) -> String {
1304 let spec = crate::openapi::generate_openapi(self.manifest, base_url);
1305 serde_json::to_string(&spec).unwrap_or_else(|_| "{}".into())
1306 }
1307}
1308
1309pub struct ShardOpsAdapter {
1316 pub registry: Arc<dyn pylon_realtime::DynShardRegistry>,
1317}
1318
1319impl pylon_router::ShardOps for ShardOpsAdapter {
1320 fn get_shard(&self, id: &str) -> Option<Arc<dyn pylon_realtime::DynShard>> {
1321 self.registry.get(id)
1322 }
1323
1324 fn list_shards(&self) -> Vec<String> {
1325 self.registry.ids()
1326 }
1327
1328 fn shard_count(&self) -> usize {
1329 self.registry.len()
1330 }
1331}
1332
#[cfg(test)]
mod find_runtime_tests {
    use super::*;

    /// Environment variable both tests override.
    const RUNTIME_ENV: &str = "PYLON_FUNCTIONS_RUNTIME";

    /// The test harness runs tests on parallel threads, but the environment is
    /// process-wide. This lock keeps the tests from overwriting each other's
    /// override.
    static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Holds `ENV_LOCK` and an override of `RUNTIME_ENV`. Dropping it removes
    /// the variable first and then releases the lock, even when the test panics.
    struct EnvOverride {
        _lock: std::sync::MutexGuard<'static, ()>,
    }

    impl EnvOverride {
        fn set(value: &str) -> Self {
            // A poisoned lock only means another test panicked; the guard is
            // still usable for mutual exclusion.
            let lock = ENV_LOCK
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            std::env::set_var(RUNTIME_ENV, value);
            Self { _lock: lock }
        }
    }

    impl Drop for EnvOverride {
        fn drop(&mut self) {
            std::env::remove_var(RUNTIME_ENV);
        }
    }

    /// An existing file named by the env var is what the lookup returns.
    #[test]
    fn env_override_takes_precedence() {
        let dir = std::env::temp_dir().join(format!("pylon_rt_{}", std::process::id()));
        std::fs::create_dir_all(&dir).expect("create temp dir for runtime override test");
        let path = dir.join("custom_runtime.ts");
        std::fs::write(&path, "// test").unwrap();

        let found = {
            let _env = EnvOverride::set(path.to_str().unwrap());
            find_functions_runtime()
        };

        assert_eq!(found.as_deref(), path.to_str());

        let _ = std::fs::remove_dir_all(&dir);
    }

    /// A non-existent path in the env var must never be returned. The lookup may
    /// still yield some other path, so only inequality is asserted.
    #[test]
    fn returns_none_when_env_path_missing() {
        let missing = "/tmp/definitely-does-not-exist-42.ts";
        let found = {
            let _env = EnvOverride::set(missing);
            find_functions_runtime()
        };
        assert_ne!(found.as_deref(), Some(missing));
    }
}
1369
1370pub struct TxStore<'a> {
1409 runtime: &'a Runtime,
1410 conn: &'a rusqlite::Connection,
1411 pending: std::cell::RefCell<Vec<pylon_sync::ChangeEvent>>,
1416}
1417
1418impl<'a> TxStore<'a> {
1419 pub fn new(runtime: &'a Runtime, conn: &'a rusqlite::Connection) -> Self {
1420 Self {
1421 runtime,
1422 conn,
1423 pending: std::cell::RefCell::new(Vec::new()),
1424 }
1425 }
1426
1427 pub fn take_pending(&self) -> Vec<pylon_sync::ChangeEvent> {
1432 std::mem::take(&mut *self.pending.borrow_mut())
1433 }
1434
1435 fn record(
1436 &self,
1437 entity: &str,
1438 row_id: &str,
1439 kind: pylon_sync::ChangeKind,
1440 data: Option<&serde_json::Value>,
1441 ) {
1442 self.pending.borrow_mut().push(pylon_sync::ChangeEvent {
1443 seq: 0, entity: entity.to_string(),
1445 row_id: row_id.to_string(),
1446 kind,
1447 data: data.cloned(),
1448 timestamp: String::new(),
1449 });
1450 }
1451}
1452
1453unsafe impl<'a> Sync for TxStore<'a> {}
1455unsafe impl<'a> Send for TxStore<'a> {}
1456
1457impl<'a> DataStore for TxStore<'a> {
1458 fn manifest(&self) -> &pylon_kernel::AppManifest {
1459 self.runtime.manifest()
1460 }
1461
1462 fn insert(&self, entity: &str, data: &serde_json::Value) -> Result<String, DataError> {
1463 let id = self
1464 .runtime
1465 .insert_with_conn(self.conn, entity, data)
1466 .map_err(into_data_error)?;
1467 self.record(entity, &id, pylon_sync::ChangeKind::Insert, Some(data));
1471 Ok(id)
1472 }
1473
1474 fn get_by_id(&self, entity: &str, id: &str) -> Result<Option<serde_json::Value>, DataError> {
1475 self.runtime
1476 .get_by_id_with_conn(self.conn, entity, id)
1477 .map_err(into_data_error)
1478 }
1479
1480 fn list(&self, entity: &str) -> Result<Vec<serde_json::Value>, DataError> {
1481 self.runtime
1482 .list_with_conn(self.conn, entity)
1483 .map_err(into_data_error)
1484 }
1485
1486 fn list_after(
1487 &self,
1488 entity: &str,
1489 after: Option<&str>,
1490 limit: usize,
1491 ) -> Result<Vec<serde_json::Value>, DataError> {
1492 self.runtime
1493 .list_after_with_conn(self.conn, entity, after, limit)
1494 .map_err(into_data_error)
1495 }
1496
1497 fn update(&self, entity: &str, id: &str, data: &serde_json::Value) -> Result<bool, DataError> {
1498 let updated = self
1499 .runtime
1500 .update_with_conn(self.conn, entity, id, data)
1501 .map_err(into_data_error)?;
1502 if updated {
1503 self.record(entity, id, pylon_sync::ChangeKind::Update, Some(data));
1504 }
1505 Ok(updated)
1506 }
1507
1508 fn delete(&self, entity: &str, id: &str) -> Result<bool, DataError> {
1509 let deleted = self
1510 .runtime
1511 .delete_with_conn(self.conn, entity, id)
1512 .map_err(into_data_error)?;
1513 if deleted {
1514 self.record(entity, id, pylon_sync::ChangeKind::Delete, None);
1515 }
1516 Ok(deleted)
1517 }
1518
1519 fn lookup(
1520 &self,
1521 entity: &str,
1522 field: &str,
1523 value: &str,
1524 ) -> Result<Option<serde_json::Value>, DataError> {
1525 self.runtime
1526 .lookup_with_conn(self.conn, entity, field, value)
1527 .map_err(into_data_error)
1528 }
1529
1530 fn link(
1531 &self,
1532 entity: &str,
1533 id: &str,
1534 relation: &str,
1535 target_id: &str,
1536 ) -> Result<bool, DataError> {
1537 self.runtime
1538 .link_with_conn(self.conn, entity, id, relation, target_id)
1539 .map_err(into_data_error)
1540 }
1541
1542 fn unlink(&self, entity: &str, id: &str, relation: &str) -> Result<bool, DataError> {
1543 self.runtime
1544 .unlink_with_conn(self.conn, entity, id, relation)
1545 .map_err(into_data_error)
1546 }
1547
1548 fn query_filtered(
1549 &self,
1550 entity: &str,
1551 filter: &serde_json::Value,
1552 ) -> Result<Vec<serde_json::Value>, DataError> {
1553 self.runtime
1554 .query_filtered_with_conn(self.conn, entity, filter)
1555 .map_err(into_data_error)
1556 }
1557
1558 fn query_graph(&self, query: &serde_json::Value) -> Result<serde_json::Value, DataError> {
1559 self.runtime
1560 .query_graph_with_conn(self.conn, query)
1561 .map_err(into_data_error)
1562 }
1563
1564 fn aggregate(
1565 &self,
1566 entity: &str,
1567 spec: &serde_json::Value,
1568 ) -> Result<serde_json::Value, DataError> {
1569 Runtime::aggregate(self.runtime, entity, spec).map_err(into_data_error)
1573 }
1574
1575 fn transact(
1576 &self,
1577 _ops: &[serde_json::Value],
1578 ) -> Result<(bool, Vec<serde_json::Value>), DataError> {
1579 Err(DataError {
1582 code: "NESTED_TRANSACTION".into(),
1583 message: "ctx.db.transact() is not allowed inside a mutation handler (the handler itself is transactional)".into(),
1584 })
1585 }
1586
1587 fn search(
1588 &self,
1589 entity: &str,
1590 query: &serde_json::Value,
1591 ) -> Result<serde_json::Value, DataError> {
1592 <Runtime as DataStore>::search(self.runtime, entity, query)
1597 }
1598}
1599
1600struct PgBufferedTxStore<'a> {
1611 inner: &'a dyn DataStore,
1612 pending: std::sync::Mutex<Vec<pylon_sync::ChangeEvent>>,
1613}
1614
1615impl<'a> PgBufferedTxStore<'a> {
1616 fn new(inner: &'a dyn DataStore) -> Self {
1617 Self {
1618 inner,
1619 pending: std::sync::Mutex::new(Vec::new()),
1620 }
1621 }
1622
1623 fn record(
1624 &self,
1625 entity: &str,
1626 row_id: &str,
1627 kind: pylon_sync::ChangeKind,
1628 data: Option<&serde_json::Value>,
1629 ) {
1630 if let Ok(mut p) = self.pending.lock() {
1631 p.push(pylon_sync::ChangeEvent {
1632 seq: 0,
1633 entity: entity.to_string(),
1634 row_id: row_id.to_string(),
1635 kind,
1636 data: data.cloned(),
1637 timestamp: String::new(),
1638 });
1639 }
1640 }
1641
1642 fn take_pending(self) -> Vec<pylon_sync::ChangeEvent> {
1643 self.pending.into_inner().unwrap_or_default()
1644 }
1645}
1646
1647impl<'a> DataStore for PgBufferedTxStore<'a> {
1648 fn manifest(&self) -> &pylon_kernel::AppManifest {
1649 self.inner.manifest()
1650 }
1651
1652 fn insert(&self, entity: &str, data: &serde_json::Value) -> Result<String, DataError> {
1653 let id = self.inner.insert(entity, data)?;
1654 self.record(entity, &id, pylon_sync::ChangeKind::Insert, Some(data));
1655 Ok(id)
1656 }
1657
1658 fn get_by_id(&self, entity: &str, id: &str) -> Result<Option<serde_json::Value>, DataError> {
1659 self.inner.get_by_id(entity, id)
1660 }
1661
1662 fn list(&self, entity: &str) -> Result<Vec<serde_json::Value>, DataError> {
1663 self.inner.list(entity)
1664 }
1665
1666 fn list_after(
1667 &self,
1668 entity: &str,
1669 after: Option<&str>,
1670 limit: usize,
1671 ) -> Result<Vec<serde_json::Value>, DataError> {
1672 self.inner.list_after(entity, after, limit)
1673 }
1674
1675 fn update(&self, entity: &str, id: &str, data: &serde_json::Value) -> Result<bool, DataError> {
1676 let updated = self.inner.update(entity, id, data)?;
1677 if updated {
1678 self.record(entity, id, pylon_sync::ChangeKind::Update, Some(data));
1679 }
1680 Ok(updated)
1681 }
1682
1683 fn delete(&self, entity: &str, id: &str) -> Result<bool, DataError> {
1684 let deleted = self.inner.delete(entity, id)?;
1685 if deleted {
1686 self.record(entity, id, pylon_sync::ChangeKind::Delete, None);
1687 }
1688 Ok(deleted)
1689 }
1690
1691 fn lookup(
1692 &self,
1693 entity: &str,
1694 field: &str,
1695 value: &str,
1696 ) -> Result<Option<serde_json::Value>, DataError> {
1697 self.inner.lookup(entity, field, value)
1698 }
1699
1700 fn link(
1701 &self,
1702 entity: &str,
1703 id: &str,
1704 relation: &str,
1705 target_id: &str,
1706 ) -> Result<bool, DataError> {
1707 let linked = self.inner.link(entity, id, relation, target_id)?;
1708 if linked {
1709 let data = serde_json::json!({ relation: target_id });
1713 self.record(entity, id, pylon_sync::ChangeKind::Update, Some(&data));
1714 }
1715 Ok(linked)
1716 }
1717
1718 fn unlink(&self, entity: &str, id: &str, relation: &str) -> Result<bool, DataError> {
1719 let unlinked = self.inner.unlink(entity, id, relation)?;
1720 if unlinked {
1721 let data = serde_json::json!({ relation: serde_json::Value::Null });
1722 self.record(entity, id, pylon_sync::ChangeKind::Update, Some(&data));
1723 }
1724 Ok(unlinked)
1725 }
1726
1727 fn query_filtered(
1728 &self,
1729 entity: &str,
1730 filter: &serde_json::Value,
1731 ) -> Result<Vec<serde_json::Value>, DataError> {
1732 self.inner.query_filtered(entity, filter)
1733 }
1734
1735 fn query_graph(&self, query: &serde_json::Value) -> Result<serde_json::Value, DataError> {
1736 self.inner.query_graph(query)
1737 }
1738
1739 fn aggregate(
1740 &self,
1741 entity: &str,
1742 spec: &serde_json::Value,
1743 ) -> Result<serde_json::Value, DataError> {
1744 self.inner.aggregate(entity, spec)
1745 }
1746
1747 fn transact(
1748 &self,
1749 ops: &[serde_json::Value],
1750 ) -> Result<(bool, Vec<serde_json::Value>), DataError> {
1751 self.inner.transact(ops)
1756 }
1757
1758 fn search(
1759 &self,
1760 entity: &str,
1761 query: &serde_json::Value,
1762 ) -> Result<serde_json::Value, DataError> {
1763 self.inner.search(entity, query)
1769 }
1770}
1771
1772use pylon_functions::protocol::{AuthInfo as FnAuth, FnType};
1777use pylon_functions::registry::{FnDef, FnRegistry};
1778use pylon_functions::runner::{FnCallError, FnRunner};
1779use pylon_functions::trace::FnTrace;
1780
1781pub struct FnOpsImpl {
1786 pub runner: Arc<FnRunner>,
1787 pub registry: Arc<FnRegistry>,
1788 pub runtime: Arc<Runtime>,
1789 pub fn_rate_limiter: Arc<crate::rate_limit::RateLimiter>,
1793 pub change_log: Arc<pylon_sync::ChangeLog>,
1799 pub notifier: Arc<dyn pylon_router::ChangeNotifier>,
1801 pub job_queue: Arc<crate::jobs::JobQueue>,
1806}
1807
1808impl FnOpsImpl {
1809 fn flush_pending_schedules(&self, pending: Vec<PendingSchedule>) {
1813 for sched in pending {
1814 let delay_secs = match (sched.delay_ms, sched.run_at) {
1815 (Some(ms), _) => ms / 1000,
1816 (None, Some(ts)) => {
1817 let now = std::time::SystemTime::now()
1818 .duration_since(std::time::UNIX_EPOCH)
1819 .unwrap_or_default()
1820 .as_millis() as u64;
1821 if ts > now { (ts - now) / 1000 } else { 0 }
1822 }
1823 _ => 0,
1824 };
1825 if let Err(e) = self.job_queue.try_enqueue_with_options(
1826 &sched.fn_name,
1827 sched.args,
1828 crate::jobs::Priority::Normal,
1829 delay_secs,
1830 3,
1831 "functions",
1832 ) {
1833 tracing::warn!(
1837 "[functions] post-COMMIT enqueue failed for \"{}\": {e}",
1838 sched.fn_name
1839 );
1840 }
1841 }
1842 }
1843}
1844
/// Router-facing entry points for looking up, calling and rate-limiting functions.
impl pylon_router::FnOps for FnOpsImpl {
    /// Returns the registered definition for `name`, if any.
    fn get_fn(&self, name: &str) -> Option<FnDef> {
        self.registry.get(name)
    }

    /// Returns every definition currently in the registry.
    fn list_fns(&self) -> Vec<FnDef> {
        self.registry.list()
    }

    /// Invokes the registered function `fn_name`.
    ///
    /// Mutations run inside a storage transaction: on Postgres through
    /// `with_transaction_crdt`, otherwise through an explicit `BEGIN` / `COMMIT` on
    /// the connection returned by `lock_conn_pub`. Change events buffered by the
    /// transactional store and schedules buffered while the handler ran are published
    /// only after the transaction has succeeded. Every other function type is passed
    /// straight to the runner with the runtime as its store.
    ///
    /// # Errors
    ///
    /// - `FN_NOT_FOUND` when `fn_name` is not in the registry.
    /// - `PG_BACKEND_MISSING` when the runtime reports Postgres but has no backend.
    /// - `BEGIN_FAILED` / `COMMIT_FAILED` when the SQLite transaction cannot be
    ///   started or committed.
    /// - Any error from acquiring the connection or from the runner itself.
    fn call(
        &self,
        fn_name: &str,
        args: serde_json::Value,
        auth: FnAuth,
        on_stream: Option<Box<dyn FnMut(&str) + Send>>,
        request: Option<pylon_functions::protocol::RequestInfo>,
    ) -> Result<(serde_json::Value, FnTrace), FnCallError> {
        let def = self.registry.get(fn_name).ok_or_else(|| FnCallError {
            code: "FN_NOT_FOUND".into(),
            message: format!("Function \"{fn_name}\" is not registered"),
        })?;

        match def.fn_type {
            FnType::Mutation => {
                // ---- Postgres path ----
                if self.runtime.is_postgres() {
                    let pg_backend = self.runtime.pg_backend().ok_or_else(|| FnCallError {
                        code: "PG_BACKEND_MISSING".into(),
                        message:
                            "Postgres backend reported is_postgres=true but pg_backend() returned None"
                                .into(),
                    })?;

                    // Owned copies for the `move` closure passed to the transaction.
                    let runner = self.runner.clone();
                    let fn_type = def.fn_type;
                    let fn_name_owned = fn_name.to_string();

                    // Install a fresh thread-local schedule buffer; the previous one is
                    // restored when the guard is dropped.
                    let sched_guard = ScheduleBufferGuard::enter();
                    // Mark this thread as being inside a mutation (see `in_mutation_tx`).
                    let _depth_guard = MutationDepthGuard::enter();

                    let crdt_hook: std::sync::Arc<
                        dyn pylon_storage::pg_tx_store::PgCrdtHook,
                    > = std::sync::Arc::new(crate::pg_loro_store::PgCrdtHookImpl {
                        crdt: std::sync::Arc::clone(&pg_backend.crdt),
                        manifest: std::sync::Arc::new(self.runtime.manifest().clone()),
                    });

                    let pg = &pg_backend.store;
                    let tx_result: Result<
                        (serde_json::Value, FnTrace, Vec<pylon_sync::ChangeEvent>),
                        FnCallError,
                    > = pg.with_transaction_crdt(crdt_hook, move |inner_store: &dyn DataStore| {
                        // Wrap the transactional store so writes are buffered as change
                        // events and returned alongside the handler's result.
                        let buffered = PgBufferedTxStore::new(inner_store);
                        let (value, trace) = runner.call(
                            &buffered,
                            &fn_name_owned,
                            fn_type,
                            args,
                            auth,
                            on_stream,
                            request,
                        )?;
                        Ok((value, trace, buffered.take_pending()))
                    });

                    return match tx_result {
                        Ok((value, trace, pending)) => {
                            // The transaction call returned `Ok`: append each event to the
                            // change log to obtain its sequence number, then notify.
                            for ev in pending {
                                let seq = self.change_log.append(
                                    &ev.entity,
                                    &ev.row_id,
                                    ev.kind.clone(),
                                    ev.data.clone(),
                                );
                                let event = pylon_sync::ChangeEvent { seq, ..ev };
                                self.notifier.notify(&event);
                            }
                            // Enqueue the schedules the handler requested.
                            self.flush_pending_schedules(sched_guard.take());
                            drop(sched_guard);
                            Ok((value, trace))
                        }
                        Err(e) => {
                            // Dropping the guard without `take` discards buffered schedules.
                            drop(sched_guard);
                            Err(e)
                        }
                    };
                }
                // ---- SQLite path ----
                let conn_guard = self.runtime.lock_conn_pub().map_err(|e| FnCallError {
                    code: e.code,
                    message: e.message,
                })?;

                if let Err(e) = conn_guard.execute("BEGIN", []) {
                    return Err(FnCallError {
                        code: "BEGIN_FAILED".into(),
                        message: format!("Failed to start transaction: {e}"),
                    });
                }

                // Same guards as the Postgres path: a fresh schedule buffer and the
                // in-mutation marker, both released when they go out of scope.
                let sched_guard = ScheduleBufferGuard::enter();
                let _depth_guard = MutationDepthGuard::enter();

                let tx_store = TxStore::new(&self.runtime, &conn_guard);
                let result = self.runner.call(
                    &tx_store,
                    fn_name,
                    def.fn_type,
                    args,
                    auth,
                    on_stream,
                    request,
                );

                let result = match result {
                    // Handler succeeded: try to commit before publishing anything.
                    Ok(value) => match conn_guard.execute("COMMIT", []) {
                        Ok(_) => {
                            // Committed: publish buffered change events with real sequence
                            // numbers, then enqueue buffered schedules.
                            for ev in tx_store.take_pending() {
                                let seq = self.change_log.append(
                                    &ev.entity,
                                    &ev.row_id,
                                    ev.kind.clone(),
                                    ev.data.clone(),
                                );
                                let event = pylon_sync::ChangeEvent { seq, ..ev };
                                self.notifier.notify(&event);
                            }
                            self.flush_pending_schedules(sched_guard.take());
                            drop(sched_guard);
                            Ok(value)
                        }
                        Err(commit_err) => {
                            // Commit failed: roll back best-effort. Buffered events and
                            // schedules are never published; `sched_guard` is dropped at the
                            // end of the scope without being flushed.
                            if let Err(rollback_err) = conn_guard.execute("ROLLBACK", []) {
                                tracing::warn!(
                                    "[functions] ROLLBACK after COMMIT failure also failed: {rollback_err}"
                                );
                            }
                            Err(FnCallError {
                                code: "COMMIT_FAILED".into(),
                                message: format!(
                                    "Function \"{fn_name}\" succeeded but COMMIT failed: {commit_err}"
                                ),
                            })
                        }
                    },
                    Err(handler_err) => {
                        // Handler failed: roll back and surface the handler's own error.
                        if let Err(rollback_err) = conn_guard.execute("ROLLBACK", []) {
                            tracing::warn!(
                                "[functions] ROLLBACK after handler error failed: {rollback_err}"
                            );
                        }
                        Err(handler_err)
                    }
                };
                result
            }
            // Non-mutation function types run directly against the runtime, without a
            // transaction or buffering.
            _ => self.runner.call(
                &*self.runtime,
                fn_name,
                def.fn_type,
                args,
                auth,
                on_stream,
                request,
            ),
        }
    }

    /// Returns up to `limit` entries from the runner's trace log.
    fn recent_traces(&self, limit: usize) -> Vec<FnTrace> {
        self.runner.trace_log.recent(limit)
    }

    /// Checks the rate limiter under the key `"<fn_name>::<identity>"`.
    ///
    /// The `Err(u64)` payload is whatever the limiter returns — NOTE(review):
    /// presumably a retry-after value; confirm against `RateLimiter::check`.
    fn check_rate_limit(&self, fn_name: &str, identity: &str) -> Result<(), u64> {
        let key = format!("{fn_name}::{identity}");
        self.fn_rate_limiter.check(&key)
    }
}
2080
/// Locates the TypeScript functions runtime script.
///
/// Search order:
/// 1. `$PYLON_FUNCTIONS_RUNTIME`, when it is set and the path exists.
/// 2. A fixed list of package-relative locations, probed in the current directory and
///    then in each parent directory up to the filesystem root.
/// 3. `$HOME/.pylon/runtime.ts` (`HOME` falls back to `.` when unset).
///
/// Returns the first existing path, or `None` when nothing is found. An existing
/// candidate whose path is not valid UTF-8 cannot be returned as a `String`; it is
/// skipped and the search continues instead of ending with `None`.
pub fn find_functions_runtime() -> Option<String> {
    const RELATIVE_CANDIDATES: [&str; 5] = [
        "node_modules/@pylonsync/functions/src/runtime.ts",
        "node_modules/@pylonsync/functions/dist/runtime.js",
        "node_modules/@pylon/functions/src/runtime.ts",
        "node_modules/@pylon/functions/dist/runtime.js",
        "packages/functions/src/runtime.ts",
    ];

    // An explicit override wins, but only if it points at something that exists.
    if let Ok(env_path) = std::env::var("PYLON_FUNCTIONS_RUNTIME") {
        if std::path::Path::new(&env_path).exists() {
            return Some(env_path);
        }
    }

    // `ancestors()` yields the directory itself first, then each parent in turn.
    let cwd = std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));
    for dir in cwd.ancestors() {
        for rel in RELATIVE_CANDIDATES.iter() {
            let candidate = dir.join(rel);
            if !candidate.exists() {
                continue;
            }
            // Keep looking when the path cannot be represented as UTF-8.
            if let Some(found) = candidate.to_str() {
                return Some(found.to_string());
            }
        }
    }

    // Last resort: a per-user runtime script.
    let home = std::env::var("HOME").unwrap_or_else(|_| ".".into());
    let user_path = format!("{home}/.pylon/runtime.ts");
    if std::path::Path::new(&user_path).exists() {
        return Some(user_path);
    }
    None
}
2138
2139pub fn try_spawn_functions(
2140 runtime: Arc<Runtime>,
2141 job_queue: Arc<crate::jobs::JobQueue>,
2142 fn_rate_limiter: Arc<crate::rate_limit::RateLimiter>,
2143 change_log: Arc<pylon_sync::ChangeLog>,
2144 notifier: Arc<dyn pylon_router::ChangeNotifier>,
2145) -> Option<Arc<FnOpsImpl>> {
2146 let fn_dir = std::env::var("PYLON_FUNCTIONS_DIR").unwrap_or_else(|_| "functions".into());
2147 if !std::path::Path::new(&fn_dir).exists() {
2148 return None;
2149 }
2150
2151 let runtime_script = match find_functions_runtime() {
2152 Some(p) => p,
2153 None => {
2154 tracing::warn!(
2155 "[functions] No TypeScript runtime script found. TypeScript functions will be unavailable."
2156 );
2157 tracing::warn!(
2158 "[functions] Tried: $PYLON_FUNCTIONS_RUNTIME, node_modules/@pylon/functions/src/runtime.ts, ~/.pylon/runtime.ts, packages/functions/src/runtime.ts"
2159 );
2160 return None;
2161 }
2162 };
2163
2164 let runner = Arc::new(FnRunner::new(1000));
2165
2166 let defs = match runner.start("bun", &["run", &runtime_script, &fn_dir]) {
2170 Ok(defs) => defs,
2171 Err(e) => {
2172 tracing::warn!("[functions] Failed to start Bun runtime: {e}");
2173 tracing::warn!(
2174 "[functions] Install Bun from https://bun.sh — TypeScript functions will be unavailable."
2175 );
2176 return None;
2177 }
2178 };
2179
2180 let job_queue_for_handlers = Arc::clone(&job_queue);
2184
2185 runner.set_schedule_hook(Box::new(move |fn_name, args, delay_ms, run_at| {
2197 let buffered = MUTATION_SCHEDULE_BUFFER.with(|cell| {
2200 let slot = cell.borrow();
2201 slot.as_ref().map(|b| {
2202 b.borrow_mut().push(PendingSchedule {
2203 fn_name: fn_name.to_string(),
2204 args: args.clone(),
2205 delay_ms,
2206 run_at,
2207 });
2208 }).is_some()
2209 });
2210 if buffered {
2211 return Ok(format!("pending:{fn_name}"));
2217 }
2218
2219 let delay_secs = match (delay_ms, run_at) {
2220 (Some(ms), _) => ms / 1000,
2221 (None, Some(ts)) => {
2222 let now = std::time::SystemTime::now()
2223 .duration_since(std::time::UNIX_EPOCH)
2224 .unwrap_or_default()
2225 .as_millis() as u64;
2226 if ts > now {
2227 (ts - now) / 1000
2228 } else {
2229 0
2230 }
2231 }
2232 _ => 0,
2233 };
2234 job_queue.try_enqueue_with_options(
2235 fn_name,
2236 args,
2237 crate::jobs::Priority::Normal,
2238 delay_secs,
2239 3,
2240 "functions",
2241 )
2242 }));
2243
2244 let registry = Arc::new(FnRegistry::new());
2245 let count = defs.len();
2246 registry.replace_all(defs);
2247 tracing::warn!("[functions] Loaded {count} function(s) from {fn_dir}");
2248
2249 let ops = Arc::new(FnOpsImpl {
2250 runner,
2251 registry,
2252 runtime,
2253 fn_rate_limiter,
2254 change_log,
2255 notifier,
2256 job_queue: Arc::clone(&job_queue_for_handlers),
2257 });
2258
2259 install_nested_call_hook(&ops);
2260 register_function_job_handlers(&ops, &job_queue_for_handlers);
2261 spawn_runtime_supervisor(Arc::clone(&ops));
2262 Some(ops)
2263}
2264
2265fn register_function_job_handlers(ops: &Arc<FnOpsImpl>, job_queue: &Arc<crate::jobs::JobQueue>) {
2278 use pylon_router::FnOps as _;
2279
2280 let fn_names: Vec<String> = ops.registry.list().into_iter().map(|d| d.name).collect();
2281
2282 for name in fn_names {
2283 let weak = Arc::downgrade(ops);
2284 let fn_name = name.clone();
2285 job_queue.register(
2286 &name,
2287 Arc::new(move |job: &crate::jobs::Job| {
2288 let ops = match weak.upgrade() {
2289 Some(o) => o,
2290 None => {
2291 return crate::jobs::JobResult::Failure(
2292 "RUNTIME_GONE: function ops dropped".into(),
2293 )
2294 }
2295 };
2296 let auth = FnAuth {
2297 user_id: None,
2298 is_admin: false,
2299 tenant_id: None,
2300 };
2301 match ops.call(&fn_name, job.payload.clone(), auth, None, None) {
2302 Ok(_) => crate::jobs::JobResult::Success,
2303 Err(e) => crate::jobs::JobResult::Retry(format!("{}: {}", e.code, e.message)),
2304 }
2305 }),
2306 );
2307 }
2308}
2309
/// Installs the hook the runner invokes when a handler calls another function.
///
/// The hook holds only a weak reference to `ops` and fails with `RUNTIME_GONE` once
/// it has been dropped. Nested mutations are rejected with `NESTED_MUTATION` when
/// the current thread is already inside a mutation; otherwise they run in their own
/// transaction (Postgres or SQLite) with the same post-commit publishing of change
/// events and schedules as `FnOpsImpl::call`. Other function types run directly
/// against the runtime. Errors are returned as `(code, message)` pairs.
fn install_nested_call_hook(ops: &Arc<FnOpsImpl>) {
    use pylon_functions::protocol::{AuthInfo, FnType};

    let weak = Arc::downgrade(ops);
    ops.runner.set_nested_call_hook(Box::new(
        move |fn_name: &str,
              fn_type: FnType,
              args: serde_json::Value,
              auth: AuthInfo|
              -> Result<serde_json::Value, (String, String)> {
            let ops = match weak.upgrade() {
                Some(o) => o,
                None => {
                    return Err((
                        "RUNTIME_GONE".into(),
                        "pylon runtime is shutting down".into(),
                    ))
                }
            };

            match fn_type {
                FnType::Mutation => {
                    // A mutation is already running on this thread: refuse rather than
                    // start a second transaction.
                    if in_mutation_tx() {
                        return Err((
                            "NESTED_MUTATION".into(),
                            format!(
                                "ctx.runMutation(\"{fn_name}\") is not allowed from inside \
                                 another mutation handler — the mutation handler IS the \
                                 transaction, and the connection mutex is non-reentrant. \
                                 Restructure the shared logic into a regular function (not \
                                 a registered mutation), or call from an action handler."
                            ),
                        ));
                    }

                    // ---- Postgres path ----
                    if ops.runtime.is_postgres() {
                        let pg_backend = ops.runtime.pg_backend().ok_or_else(|| {
                            (
                                "PG_BACKEND_MISSING".into(),
                                "Postgres backend reported is_postgres=true but pg_backend() returned None".into(),
                            )
                        })?;
                        let pg = &pg_backend.store;
                        let runner = ops.runner.clone();
                        let fn_name_owned = fn_name.to_string();
                        // Fresh schedule buffer and in-mutation marker for this transaction;
                        // both are released when the guards drop.
                        let sched_guard = ScheduleBufferGuard::enter();
                        let _depth_guard = MutationDepthGuard::enter();
                        let crdt_hook: std::sync::Arc<
                            dyn pylon_storage::pg_tx_store::PgCrdtHook,
                        > = std::sync::Arc::new(crate::pg_loro_store::PgCrdtHookImpl {
                            crdt: std::sync::Arc::clone(&pg_backend.crdt),
                            manifest: std::sync::Arc::new(ops.runtime.manifest().clone()),
                        });
                        let tx_result: Result<
                            (serde_json::Value, Vec<pylon_sync::ChangeEvent>),
                            FnCallError,
                        > = pg.with_transaction_crdt(crdt_hook, move |inner_store: &dyn DataStore| {
                            // Buffer change events for the writes made through this store.
                            let buffered = PgBufferedTxStore::new(inner_store);
                            let (value, _trace) = runner.call_inner(
                                &buffered,
                                &fn_name_owned,
                                fn_type,
                                args,
                                auth,
                                None,
                                None,
                            )?;
                            Ok((value, buffered.take_pending()))
                        });
                        return match tx_result {
                            Ok((value, pending)) => {
                                // Transaction returned `Ok`: publish events, then schedules.
                                for ev in pending {
                                    let seq = ops.change_log.append(
                                        &ev.entity,
                                        &ev.row_id,
                                        ev.kind.clone(),
                                        ev.data.clone(),
                                    );
                                    let event = pylon_sync::ChangeEvent { seq, ..ev };
                                    ops.notifier.notify(&event);
                                }
                                ops.flush_pending_schedules(sched_guard.take());
                                drop(sched_guard);
                                Ok(value)
                            }
                            Err(e) => {
                                // Buffered schedules are discarded along with the guard.
                                drop(sched_guard);
                                Err((e.code, e.message))
                            }
                        };
                    }

                    // ---- SQLite path ----
                    let conn_guard = ops
                        .runtime
                        .lock_conn_pub()
                        .map_err(|e| (e.code, e.message))?;
                    if let Err(e) = conn_guard.execute("BEGIN", []) {
                        return Err(("BEGIN_FAILED".into(), e.to_string()));
                    }
                    let sched_guard = ScheduleBufferGuard::enter();
                    let _depth_guard = MutationDepthGuard::enter();
                    let tx_store = TxStore::new(&ops.runtime, &conn_guard);
                    let result = ops
                        .runner
                        .call_inner(&tx_store, fn_name, fn_type, args, auth, None, None);
                    match result {
                        Ok((value, _trace)) => {
                            // Commit first; on failure roll back best-effort and publish
                            // nothing (the schedule guard is dropped on return).
                            if let Err(e) = conn_guard.execute("COMMIT", []) {
                                let _ = conn_guard.execute("ROLLBACK", []);
                                return Err(("COMMIT_FAILED".into(), e.to_string()));
                            }
                            // Committed: publish buffered change events, then schedules.
                            for ev in tx_store.take_pending() {
                                let seq = ops.change_log.append(
                                    &ev.entity,
                                    &ev.row_id,
                                    ev.kind.clone(),
                                    ev.data.clone(),
                                );
                                let event = pylon_sync::ChangeEvent { seq, ..ev };
                                ops.notifier.notify(&event);
                            }
                            ops.flush_pending_schedules(sched_guard.take());
                            drop(sched_guard);
                            Ok(value)
                        }
                        Err(e) => {
                            // Handler failed: roll back best-effort and discard schedules.
                            let _ = conn_guard.execute("ROLLBACK", []);
                            drop(sched_guard);
                            Err((e.code, e.message))
                        }
                    }
                }
                // Non-mutation function types: no transaction, run against the runtime.
                _ => {
                    let result = ops.runner.call_inner(
                        &*ops.runtime,
                        fn_name,
                        fn_type,
                        args,
                        auth,
                        None,
                        None,
                    );
                    result.map(|(v, _)| v).map_err(|e| (e.code, e.message))
                }
            }
        },
    ));
}
2495
2496fn spawn_runtime_supervisor(ops: Arc<FnOpsImpl>) {
2504 use std::time::Duration;
2505
2506 std::thread::Builder::new()
2507 .name("pylon-fn-supervisor".into())
2508 .spawn(move || {
2509 let mut backoff = Duration::from_secs(1);
2510 let max_backoff = Duration::from_secs(30);
2511 loop {
2512 std::thread::sleep(Duration::from_secs(2));
2513 if ops.runner.is_alive() {
2514 backoff = Duration::from_secs(1);
2515 continue;
2516 }
2517 tracing::warn!(
2518 "[functions] Bun runtime is not alive — respawning after {:?}",
2519 backoff
2520 );
2521 std::thread::sleep(backoff);
2522 match ops.runner.respawn() {
2523 Ok(defs) => {
2524 let count = defs.len();
2525 ops.registry.replace_all(defs);
2529 tracing::warn!("[functions] Respawned Bun runtime ({count} fn(s))");
2530 backoff = Duration::from_secs(1);
2531 }
2532 Err(e) => {
2533 tracing::warn!("[functions] Respawn failed: {e}");
2534 let backoff_str = format!("{}", backoff.as_secs());
2541 pylon_observability::report_error(&pylon_observability::ErrorEvent {
2542 level: pylon_observability::ErrorLevel::Error,
2543 code: "FN_RESPAWN_FAILED",
2544 message: &e,
2545 context: &[
2546 ("component", "bun-runtime-supervisor"),
2547 ("backoff_secs", &backoff_str),
2548 ],
2549 });
2550 backoff = (backoff * 2).min(max_backoff);
2551 }
2552 }
2553 }
2554 })
2555 .expect("failed to spawn function runtime supervisor");
2556}