1use pylon_auth::{AuthContext, CookieConfig, MagicCodeStore, OAuthStateStore, SessionStore};
9use pylon_http::{DataError, DataStore, HttpMethod};
10use pylon_policy::PolicyEngine;
11use pylon_sync::{ChangeKind, ChangeLog};
12use std::cell::RefCell;
13
14mod routes;
15
16pub trait ChangeNotifier: Send + Sync {
25 fn notify(&self, event: &pylon_sync::ChangeEvent);
26 fn notify_presence(&self, json: &str);
27
28 fn notify_crdt(&self, _entity: &str, _row_id: &str, _snapshot: &[u8]) {}
37}
38
39pub struct NoopNotifier;
41
42impl ChangeNotifier for NoopNotifier {
43 fn notify(&self, _event: &pylon_sync::ChangeEvent) {}
44 fn notify_presence(&self, _json: &str) {}
45}
46
47pub const CRDT_FRAME_SNAPSHOT: u8 = 0x10;
67pub const CRDT_FRAME_UPDATE: u8 = 0x11;
69
70#[derive(Debug, Clone, PartialEq, Eq)]
75pub enum FrameEncodeError {
76 EntityTooLong { len: usize },
81 RowIdTooLong { len: usize },
85}
86
87impl std::fmt::Display for FrameEncodeError {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 match self {
90 Self::EntityTooLong { len } => write!(
91 f,
92 "CRDT frame: entity name {len} bytes exceeds u16 length limit ({})",
93 u16::MAX
94 ),
95 Self::RowIdTooLong { len } => write!(
96 f,
97 "CRDT frame: row_id {len} bytes exceeds u16 length limit ({})",
98 u16::MAX
99 ),
100 }
101 }
102}
103
104impl std::error::Error for FrameEncodeError {}
105
106pub fn encode_crdt_frame(
113 frame_type: u8,
114 entity: &str,
115 row_id: &str,
116 payload: &[u8],
117) -> Result<Vec<u8>, FrameEncodeError> {
118 let entity_bytes = entity.as_bytes();
119 let row_id_bytes = row_id.as_bytes();
120 if entity_bytes.len() > u16::MAX as usize {
121 return Err(FrameEncodeError::EntityTooLong {
122 len: entity_bytes.len(),
123 });
124 }
125 if row_id_bytes.len() > u16::MAX as usize {
126 return Err(FrameEncodeError::RowIdTooLong {
127 len: row_id_bytes.len(),
128 });
129 }
130 let entity_len = entity_bytes.len() as u16;
131 let row_id_len = row_id_bytes.len() as u16;
132 let mut out =
133 Vec::with_capacity(1 + 2 + entity_bytes.len() + 2 + row_id_bytes.len() + payload.len());
134 out.push(frame_type);
135 out.extend_from_slice(&entity_len.to_be_bytes());
136 out.extend_from_slice(entity_bytes);
137 out.extend_from_slice(&row_id_len.to_be_bytes());
138 out.extend_from_slice(row_id_bytes);
139 out.extend_from_slice(payload);
140 Ok(out)
141}
142
143#[cfg(test)]
144mod crdt_frame_tests {
145 use super::*;
146
147 #[test]
148 fn roundtrip_header_layout() {
149 let frame = encode_crdt_frame(
150 CRDT_FRAME_SNAPSHOT,
151 "Message",
152 "msg_123",
153 &[0xab, 0xcd, 0xef],
154 )
155 .unwrap();
156 assert_eq!(frame[0], 0x10);
157 assert_eq!(&frame[1..3], &[0, 7]);
159 assert_eq!(&frame[3..10], b"Message");
160 assert_eq!(&frame[10..12], &[0, 7]);
162 assert_eq!(&frame[12..19], b"msg_123");
163 assert_eq!(&frame[19..], &[0xab, 0xcd, 0xef]);
165 }
166
167 #[test]
168 fn empty_payload_still_carries_headers() {
169 let frame = encode_crdt_frame(CRDT_FRAME_UPDATE, "X", "y", &[]).unwrap();
170 assert_eq!(frame[0], 0x11);
171 assert_eq!(&frame[1..3], &[0, 1]);
172 assert_eq!(&frame[3..4], b"X");
173 assert_eq!(&frame[4..6], &[0, 1]);
174 assert_eq!(&frame[6..7], b"y");
175 assert_eq!(frame.len(), 7);
176 }
177
178 #[test]
179 fn hex_roundtrip() {
180 assert_eq!(super::decode_hex(""), Some(vec![]));
181 assert_eq!(super::decode_hex("00"), Some(vec![0x00]));
182 assert_eq!(super::decode_hex("ab"), Some(vec![0xab]));
183 assert_eq!(super::decode_hex("AB"), Some(vec![0xab]));
184 assert_eq!(
185 super::decode_hex("DEADBEEF"),
186 Some(vec![0xde, 0xad, 0xbe, 0xef])
187 );
188 }
189
190 #[test]
191 fn hex_rejects_malformed() {
192 assert_eq!(super::decode_hex("a"), None); assert_eq!(super::decode_hex("xy"), None); assert_eq!(super::decode_hex("ab cd"), None); }
196
197 #[test]
198 fn entity_too_long_errors() {
199 let huge_entity = "x".repeat(u16::MAX as usize + 1);
200 let err = encode_crdt_frame(CRDT_FRAME_SNAPSHOT, &huge_entity, "y", &[])
201 .expect_err("entity > u16::MAX must reject");
202 assert!(matches!(err, FrameEncodeError::EntityTooLong { .. }));
203 }
204
205 #[test]
206 fn row_id_too_long_errors() {
207 let huge_row_id = "x".repeat(u16::MAX as usize + 1);
208 let err = encode_crdt_frame(CRDT_FRAME_SNAPSHOT, "X", &huge_row_id, &[])
209 .expect_err("row_id > u16::MAX must reject");
210 assert!(matches!(err, FrameEncodeError::RowIdTooLong { .. }));
211 }
212}
213
214pub trait CacheOps: Send + Sync {
221 fn handle_command(&self, body: &str) -> (u16, String);
222 fn handle_get(&self, key: &str) -> (u16, String);
223 fn handle_delete(&self, key: &str) -> (u16, String);
224}
225
226pub trait PubSubOps: Send + Sync {
228 fn handle_publish(&self, body: &str) -> (u16, String);
229 fn handle_channels(&self) -> (u16, String);
230 fn handle_history(&self, channel: &str, url: &str) -> (u16, String);
231}
232
233pub trait RoomOps: Send + Sync {
235 fn join(
236 &self,
237 room: &str,
238 user_id: &str,
239 data: Option<serde_json::Value>,
240 ) -> Result<(serde_json::Value, serde_json::Value), DataError>;
241 fn leave(&self, room: &str, user_id: &str) -> Option<serde_json::Value>;
242 fn set_presence(
243 &self,
244 room: &str,
245 user_id: &str,
246 data: serde_json::Value,
247 ) -> Option<serde_json::Value>;
248 fn broadcast(
249 &self,
250 room: &str,
251 sender: Option<&str>,
252 topic: &str,
253 data: serde_json::Value,
254 ) -> Option<serde_json::Value>;
255 fn list_rooms(&self) -> Vec<String>;
256 fn room_size(&self, name: &str) -> usize;
257 fn members(&self, name: &str) -> Vec<serde_json::Value>;
258}
259
260pub trait JobOps: Send + Sync {
262 fn enqueue(
263 &self,
264 name: &str,
265 payload: serde_json::Value,
266 priority: &str,
267 delay_secs: u64,
268 max_retries: u32,
269 queue: &str,
270 ) -> String;
271 fn stats(&self) -> serde_json::Value;
272 fn dead_letters(&self) -> serde_json::Value;
273 fn retry_dead(&self, id: &str) -> bool;
274 fn list_jobs(
275 &self,
276 status: Option<&str>,
277 queue: Option<&str>,
278 limit: usize,
279 ) -> serde_json::Value;
280 fn get_job(&self, id: &str) -> Option<serde_json::Value>;
281}
282
283pub trait SchedulerOps: Send + Sync {
285 fn list_tasks(&self) -> serde_json::Value;
286 fn trigger(&self, name: &str) -> bool;
287}
288
289pub trait WorkflowOps: Send + Sync {
291 fn definitions(&self) -> serde_json::Value;
292 fn start(&self, name: &str, input: serde_json::Value) -> Result<String, String>;
293 fn list(&self, status_filter: Option<&str>) -> serde_json::Value;
294 fn get(&self, id: &str) -> Option<serde_json::Value>;
295 fn advance(&self, id: &str) -> Result<String, String>;
296 fn send_event(&self, id: &str, event: &str, data: serde_json::Value) -> Result<(), String>;
297 fn cancel(&self, id: &str) -> Result<(), String>;
298}
299
300pub trait FileOps: Send + Sync {
302 fn upload(&self, body: &str) -> (u16, String);
303 fn get_file(&self, id: &str) -> (u16, String);
304}
305
306pub trait EmailSender: Send + Sync {
308 fn send(&self, to: &str, subject: &str, body: &str) -> Result<(), String>;
309}
310
311pub trait ShardOps: Send + Sync {
313 fn get_shard(&self, id: &str) -> Option<std::sync::Arc<dyn pylon_realtime::DynShard>>;
315
316 fn list_shards(&self) -> Vec<String>;
318
319 fn shard_count(&self) -> usize;
321}
322
323pub trait OpenApiGenerator: Send + Sync {
325 fn generate(&self, base_url: &str) -> String;
326}
327
328pub trait PluginHookOps: Send + Sync {
341 fn before_insert(
342 &self,
343 entity: &str,
344 data: &mut serde_json::Value,
345 auth: &AuthContext,
346 ) -> Result<(), (u16, String, String)>;
347
348 fn after_insert(&self, entity: &str, id: &str, data: &serde_json::Value, auth: &AuthContext);
349
350 fn before_update(
351 &self,
352 entity: &str,
353 id: &str,
354 data: &mut serde_json::Value,
355 auth: &AuthContext,
356 ) -> Result<(), (u16, String, String)>;
357
358 fn after_update(&self, entity: &str, id: &str, data: &serde_json::Value, auth: &AuthContext);
359
360 fn before_delete(
361 &self,
362 entity: &str,
363 id: &str,
364 auth: &AuthContext,
365 ) -> Result<(), (u16, String, String)>;
366
367 fn after_delete(&self, entity: &str, id: &str, auth: &AuthContext);
368}
369
370pub struct NoopPluginHooks;
372
373impl PluginHookOps for NoopPluginHooks {
374 fn before_insert(
375 &self,
376 _entity: &str,
377 _data: &mut serde_json::Value,
378 _auth: &AuthContext,
379 ) -> Result<(), (u16, String, String)> {
380 Ok(())
381 }
382 fn after_insert(
383 &self,
384 _entity: &str,
385 _id: &str,
386 _data: &serde_json::Value,
387 _auth: &AuthContext,
388 ) {
389 }
390 fn before_update(
391 &self,
392 _entity: &str,
393 _id: &str,
394 _data: &mut serde_json::Value,
395 _auth: &AuthContext,
396 ) -> Result<(), (u16, String, String)> {
397 Ok(())
398 }
399 fn after_update(
400 &self,
401 _entity: &str,
402 _id: &str,
403 _data: &serde_json::Value,
404 _auth: &AuthContext,
405 ) {
406 }
407 fn before_delete(
408 &self,
409 _entity: &str,
410 _id: &str,
411 _auth: &AuthContext,
412 ) -> Result<(), (u16, String, String)> {
413 Ok(())
414 }
415 fn after_delete(&self, _entity: &str, _id: &str, _auth: &AuthContext) {}
416}
417
418pub trait FnOps: Send + Sync {
424 fn get_fn(&self, name: &str) -> Option<pylon_functions::registry::FnDef>;
426
427 fn list_fns(&self) -> Vec<pylon_functions::registry::FnDef>;
429
430 fn call(
439 &self,
440 fn_name: &str,
441 args: serde_json::Value,
442 auth: pylon_functions::protocol::AuthInfo,
443 on_stream: Option<Box<dyn FnMut(&str) + Send>>,
444 request: Option<pylon_functions::protocol::RequestInfo>,
445 ) -> Result<
446 (serde_json::Value, pylon_functions::trace::FnTrace),
447 pylon_functions::runner::FnCallError,
448 >;
449
450 fn recent_traces(&self, limit: usize) -> Vec<pylon_functions::trace::FnTrace>;
452
453 fn check_rate_limit(&self, _fn_name: &str, _identity: &str) -> Result<(), u64> {
461 Ok(())
462 }
463}
464
465pub struct RouterContext<'a> {
470 pub store: &'a dyn DataStore,
471 pub session_store: &'a SessionStore,
472 pub magic_codes: &'a MagicCodeStore,
473 pub oauth_state: &'a OAuthStateStore,
474 pub account_store: &'a pylon_auth::AccountStore,
479 pub api_keys: &'a pylon_auth::api_key::ApiKeyStore,
483 pub orgs: &'a pylon_auth::org::OrgStore,
486 pub siwe: &'a pylon_auth::siwe::NonceStore,
489 pub phone_codes: &'a pylon_auth::phone::PhoneCodeStore,
491 pub passkeys: &'a pylon_auth::webauthn::PasskeyStore,
494 pub policy_engine: &'a PolicyEngine,
495 pub change_log: &'a ChangeLog,
496 pub notifier: &'a dyn ChangeNotifier,
497 pub rooms: &'a dyn RoomOps,
498 pub cache: &'a dyn CacheOps,
499 pub pubsub: &'a dyn PubSubOps,
500 pub jobs: &'a dyn JobOps,
501 pub scheduler: &'a dyn SchedulerOps,
502 pub workflows: &'a dyn WorkflowOps,
503 pub files: &'a dyn FileOps,
504 pub openapi: &'a dyn OpenApiGenerator,
505 pub functions: Option<&'a dyn FnOps>,
506 pub email: &'a dyn EmailSender,
507 pub shards: Option<&'a dyn ShardOps>,
508 pub plugin_hooks: &'a dyn PluginHookOps,
509 pub auth_ctx: &'a AuthContext,
510 pub trusted_origins: &'a [String],
519 pub is_dev: bool,
520 pub request_headers: &'a [(String, String)],
525 pub peer_ip: &'a str,
532 pub cookie_config: &'a CookieConfig,
536 pub response_headers: RefCell<Vec<(String, String)>>,
541}
542
543impl<'a> RouterContext<'a> {
544 pub fn add_response_header(&self, name: impl Into<String>, value: impl Into<String>) {
546 self.response_headers
547 .borrow_mut()
548 .push((name.into(), value.into()));
549 }
550
551 pub fn take_response_headers(&self) -> Vec<(String, String)> {
554 std::mem::take(&mut *self.response_headers.borrow_mut())
555 }
556
557 pub fn request_origin(&self) -> Option<&str> {
561 self.request_headers
562 .iter()
563 .find(|(k, _)| k.eq_ignore_ascii_case("origin"))
564 .map(|(_, v)| v.as_str())
565 }
566
567 pub fn maybe_set_session_cookie(&self, token: &str) {
573 if self.request_origin().is_some() {
574 self.add_response_header("Set-Cookie", self.cookie_config.set_value(token));
575 }
576 }
577}
578
579pub(crate) struct OAuthError {
584 pub(crate) status: u16,
585 pub(crate) code: &'static str,
586 pub(crate) message: String,
587}
588
589fn truncate_for_redirect(s: &str) -> String {
598 const MAX: usize = 240;
599 if s.len() <= MAX {
600 return s.to_string();
601 }
602 let budget = MAX - "…".len();
603 let mut end = budget;
604 while end > 0 && !s.is_char_boundary(end) {
605 end -= 1;
606 }
607 format!("{}…", &s[..end])
608}
609
610pub(crate) fn complete_oauth_login_pkce(
621 ctx: &RouterContext,
622 provider: &str,
623 code: Option<&str>,
624 pkce_verifier: Option<&str>,
625 dev_email: Option<&str>,
626 dev_name: Option<&str>,
627) -> Result<(String, pylon_auth::Session), OAuthError> {
628 let (userinfo, tokens) = if let Some(code) = code {
629 let registry = pylon_auth::OAuthRegistry::shared();
630 let config = registry.get(provider).cloned().ok_or_else(|| OAuthError {
631 status: 404,
632 code: "PROVIDER_NOT_FOUND",
633 message: format!("OAuth provider \"{provider}\" not configured"),
634 })?;
635 let tokens = config
636 .exchange_code_full_pkce(code, pkce_verifier)
637 .map_err(|err| OAuthError {
638 status: 502,
639 code: "OAUTH_TOKEN_EXCHANGE_FAILED",
640 message: truncate_for_redirect(&format!("token exchange failed: {err}")),
645 })?;
646 let info = config
649 .fetch_userinfo_with_id_token(&tokens.access_token, tokens.id_token.as_deref())
650 .map_err(|err| OAuthError {
651 status: 502,
652 code: "OAUTH_TOKEN_EXCHANGE_FAILED",
653 message: truncate_for_redirect(&format!("userinfo fetch failed: {err}")),
654 })?;
655 (info, tokens)
656 } else if ctx.is_dev {
657 let email = dev_email.ok_or_else(|| OAuthError {
658 status: 400,
659 code: "MISSING_FIELD",
660 message: "OAuth callback requires `code` (or `email` in dev mode)".into(),
661 })?;
662 let info = pylon_auth::UserInfo {
667 provider: provider.to_string(),
668 provider_account_id: format!("dev:{email}"),
669 email: email.to_string(),
670 name: dev_name.map(String::from),
671 };
672 let tokens = pylon_auth::TokenSet {
673 access_token: "dev_access_token".into(),
674 refresh_token: None,
675 id_token: None,
676 expires_at: None,
677 scope: None,
678 };
679 (info, tokens)
680 } else {
681 return Err(OAuthError {
682 status: 400,
683 code: "MISSING_FIELD",
684 message: "OAuth callback requires an authorization `code` from the provider".into(),
685 });
686 };
687
688 let now = chrono_now_iso();
695
696 let user_id = if let Some(existing) = ctx
710 .account_store
711 .find_by_provider(provider, &userinfo.provider_account_id)
712 {
713 let mut refreshed = pylon_auth::Account::new(existing.user_id.clone(), &userinfo, &tokens);
716 refreshed.created_at = existing.created_at;
717 ctx.account_store.upsert(&refreshed);
718 existing.user_id
719 } else if let Ok(Some(row)) = ctx.store.lookup(
720 &ctx.store.manifest().auth.user.entity,
721 "email",
722 &userinfo.email,
723 ) {
724 let id = row["id"].as_str().unwrap_or("").to_string();
728 if id.is_empty() {
729 return Err(OAuthError {
730 status: 500,
731 code: "USER_LOOKUP_INVALID",
732 message: "User row matched by email but had no id field".into(),
733 });
734 }
735 if row.get("emailVerified").map_or(true, |v| v.is_null()) {
736 let _ = ctx.store.update(
740 &ctx.store.manifest().auth.user.entity,
741 &id,
742 &serde_json::json!({ "emailVerified": now }),
743 );
744 }
745 ctx.account_store
746 .upsert(&pylon_auth::Account::new(id.clone(), &userinfo, &tokens));
747 id
748 } else {
749 let display_name = userinfo.name.as_deref().unwrap_or(&userinfo.email);
753 let user_entity = ctx.store.manifest().auth.user.entity.clone();
754 let id = ctx
755 .store
756 .insert(
757 &user_entity,
758 &serde_json::json!({
759 "email": userinfo.email,
760 "displayName": display_name,
761 "emailVerified": now,
762 "createdAt": now,
763 }),
764 )
765 .map_err(|e| OAuthError {
766 status: 500,
767 code: "USER_CREATE_FAILED",
768 message: format!(
774 "failed to create User row for OAuth signup ({}): {}",
775 e.code, e.message
776 ),
777 })?;
778 ctx.account_store
779 .upsert(&pylon_auth::Account::new(id.clone(), &userinfo, &tokens));
780 id
781 };
782 let session = ctx.session_store.create(user_id.clone());
783 Ok((user_id, session))
784}
785
786pub(crate) fn parse_query(q: &str) -> std::collections::HashMap<String, String> {
792 let mut out = std::collections::HashMap::new();
793 for pair in q.split('&') {
794 if pair.is_empty() {
795 continue;
796 }
797 let (k, v) = pair.split_once('=').unwrap_or((pair, ""));
798 out.insert(query_decode(k), query_decode(v));
799 }
800 out
801}
802
803fn query_decode(s: &str) -> String {
809 percent_decode(s, false)
810}
811
812#[allow(dead_code)]
813fn percent_decode(s: &str, plus_is_space: bool) -> String {
814 let bytes = s.as_bytes();
815 let mut out = Vec::with_capacity(bytes.len());
816 let mut i = 0;
817 while i < bytes.len() {
818 match bytes[i] {
819 b'+' if plus_is_space => {
820 out.push(b' ');
821 i += 1;
822 }
823 b'%' if i + 2 < bytes.len() => {
824 let hex = std::str::from_utf8(&bytes[i + 1..i + 3]).unwrap_or("");
825 match u8::from_str_radix(hex, 16) {
826 Ok(b) => {
827 out.push(b);
828 i += 3;
829 }
830 Err(_) => {
831 out.push(bytes[i]);
832 i += 1;
833 }
834 }
835 }
836 b => {
837 out.push(b);
838 i += 1;
839 }
840 }
841 }
842 String::from_utf8_lossy(&out).into_owned()
843}
844
845pub(crate) fn redact_email(email: &str) -> String {
850 match email.find('@') {
851 Some(at) => {
852 let (user, domain) = email.split_at(at);
853 let prefix_len = user.len().min(2);
854 let prefix: String = user.chars().take(prefix_len).collect();
855 format!("{prefix}***{domain}")
856 }
857 None => "***".to_string(),
858 }
859}
860
861fn public_manifest(m: &pylon_kernel::AppManifest) -> pylon_kernel::AppManifest {
867 let mut out = m.clone();
868 for p in out.policies.iter_mut() {
869 p.allow = String::new();
870 p.allow_read = None;
871 p.allow_insert = None;
872 p.allow_update = None;
873 p.allow_delete = None;
874 }
875 out
876}
877
878pub(crate) fn url_encode(s: &str) -> String {
879 let mut out = String::with_capacity(s.len());
880 for b in s.bytes() {
881 match b {
882 b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
883 out.push(b as char)
884 }
885 _ => out.push_str(&format!("%{b:02X}")),
886 }
887 }
888 out
889}
890
891pub fn route(
899 ctx: &RouterContext,
900 method: HttpMethod,
901 url: &str,
902 body: &str,
903 auth_token: Option<&str>,
904) -> (u16, String, &'static str) {
905 let (status, body) = route_inner(ctx, method, url, body, auth_token);
906 (status, body, "application/json")
907}
908
909fn route_inner(
910 ctx: &RouterContext,
911 method: HttpMethod,
912 url: &str,
913 body: &str,
914 auth_token: Option<&str>,
915) -> (u16, String) {
916 if method == HttpMethod::Options {
918 return (204, String::new());
919 }
920
921 if url.starts_with("/api/manifest") && method == HttpMethod::Get {
930 let path = url.split('?').next().unwrap_or(url);
931 if path == "/api/manifest" {
932 let want_full = query_param(url, "full")
933 .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
934 .unwrap_or(false);
935 let manifest = ctx.store.manifest();
936 let body = if want_full && ctx.auth_ctx.is_admin {
937 serde_json::to_string(manifest).unwrap_or_else(|_| "{}".into())
938 } else {
939 serde_json::to_string(&public_manifest(manifest)).unwrap_or_else(|_| "{}".into())
940 };
941 return (200, body);
942 }
943 }
944
945 if url == "/api/openapi.json" && method == HttpMethod::Get {
947 return (200, ctx.openapi.generate(""));
948 }
949
950 if let Some(r) = routes::auth::handle(ctx, method, url, body, auth_token) {
956 return r;
957 }
958
959 if let Some(r) = routes::sync::handle(ctx, method, url, body, auth_token) {
964 return r;
965 }
966
967 if let Some(r) = routes::rooms::handle(ctx, method, url, body, auth_token) {
972 return r;
973 }
974
975 if let Some(r) = routes::links::handle(ctx, method, url, body, auth_token) {
980 return r;
981 }
982 if let Some(r) = routes::files::handle(ctx, method, url, body, auth_token) {
983 return r;
984 }
985 if let Some(r) = routes::crdt::handle(ctx, method, url, body, auth_token) {
986 return r;
987 }
988
989 if let Some(r) = routes::queries::handle(ctx, method, url, body, auth_token) {
997 return r;
998 }
999 if let Some(r) = routes::actions::handle(ctx, method, url, body, auth_token) {
1000 return r;
1001 }
1002 if let Some(r) = routes::admin_data::handle(ctx, method, url, body, auth_token) {
1003 return r;
1004 }
1005 if let Some(r) = routes::auth_admin::handle(ctx, method, url, body, auth_token) {
1006 return r;
1007 }
1008 if let Some(r) = routes::ops_admin::handle(ctx, method, url, body, auth_token) {
1009 return r;
1010 }
1011 if let Some(r) = routes::search::handle(ctx, method, url, body, auth_token) {
1012 return r;
1013 }
1014
1015 if let Some(r) = routes::entities::handle(ctx, method, url, body, auth_token) {
1021 return r;
1022 }
1023
1024 if let Some(r) = routes::infra::handle(ctx, method, url, body, auth_token) {
1033 return r;
1034 }
1035 if let Some(r) = routes::functions::handle(ctx, method, url, body, auth_token) {
1036 return r;
1037 }
1038 if let Some(r) = routes::shards::handle(ctx, method, url, body, auth_token) {
1039 return r;
1040 }
1041 if let Some(r) = routes::workflows::handle(ctx, method, url, body, auth_token) {
1042 return r;
1043 }
1044 if let Some(r) = routes::ai::handle(ctx, method, url, body, auth_token) {
1045 return r;
1046 }
1047
1048 (
1053 404,
1054 json_error_with_hint(
1055 "NOT_FOUND",
1056 &format!("No API route matches {url}"),
1057 "Available endpoints: /api/entities/<entity>, /api/actions/<name>, /api/query, /api/auth/*, /api/sync/*, /api/files/*, /api/cache, /api/pubsub/*, /api/jobs, /api/scheduler, /api/workflows, /api/ai/*, /studio",
1058 ),
1059 )
1060}
1061
1062pub(crate) fn handle_list(store: &dyn DataStore, entity: &str, url: &str) -> (u16, String) {
1067 let limit: Option<usize> = url
1068 .split("limit=")
1069 .nth(1)
1070 .and_then(|s| s.split('&').next())
1071 .and_then(|s| s.parse().ok());
1072 let offset: usize = url
1073 .split("offset=")
1074 .nth(1)
1075 .and_then(|s| s.split('&').next())
1076 .and_then(|s| s.parse().ok())
1077 .unwrap_or(0);
1078
1079 let mut filter = serde_json::Map::new();
1082 if let Some(l) = limit {
1083 filter.insert("$limit".into(), serde_json::json!(l));
1084 }
1085 if offset > 0 {
1086 filter.insert("$offset".into(), serde_json::json!(offset));
1087 }
1088 let filter = serde_json::Value::Object(filter);
1089
1090 match store.query_filtered(entity, &filter) {
1091 Ok(rows) => {
1092 let count = rows.len();
1097 (
1098 200,
1099 serde_json::json!({
1100 "data": rows,
1101 "count": count,
1102 "offset": offset,
1103 "limit": limit,
1104 })
1105 .to_string(),
1106 )
1107 }
1108 Err(e) => (400, json_error(&e.code, &e.message)),
1109 }
1110}
1111
1112pub(crate) fn handle_get(store: &dyn DataStore, entity: &str, id: &str) -> (u16, String) {
1113 match store.get_by_id(entity, id) {
1114 Ok(Some(row)) => (
1115 200,
1116 serde_json::to_string(&row).unwrap_or_else(|_| "{}".into()),
1117 ),
1118 Ok(None) => (
1119 404,
1120 json_error("NOT_FOUND", &format!("{entity} with id \"{id}\" not found")),
1121 ),
1122 Err(e) => (400, json_error(&e.code, &e.message)),
1123 }
1124}
1125
1126pub(crate) fn handle_insert(ctx: &RouterContext, entity: &str, body: &str) -> (u16, String) {
1127 let mut data: serde_json::Value = match serde_json::from_str(body) {
1128 Ok(v) => v,
1129 Err(e) => {
1130 return (
1131 400,
1132 json_error_safe(
1133 "INVALID_JSON",
1134 "Invalid request body",
1135 &format!("Invalid JSON: {e}"),
1136 ),
1137 )
1138 }
1139 };
1140 if let Err((status, code, msg)) =
1144 ctx.plugin_hooks
1145 .before_insert(entity, &mut data, ctx.auth_ctx)
1146 {
1147 return (status, json_error(&code, &msg));
1148 }
1149 match ctx.store.insert(entity, &data) {
1150 Ok(id) => {
1151 let seq = ctx
1152 .change_log
1153 .append(entity, &id, ChangeKind::Insert, Some(data.clone()));
1154 broadcast_change_with_crdt(
1155 ctx.notifier,
1156 ctx.store,
1157 seq,
1158 entity,
1159 &id,
1160 ChangeKind::Insert,
1161 Some(&data),
1162 );
1163 ctx.plugin_hooks
1164 .after_insert(entity, &id, &data, ctx.auth_ctx);
1165 (201, serde_json::json!({"id": id}).to_string())
1166 }
1167 Err(e) => (400, json_error(&e.code, &e.message)),
1168 }
1169}
1170
1171pub(crate) fn handle_update(
1172 ctx: &RouterContext,
1173 entity: &str,
1174 id: &str,
1175 body: &str,
1176) -> (u16, String) {
1177 let mut data: serde_json::Value = match serde_json::from_str(body) {
1178 Ok(v) => v,
1179 Err(e) => {
1180 return (
1181 400,
1182 json_error_safe(
1183 "INVALID_JSON",
1184 "Invalid request body",
1185 &format!("Invalid JSON: {e}"),
1186 ),
1187 )
1188 }
1189 };
1190 if let Err((status, code, msg)) =
1191 ctx.plugin_hooks
1192 .before_update(entity, id, &mut data, ctx.auth_ctx)
1193 {
1194 return (status, json_error(&code, &msg));
1195 }
1196 match ctx.store.update(entity, id, &data) {
1197 Ok(true) => {
1198 let seq = ctx
1199 .change_log
1200 .append(entity, id, ChangeKind::Update, Some(data.clone()));
1201 broadcast_change_with_crdt(
1202 ctx.notifier,
1203 ctx.store,
1204 seq,
1205 entity,
1206 id,
1207 ChangeKind::Update,
1208 Some(&data),
1209 );
1210 ctx.plugin_hooks
1211 .after_update(entity, id, &data, ctx.auth_ctx);
1212 (200, serde_json::json!({"updated": true}).to_string())
1213 }
1214 Ok(false) => (
1215 404,
1216 json_error("NOT_FOUND", &format!("{entity}/{id} not found")),
1217 ),
1218 Err(e) => (400, json_error(&e.code, &e.message)),
1219 }
1220}
1221
1222pub(crate) fn handle_delete(ctx: &RouterContext, entity: &str, id: &str) -> (u16, String) {
1223 if let Err((status, code, msg)) = ctx.plugin_hooks.before_delete(entity, id, ctx.auth_ctx) {
1224 return (status, json_error(&code, &msg));
1225 }
1226 match ctx.store.delete(entity, id) {
1227 Ok(true) => {
1228 let seq = ctx.change_log.append(entity, id, ChangeKind::Delete, None);
1229 broadcast_change(ctx.notifier, seq, entity, id, ChangeKind::Delete, None);
1230 ctx.plugin_hooks.after_delete(entity, id, ctx.auth_ctx);
1231 (200, serde_json::json!({"deleted": true}).to_string())
1232 }
1233 Ok(false) => (
1234 404,
1235 json_error("NOT_FOUND", &format!("{entity}/{id} not found")),
1236 ),
1237 Err(e) => (400, json_error(&e.code, &e.message)),
1238 }
1239}
1240
1241pub(crate) fn broadcast_change(
1246 notifier: &dyn ChangeNotifier,
1247 seq: u64,
1248 entity: &str,
1249 row_id: &str,
1250 kind: ChangeKind,
1251 data: Option<&serde_json::Value>,
1252) {
1253 let event = pylon_sync::ChangeEvent {
1254 seq,
1255 entity: entity.to_string(),
1256 row_id: row_id.to_string(),
1257 kind,
1258 data: data.cloned(),
1259 timestamp: String::new(),
1260 };
1261 notifier.notify(&event);
1262}
1263
1264pub fn broadcast_change_with_crdt(
1274 notifier: &dyn ChangeNotifier,
1275 store: &dyn DataStore,
1276 seq: u64,
1277 entity: &str,
1278 row_id: &str,
1279 kind: ChangeKind,
1280 data: Option<&serde_json::Value>,
1281) {
1282 broadcast_change(notifier, seq, entity, row_id, kind.clone(), data);
1283 if matches!(kind, ChangeKind::Delete) {
1284 return;
1285 }
1286 if let Ok(Some(snapshot)) = store.crdt_snapshot(entity, row_id) {
1287 notifier.notify_crdt(entity, row_id, &snapshot);
1288 }
1289}
1290
1291pub(crate) fn decode_hex(s: &str) -> Option<Vec<u8>> {
1296 if s.len() % 2 != 0 {
1297 return None;
1298 }
1299 let mut out = Vec::with_capacity(s.len() / 2);
1300 let bytes = s.as_bytes();
1301 let mut i = 0;
1302 while i < bytes.len() {
1303 let hi = hex_nibble(bytes[i])?;
1304 let lo = hex_nibble(bytes[i + 1])?;
1305 out.push((hi << 4) | lo);
1306 i += 2;
1307 }
1308 Some(out)
1309}
1310
1311fn hex_nibble(b: u8) -> Option<u8> {
1312 match b {
1313 b'0'..=b'9' => Some(b - b'0'),
1314 b'a'..=b'f' => Some(b - b'a' + 10),
1315 b'A'..=b'F' => Some(b - b'A' + 10),
1316 _ => None,
1317 }
1318}
1319
1320pub fn json_error(code: &str, message: &str) -> String {
1321 serde_json::json!({"error": {"code": code, "message": message}}).to_string()
1322}
1323
1324const USER_REF_FIELDS: &[&str] = &[
1332 "userId",
1333 "user_id",
1334 "authorId",
1335 "author_id",
1336 "ownerId",
1337 "owner_id",
1338 "createdBy",
1339 "created_by",
1340];
1341
1342pub(crate) fn gdpr_export(ctx: &RouterContext, user_id: &str) -> (u16, String) {
1347 let manifest = ctx.store.manifest();
1348 let mut entities = serde_json::Map::new();
1349
1350 if let Ok(Some(user_row)) = ctx.store.get_by_id("User", user_id) {
1352 entities.insert("User".to_string(), serde_json::json!([user_row]));
1353 }
1354
1355 for ent in &manifest.entities {
1357 if ent.name == "User" {
1358 continue; }
1360 let user_field = ent
1361 .fields
1362 .iter()
1363 .find(|f| USER_REF_FIELDS.contains(&f.name.as_str()));
1364 let Some(field) = user_field else { continue };
1365 let filter = serde_json::json!({ &field.name: user_id });
1366 match ctx.store.query_filtered(&ent.name, &filter) {
1367 Ok(rows) if !rows.is_empty() => {
1368 entities.insert(ent.name.clone(), serde_json::Value::Array(rows));
1369 }
1370 Ok(_) => {}
1371 Err(e) => {
1372 tracing::warn!("[gdpr] export: query {} failed: {}", ent.name, e.message);
1373 }
1374 }
1375 }
1376
1377 let envelope = serde_json::json!({
1378 "user_id": user_id,
1379 "exported_at": pylon_kernel::util::now_iso(),
1380 "entities": entities,
1381 });
1382 (200, envelope.to_string())
1383}
1384
1385pub(crate) fn gdpr_purge(ctx: &RouterContext, user_id: &str) -> (u16, String) {
1389 let manifest = ctx.store.manifest();
1390 let mut deleted: u64 = 0;
1391 let mut errors: Vec<String> = Vec::new();
1392
1393 if let Ok(true) = ctx.store.delete("User", user_id) {
1395 deleted += 1;
1396 let seq = ctx
1398 .change_log
1399 .append("User", user_id, ChangeKind::Delete, None);
1400 broadcast_change(ctx.notifier, seq, "User", user_id, ChangeKind::Delete, None);
1401 }
1402
1403 for ent in &manifest.entities {
1405 if ent.name == "User" {
1406 continue;
1407 }
1408 let Some(field) = ent
1409 .fields
1410 .iter()
1411 .find(|f| USER_REF_FIELDS.contains(&f.name.as_str()))
1412 else {
1413 continue;
1414 };
1415 let filter = serde_json::json!({ &field.name: user_id });
1416 let rows = match ctx.store.query_filtered(&ent.name, &filter) {
1417 Ok(r) => r,
1418 Err(e) => {
1419 errors.push(format!("query {}: {}", ent.name, e.message));
1420 continue;
1421 }
1422 };
1423 for row in rows {
1424 let Some(id) = row.get("id").and_then(|v| v.as_str()) else {
1425 continue;
1426 };
1427 match ctx.store.delete(&ent.name, id) {
1428 Ok(true) => {
1429 deleted += 1;
1430 let seq = ctx
1431 .change_log
1432 .append(&ent.name, id, ChangeKind::Delete, None);
1433 broadcast_change(ctx.notifier, seq, &ent.name, id, ChangeKind::Delete, None);
1434 }
1435 Ok(false) => {}
1436 Err(e) => errors.push(format!("delete {}/{}: {}", ent.name, id, e.message)),
1437 }
1438 }
1439 }
1440
1441 let revoked = ctx.session_store.revoke_all_for_user(user_id);
1444
1445 let resp = serde_json::json!({
1446 "user_id": user_id,
1447 "rows_deleted": deleted,
1448 "sessions_revoked": revoked,
1449 "errors": errors,
1450 "purged_at": pylon_kernel::util::now_iso(),
1451 });
1452 (200, resp.to_string())
1453}
1454
1455pub(crate) fn require_admin(ctx: &RouterContext) -> Option<(u16, String)> {
1460 if ctx.auth_ctx.is_admin {
1461 None
1462 } else {
1463 Some((
1464 403,
1465 json_error(
1466 "FORBIDDEN",
1467 "this endpoint requires admin auth (PYLON_ADMIN_TOKEN)",
1468 ),
1469 ))
1470 }
1471}
1472
1473pub(crate) fn require_auth(ctx: &RouterContext) -> Option<(u16, String)> {
1478 if ctx.auth_ctx.is_admin || ctx.auth_ctx.user_id.is_some() {
1479 None
1480 } else {
1481 Some((
1482 401,
1483 json_error("AUTH_REQUIRED", "authenticated session required"),
1484 ))
1485 }
1486}
1487
1488pub fn json_error_with_hint(code: &str, message: &str, hint: &str) -> String {
1489 serde_json::json!({"error": {"code": code, "message": message, "hint": hint}}).to_string()
1490}
1491
1492pub fn json_error_safe(code: &str, user_message: &str, internal: &str) -> String {
1493 tracing::warn!("[error] {code}: {internal}");
1494 json_error(code, user_message)
1495}
1496
1497pub(crate) fn parse_json(body: &str) -> Result<serde_json::Value, (u16, String)> {
1499 serde_json::from_str(body).map_err(|e| {
1500 (
1501 400,
1502 json_error_safe(
1503 "INVALID_JSON",
1504 "Invalid request body",
1505 &format!("Invalid JSON: {e}"),
1506 ),
1507 )
1508 })
1509}
1510
1511pub(crate) fn query_param<'a>(url: &'a str, key: &str) -> Option<&'a str> {
1513 let search = format!("{key}=");
1514 url.split(&search).nth(1).and_then(|s| s.split('&').next())
1515}
1516
1517pub(crate) fn chrono_now_iso() -> String {
1518 pylon_kernel::util::now_iso()
1519}
1520
1521#[cfg(test)]
1531mod auth_gate_tests {
1532 use super::*;
1533 use pylon_auth::{AuthContext, CookieConfig, MagicCodeStore, OAuthStateStore, SessionStore};
1534 use pylon_kernel::{AppManifest, MANIFEST_VERSION};
1535 use pylon_policy::PolicyEngine;
1536 use pylon_sync::ChangeLog;
1537
1538 struct StubDataStore {
1544 manifest: AppManifest,
1545 }
1546 impl pylon_http::DataStore for StubDataStore {
1547 fn manifest(&self) -> &AppManifest {
1548 &self.manifest
1549 }
1550 fn insert(
1551 &self,
1552 _entity: &str,
1553 _data: &serde_json::Value,
1554 ) -> Result<String, pylon_http::DataError> {
1555 Ok("stub-id".to_string())
1556 }
1557 fn get_by_id(
1558 &self,
1559 _entity: &str,
1560 _id: &str,
1561 ) -> Result<Option<serde_json::Value>, pylon_http::DataError> {
1562 Ok(None)
1563 }
1564 fn list(&self, _entity: &str) -> Result<Vec<serde_json::Value>, pylon_http::DataError> {
1565 Ok(Vec::new())
1566 }
1567 fn list_after(
1568 &self,
1569 _entity: &str,
1570 _after: Option<&str>,
1571 _limit: usize,
1572 ) -> Result<Vec<serde_json::Value>, pylon_http::DataError> {
1573 Ok(Vec::new())
1574 }
1575 fn update(
1576 &self,
1577 _entity: &str,
1578 _id: &str,
1579 _data: &serde_json::Value,
1580 ) -> Result<bool, pylon_http::DataError> {
1581 Ok(true)
1582 }
1583 fn delete(&self, _entity: &str, _id: &str) -> Result<bool, pylon_http::DataError> {
1584 Ok(true)
1585 }
1586 fn lookup(
1587 &self,
1588 _entity: &str,
1589 _field: &str,
1590 _value: &str,
1591 ) -> Result<Option<serde_json::Value>, pylon_http::DataError> {
1592 Ok(None)
1593 }
1594 fn link(
1595 &self,
1596 _entity: &str,
1597 _id: &str,
1598 _relation: &str,
1599 _target_id: &str,
1600 ) -> Result<bool, pylon_http::DataError> {
1601 Ok(true)
1602 }
1603 fn unlink(
1604 &self,
1605 _entity: &str,
1606 _id: &str,
1607 _relation: &str,
1608 ) -> Result<bool, pylon_http::DataError> {
1609 Ok(true)
1610 }
1611 fn query_filtered(
1612 &self,
1613 _entity: &str,
1614 _filter: &serde_json::Value,
1615 ) -> Result<Vec<serde_json::Value>, pylon_http::DataError> {
1616 Ok(Vec::new())
1617 }
1618 fn query_graph(
1619 &self,
1620 _query: &serde_json::Value,
1621 ) -> Result<serde_json::Value, pylon_http::DataError> {
1622 Ok(serde_json::json!({}))
1623 }
1624 fn transact(
1625 &self,
1626 _ops: &[serde_json::Value],
1627 ) -> Result<(bool, Vec<serde_json::Value>), pylon_http::DataError> {
1628 Ok((true, Vec::new()))
1629 }
1630 }
1631
1632 macro_rules! stub_ops {
1633 ($name:ident, $trait:path) => {
1634 struct $name;
1635 };
1636 }
1637
1638 stub_ops!(StubRooms, RoomOps);
1639 stub_ops!(StubCache, CacheOps);
1640 stub_ops!(StubPubSub, PubSubOps);
1641 stub_ops!(StubJobs, JobOps);
1642 stub_ops!(StubScheduler, SchedulerOps);
1643 stub_ops!(StubWorkflows, WorkflowOps);
1644 stub_ops!(StubFiles, FileOps);
1645 stub_ops!(StubOpenApi, OpenApiGenerator);
1646 stub_ops!(StubEmail, EmailSender);
1647
1648 impl RoomOps for StubRooms {
1649 fn join(
1650 &self,
1651 _room: &str,
1652 _user_id: &str,
1653 _data: Option<serde_json::Value>,
1654 ) -> Result<(serde_json::Value, serde_json::Value), pylon_http::DataError> {
1655 Ok((serde_json::json!({}), serde_json::json!({})))
1656 }
1657 fn leave(&self, _room: &str, _user_id: &str) -> Option<serde_json::Value> {
1658 None
1659 }
1660 fn set_presence(
1661 &self,
1662 _room: &str,
1663 _user_id: &str,
1664 _data: serde_json::Value,
1665 ) -> Option<serde_json::Value> {
1666 None
1667 }
1668 fn broadcast(
1669 &self,
1670 _room: &str,
1671 _sender: Option<&str>,
1672 _topic: &str,
1673 _data: serde_json::Value,
1674 ) -> Option<serde_json::Value> {
1675 None
1676 }
1677 fn list_rooms(&self) -> Vec<String> {
1678 vec![]
1679 }
1680 fn room_size(&self, _name: &str) -> usize {
1681 0
1682 }
1683 fn members(&self, _name: &str) -> Vec<serde_json::Value> {
1684 vec![]
1685 }
1686 }
1687 impl CacheOps for StubCache {
1688 fn handle_command(&self, _body: &str) -> (u16, String) {
1689 (200, "{}".into())
1690 }
1691 fn handle_get(&self, _key: &str) -> (u16, String) {
1692 (404, "{}".into())
1693 }
1694 fn handle_delete(&self, _key: &str) -> (u16, String) {
1695 (200, "{}".into())
1696 }
1697 }
1698 impl PubSubOps for StubPubSub {
1699 fn handle_publish(&self, _body: &str) -> (u16, String) {
1700 (200, "{}".into())
1701 }
1702 fn handle_channels(&self) -> (u16, String) {
1703 (200, "[]".into())
1704 }
1705 fn handle_history(&self, _channel: &str, _url: &str) -> (u16, String) {
1706 (200, "[]".into())
1707 }
1708 }
1709 impl JobOps for StubJobs {
1710 fn enqueue(
1711 &self,
1712 _name: &str,
1713 _payload: serde_json::Value,
1714 _priority: &str,
1715 _delay_secs: u64,
1716 _max_retries: u32,
1717 _queue: &str,
1718 ) -> String {
1719 "job-id".into()
1720 }
1721 fn stats(&self) -> serde_json::Value {
1722 serde_json::json!({})
1723 }
1724 fn dead_letters(&self) -> serde_json::Value {
1725 serde_json::json!([])
1726 }
1727 fn retry_dead(&self, _id: &str) -> bool {
1728 false
1729 }
1730 fn list_jobs(
1731 &self,
1732 _status: Option<&str>,
1733 _queue: Option<&str>,
1734 _limit: usize,
1735 ) -> serde_json::Value {
1736 serde_json::json!([])
1737 }
1738 fn get_job(&self, _id: &str) -> Option<serde_json::Value> {
1739 None
1740 }
1741 }
1742 impl SchedulerOps for StubScheduler {
1743 fn list_tasks(&self) -> serde_json::Value {
1744 serde_json::json!([])
1745 }
1746 fn trigger(&self, _name: &str) -> bool {
1747 false
1748 }
1749 }
1750 impl WorkflowOps for StubWorkflows {
1751 fn definitions(&self) -> serde_json::Value {
1752 serde_json::json!([])
1753 }
1754 fn start(&self, _name: &str, _input: serde_json::Value) -> Result<String, String> {
1755 Ok("wf-id".into())
1756 }
1757 fn list(&self, _status: Option<&str>) -> serde_json::Value {
1758 serde_json::json!([])
1759 }
1760 fn get(&self, _id: &str) -> Option<serde_json::Value> {
1761 None
1762 }
1763 fn advance(&self, _id: &str) -> Result<String, String> {
1764 Ok("running".into())
1765 }
1766 fn send_event(
1767 &self,
1768 _id: &str,
1769 _event: &str,
1770 _data: serde_json::Value,
1771 ) -> Result<(), String> {
1772 Ok(())
1773 }
1774 fn cancel(&self, _id: &str) -> Result<(), String> {
1775 Ok(())
1776 }
1777 }
1778 impl FileOps for StubFiles {
1779 fn upload(&self, _body: &str) -> (u16, String) {
1780 (501, "{}".into())
1781 }
1782 fn get_file(&self, _id: &str) -> (u16, String) {
1783 (404, "{}".into())
1784 }
1785 }
1786 impl OpenApiGenerator for StubOpenApi {
1787 fn generate(&self, _base: &str) -> String {
1788 "{}".into()
1789 }
1790 }
1791 impl EmailSender for StubEmail {
1792 fn send(&self, _to: &str, _subject: &str, _body: &str) -> Result<(), String> {
1793 Ok(())
1794 }
1795 }
1796
1797 fn empty_manifest() -> AppManifest {
1798 AppManifest {
1799 manifest_version: MANIFEST_VERSION,
1800 name: "test".into(),
1801 version: "0.1.0".into(),
1802 entities: vec![],
1803 routes: vec![],
1804 queries: vec![],
1805 actions: vec![],
1806 policies: vec![],
1807 auth: Default::default(),
1808 }
1809 }
1810
1811 fn with_ctx<F>(is_dev: bool, auth: &AuthContext, f: F)
1813 where
1814 F: FnOnce(&RouterContext),
1815 {
1816 with_ctx_hooks(is_dev, auth, &NoopPluginHooks, f);
1817 }
1818
1819 fn with_ctx_hooks<F>(is_dev: bool, auth: &AuthContext, hooks: &dyn PluginHookOps, f: F)
1820 where
1821 F: FnOnce(&RouterContext),
1822 {
1823 let manifest = empty_manifest();
1824 let store = StubDataStore {
1825 manifest: manifest.clone(),
1826 };
1827 let session_store = SessionStore::new();
1828 let magic_codes = MagicCodeStore::new();
1829 let oauth_state = OAuthStateStore::new();
1830 let account_store = pylon_auth::AccountStore::new();
1831 let api_keys = pylon_auth::api_key::ApiKeyStore::new();
1832 let orgs = pylon_auth::org::OrgStore::new();
1833 let siwe = pylon_auth::siwe::NonceStore::new();
1834 let phone_codes = pylon_auth::phone::PhoneCodeStore::new();
1835 let passkeys = pylon_auth::webauthn::PasskeyStore::new();
1836 let policy_engine = PolicyEngine::from_manifest(&manifest);
1837 let change_log = ChangeLog::new();
1838 let notifier = NoopNotifier;
1839 let rooms = StubRooms;
1840 let cache = StubCache;
1841 let pubsub = StubPubSub;
1842 let jobs = StubJobs;
1843 let scheduler = StubScheduler;
1844 let workflows = StubWorkflows;
1845 let files = StubFiles;
1846 let openapi = StubOpenApi;
1847 let email = StubEmail;
1848 let cookie_config = CookieConfig::from_env(&CookieConfig::default_name_for("test"));
1849
1850 let ctx = RouterContext {
1851 store: &store,
1852 session_store: &session_store,
1853 magic_codes: &magic_codes,
1854 oauth_state: &oauth_state,
1855 account_store: &account_store,
1856 api_keys: &api_keys,
1857 orgs: &orgs,
1858 siwe: &siwe,
1859 phone_codes: &phone_codes,
1860 passkeys: &passkeys,
1861 trusted_origins: &[],
1862 policy_engine: &policy_engine,
1863 change_log: &change_log,
1864 notifier: ¬ifier,
1865 rooms: &rooms,
1866 cache: &cache,
1867 pubsub: &pubsub,
1868 jobs: &jobs,
1869 scheduler: &scheduler,
1870 workflows: &workflows,
1871 files: &files,
1872 openapi: &openapi,
1873 functions: None,
1874 email: &email,
1875 shards: None,
1876 plugin_hooks: hooks,
1877 auth_ctx: auth,
1878 is_dev,
1879 request_headers: &[],
1880 peer_ip: "127.0.0.1",
1881 cookie_config: &cookie_config,
1882 response_headers: RefCell::new(Vec::new()),
1883 };
1884 f(&ctx);
1885 }
1886
1887 #[test]
1894 fn auth_session_refuses_non_admin_in_prod() {
1895 let anon = AuthContext::anonymous();
1896 with_ctx(false, &anon, |ctx| {
1897 let (status, body, _ct) = route(
1898 ctx,
1899 HttpMethod::Post,
1900 "/api/auth/session",
1901 r#"{"user_id":"victim"}"#,
1902 None,
1903 );
1904 assert_eq!(status, 403);
1905 assert!(body.contains("FORBIDDEN"));
1906 });
1907 }
1908
1909 #[test]
1910 fn auth_session_allowed_for_admin_in_prod() {
1911 let admin = AuthContext::admin();
1912 with_ctx(false, &admin, |ctx| {
1913 let (status, _body, _ct) = route(
1914 ctx,
1915 HttpMethod::Post,
1916 "/api/auth/session",
1917 r#"{"user_id":"alice"}"#,
1918 None,
1919 );
1920 assert_eq!(status, 201);
1921 });
1922 }
1923
1924 #[test]
1928 fn oauth_callback_refuses_missing_code_in_prod() {
1929 let anon = AuthContext::anonymous();
1930 with_ctx(false, &anon, |ctx| {
1931 let state = ctx
1934 .oauth_state
1935 .create("google", "https://app/cb", "https://app/cb");
1936 let body = format!(r#"{{"state":"{state}","email":"victim@example.com"}}"#);
1937 let (status, resp, _ct) = route(
1938 ctx,
1939 HttpMethod::Post,
1940 "/api/auth/callback/google",
1941 &body,
1942 None,
1943 );
1944 assert_eq!(status, 400);
1945 assert!(resp.contains("authorization") || resp.contains("code"));
1946 });
1947 }
1948
1949 #[test]
1951 fn sync_push_requires_admin() {
1952 let anon = AuthContext::anonymous();
1953 with_ctx(false, &anon, |ctx| {
1954 let (status, body, _ct) = route(
1955 ctx,
1956 HttpMethod::Post,
1957 "/api/sync/push",
1958 r#"{"changes":[]}"#,
1959 None,
1960 );
1961 assert_eq!(status, 403);
1962 assert!(body.contains("FORBIDDEN"));
1963 });
1964 }
1965
1966 #[test]
1968 fn transact_requires_admin() {
1969 let anon = AuthContext::anonymous();
1970 with_ctx(false, &anon, |ctx| {
1971 let (status, _body, _ct) = route(
1972 ctx,
1973 HttpMethod::Post,
1974 "/api/transact",
1975 r#"{"ops":[]}"#,
1976 None,
1977 );
1978 assert_eq!(status, 403);
1979 });
1980 }
1981
1982 #[test]
1984 fn workflow_start_requires_admin() {
1985 let anon = AuthContext::anonymous();
1986 with_ctx(false, &anon, |ctx| {
1987 let (status, _body, _ct) = route(
1988 ctx,
1989 HttpMethod::Post,
1990 "/api/workflows/start",
1991 r#"{"name":"x"}"#,
1992 None,
1993 );
1994 assert_eq!(status, 403);
1995 });
1996 }
1997
1998 #[test]
2000 fn jobs_enqueue_requires_admin() {
2001 let anon = AuthContext::anonymous();
2002 with_ctx(false, &anon, |ctx| {
2003 let (status, _body, _ct) = route(
2004 ctx,
2005 HttpMethod::Post,
2006 "/api/jobs",
2007 r#"{"name":"x","payload":{}}"#,
2008 None,
2009 );
2010 assert_eq!(status, 403);
2011 });
2012 }
2013
2014 fn assert_route_doesnt_panic(ctx: &RouterContext, method: HttpMethod, url: &str, body: &str) {
2025 let (_status, _body, _ct) = route(ctx, method, url, body, None);
2029 }
2030
2031 #[test]
2032 fn fuzz_malformed_json_bodies_never_panic() {
2033 let admin = AuthContext::admin();
2034 with_ctx(true, &admin, |ctx| {
2035 let samples = [
2036 "",
2037 "not json",
2038 "{",
2039 "}",
2040 "{\"",
2041 "{\"key\":",
2042 "[]",
2043 "null",
2044 "true",
2045 "\"string\"",
2046 "{\"changes\":\"not an array\"}",
2047 &format!("{{\"deeply\":{}}}", "{".repeat(1000)),
2048 "{\"unicode\":\"\\u0000\"}",
2049 "{\"numbers\":1e308}",
2050 "{\"negative\":-999999999999999}",
2051 ];
2052 for body in &samples {
2053 for url in &[
2054 "/api/sync/push",
2055 "/api/transact",
2056 "/api/import",
2057 "/api/batch",
2058 "/api/jobs",
2059 "/api/auth/session",
2060 "/api/auth/magic/send",
2061 ] {
2062 assert_route_doesnt_panic(ctx, HttpMethod::Post, url, body);
2063 }
2064 }
2065 });
2066 }
2067
2068 #[test]
2069 fn fuzz_weird_urls_never_panic() {
2070 let admin = AuthContext::admin();
2071 with_ctx(true, &admin, |ctx| {
2072 let samples = [
2073 "/",
2074 "/api",
2075 "/api/",
2076 "/api/entities/",
2077 "/api/entities//",
2078 "/api/entities/%00",
2079 "/api/entities/../escape",
2080 "/api/entities/User?garbage=\x01",
2081 "/api/entities/User?$limit=abc&$order=garbage",
2082 &format!("/api/entities/{}", "a".repeat(10_000)),
2083 "/api/fn/",
2084 "/api/fn/traces",
2085 "/api/shards/id/connect",
2086 "/api/workflows/definitions",
2087 "/api/workflows/nonexistent/advance",
2088 "/api/rooms/",
2089 "/api/rooms/%20",
2090 ];
2091 for url in &samples {
2092 assert_route_doesnt_panic(ctx, HttpMethod::Get, url, "");
2093 assert_route_doesnt_panic(ctx, HttpMethod::Post, url, "{}");
2094 assert_route_doesnt_panic(ctx, HttpMethod::Delete, url, "");
2095 }
2096 });
2097 }
2098
2099 #[test]
2100 fn fuzz_deeply_nested_json_dont_stack_overflow() {
2101 let admin = AuthContext::admin();
2104 with_ctx(true, &admin, |ctx| {
2105 let depth = 300;
2106 let body = format!("{}{}", "[".repeat(depth), "]".repeat(depth),);
2107 let (status, _body, _ct) = route(ctx, HttpMethod::Post, "/api/sync/push", &body, None);
2108 assert!(status >= 200 && status < 600);
2111 });
2112 }
2113
2114 #[test]
2115 fn fuzz_unusual_http_methods_gracefully() {
2116 let admin = AuthContext::admin();
2117 with_ctx(true, &admin, |ctx| {
2118 for method in [
2119 HttpMethod::Get,
2120 HttpMethod::Post,
2121 HttpMethod::Put,
2122 HttpMethod::Patch,
2123 HttpMethod::Delete,
2124 HttpMethod::Options,
2125 HttpMethod::Head,
2126 ] {
2127 let (_status, _body, _ct) = route(ctx, method, "/api/entities/User", "{}", None);
2128 }
2129 });
2130 }
2131
2132 struct UserStubStore {
2142 manifest: AppManifest,
2143 last_update: std::sync::Mutex<Option<(String, String, serde_json::Value)>>,
2144 }
2145 impl pylon_http::DataStore for UserStubStore {
2146 fn manifest(&self) -> &AppManifest {
2147 &self.manifest
2148 }
2149 fn insert(
2150 &self,
2151 _e: &str,
2152 _d: &serde_json::Value,
2153 ) -> Result<String, pylon_http::DataError> {
2154 Ok("u-1".into())
2155 }
2156 fn get_by_id(
2157 &self,
2158 entity: &str,
2159 id: &str,
2160 ) -> Result<Option<serde_json::Value>, pylon_http::DataError> {
2161 if entity == "User" && id == "u-1" {
2162 return Ok(Some(serde_json::json!({
2163 "id": "u-1",
2164 "email": "alice@example.com",
2165 "displayName": "Alice",
2166 })));
2167 }
2168 Ok(None)
2169 }
2170 fn list(&self, _e: &str) -> Result<Vec<serde_json::Value>, pylon_http::DataError> {
2171 Ok(vec![])
2172 }
2173 fn list_after(
2174 &self,
2175 _e: &str,
2176 _a: Option<&str>,
2177 _l: usize,
2178 ) -> Result<Vec<serde_json::Value>, pylon_http::DataError> {
2179 Ok(vec![])
2180 }
2181 fn update(
2182 &self,
2183 entity: &str,
2184 id: &str,
2185 data: &serde_json::Value,
2186 ) -> Result<bool, pylon_http::DataError> {
2187 *self.last_update.lock().unwrap() = Some((entity.into(), id.into(), data.clone()));
2188 Ok(true)
2189 }
2190 fn delete(&self, _e: &str, _i: &str) -> Result<bool, pylon_http::DataError> {
2191 Ok(true)
2192 }
2193 fn lookup(
2194 &self,
2195 _e: &str,
2196 _f: &str,
2197 _v: &str,
2198 ) -> Result<Option<serde_json::Value>, pylon_http::DataError> {
2199 Ok(None)
2200 }
2201 fn link(
2202 &self,
2203 _e: &str,
2204 _i: &str,
2205 _r: &str,
2206 _t: &str,
2207 ) -> Result<bool, pylon_http::DataError> {
2208 Ok(true)
2209 }
2210 fn unlink(&self, _e: &str, _i: &str, _r: &str) -> Result<bool, pylon_http::DataError> {
2211 Ok(true)
2212 }
2213 fn query_filtered(
2214 &self,
2215 _e: &str,
2216 _f: &serde_json::Value,
2217 ) -> Result<Vec<serde_json::Value>, pylon_http::DataError> {
2218 Ok(vec![])
2219 }
2220 fn query_graph(
2221 &self,
2222 _q: &serde_json::Value,
2223 ) -> Result<serde_json::Value, pylon_http::DataError> {
2224 Ok(serde_json::json!({}))
2225 }
2226 fn aggregate(
2227 &self,
2228 _e: &str,
2229 _s: &serde_json::Value,
2230 ) -> Result<serde_json::Value, pylon_http::DataError> {
2231 Ok(serde_json::json!({}))
2232 }
2233 fn transact(
2234 &self,
2235 _o: &[serde_json::Value],
2236 ) -> Result<(bool, Vec<serde_json::Value>), pylon_http::DataError> {
2237 Ok((true, vec![]))
2238 }
2239 fn search(
2240 &self,
2241 _e: &str,
2242 _q: &serde_json::Value,
2243 ) -> Result<serde_json::Value, pylon_http::DataError> {
2244 Ok(serde_json::json!({}))
2245 }
2246 }
2247
2248 struct CaptureEmail {
2252 sent: std::sync::Mutex<Vec<(String, String, String)>>,
2253 }
2254 impl EmailSender for CaptureEmail {
2255 fn send(&self, to: &str, subject: &str, body: &str) -> Result<(), String> {
2256 self.sent
2257 .lock()
2258 .unwrap()
2259 .push((to.into(), subject.into(), body.into()));
2260 Ok(())
2261 }
2262 }
2263
2264 fn with_user_ctx<F>(is_dev: bool, auth: &AuthContext, f: F)
2265 where
2266 F: FnOnce(&RouterContext, &UserStubStore, &CaptureEmail, &MagicCodeStore),
2267 {
2268 let manifest = empty_manifest();
2269 let store = UserStubStore {
2270 manifest: manifest.clone(),
2271 last_update: std::sync::Mutex::new(None),
2272 };
2273 let session_store = SessionStore::new();
2274 let magic_codes = MagicCodeStore::new();
2275 let oauth_state = OAuthStateStore::new();
2276 let account_store = pylon_auth::AccountStore::new();
2277 let api_keys = pylon_auth::api_key::ApiKeyStore::new();
2278 let orgs = pylon_auth::org::OrgStore::new();
2279 let siwe = pylon_auth::siwe::NonceStore::new();
2280 let phone_codes = pylon_auth::phone::PhoneCodeStore::new();
2281 let passkeys = pylon_auth::webauthn::PasskeyStore::new();
2282 let policy_engine = PolicyEngine::from_manifest(&manifest);
2283 let change_log = ChangeLog::new();
2284 let notifier = NoopNotifier;
2285 let rooms = StubRooms;
2286 let cache = StubCache;
2287 let pubsub = StubPubSub;
2288 let jobs = StubJobs;
2289 let scheduler = StubScheduler;
2290 let workflows = StubWorkflows;
2291 let files = StubFiles;
2292 let openapi = StubOpenApi;
2293 let email = CaptureEmail {
2294 sent: std::sync::Mutex::new(vec![]),
2295 };
2296 let hooks = NoopPluginHooks;
2297 let cookie_config = CookieConfig::from_env(&CookieConfig::default_name_for("test"));
2298
2299 let ctx = RouterContext {
2300 store: &store,
2301 session_store: &session_store,
2302 magic_codes: &magic_codes,
2303 oauth_state: &oauth_state,
2304 account_store: &account_store,
2305 api_keys: &api_keys,
2306 orgs: &orgs,
2307 siwe: &siwe,
2308 phone_codes: &phone_codes,
2309 passkeys: &passkeys,
2310 trusted_origins: &[],
2311 policy_engine: &policy_engine,
2312 change_log: &change_log,
2313 notifier: ¬ifier,
2314 rooms: &rooms,
2315 cache: &cache,
2316 pubsub: &pubsub,
2317 jobs: &jobs,
2318 scheduler: &scheduler,
2319 workflows: &workflows,
2320 files: &files,
2321 openapi: &openapi,
2322 functions: None,
2323 email: &email,
2324 shards: None,
2325 plugin_hooks: &hooks,
2326 auth_ctx: auth,
2327 is_dev,
2328 request_headers: &[],
2329 peer_ip: "127.0.0.1",
2330 cookie_config: &cookie_config,
2331 response_headers: RefCell::new(Vec::new()),
2332 };
2333 f(&ctx, &store, &email, &magic_codes);
2334 }
2335
2336 #[test]
2337 fn email_send_verification_requires_auth() {
2338 let anon = AuthContext::anonymous();
2339 with_user_ctx(true, &anon, |ctx, _, _, _| {
2340 let (status, body, _) = route(
2341 ctx,
2342 HttpMethod::Post,
2343 "/api/auth/email/send-verification",
2344 "{}",
2345 None,
2346 );
2347 assert_eq!(status, 401);
2348 assert!(body.contains("UNAUTHORIZED"));
2349 });
2350 }
2351
2352 #[test]
2353 fn email_verify_requires_auth() {
2354 let anon = AuthContext::anonymous();
2355 with_user_ctx(true, &anon, |ctx, _, _, _| {
2356 let (status, body, _) = route(
2357 ctx,
2358 HttpMethod::Post,
2359 "/api/auth/email/verify",
2360 r#"{"code":"123456"}"#,
2361 None,
2362 );
2363 assert_eq!(status, 401);
2364 assert!(body.contains("UNAUTHORIZED"));
2365 });
2366 }
2367
2368 #[test]
2369 fn email_send_verification_uses_session_email_not_body() {
2370 let alice = AuthContext::authenticated("u-1".into());
2375 with_user_ctx(true, &alice, |ctx, _, email, _| {
2376 let (status, body, _) = route(
2377 ctx,
2378 HttpMethod::Post,
2379 "/api/auth/email/send-verification",
2380 r#"{"email":"victim@example.com"}"#,
2381 None,
2382 );
2383 assert_eq!(status, 200);
2384 let sent = email.sent.lock().unwrap();
2387 assert_eq!(sent.len(), 1);
2388 assert_eq!(sent[0].0, "alice@example.com");
2389 assert!(body.contains("alice@example.com"));
2390 assert!(!body.contains("victim@example.com"));
2391 });
2392 }
2393
2394 #[test]
2395 fn email_verify_happy_path_stamps_email_verified() {
2396 let alice = AuthContext::authenticated("u-1".into());
2397 with_user_ctx(true, &alice, |ctx, store, _, magic_codes| {
2398 let code = magic_codes.try_create("alice@example.com").unwrap();
2401 let body = format!(r#"{{"code":"{code}"}}"#);
2402 let (status, resp, _) =
2403 route(ctx, HttpMethod::Post, "/api/auth/email/verify", &body, None);
2404 assert_eq!(status, 200);
2405 assert!(resp.contains("\"verified\":true"));
2406 let last = store.last_update.lock().unwrap();
2408 let (entity, id, data) = last.as_ref().expect("update should have fired");
2409 assert_eq!(entity, "User");
2410 assert_eq!(id, "u-1");
2411 assert!(data.get("emailVerified").is_some());
2412 });
2413 }
2414
2415 #[test]
2416 fn email_verify_rejects_wrong_code() {
2417 let alice = AuthContext::authenticated("u-1".into());
2418 with_user_ctx(true, &alice, |ctx, store, _, magic_codes| {
2419 let _ = magic_codes.try_create("alice@example.com").unwrap();
2420 let (status, body, _) = route(
2421 ctx,
2422 HttpMethod::Post,
2423 "/api/auth/email/verify",
2424 r#"{"code":"999999"}"#,
2425 None,
2426 );
2427 assert_eq!(status, 401);
2428 assert!(body.contains("INVALID_CODE"));
2429 assert!(store.last_update.lock().unwrap().is_none());
2431 });
2432 }
2433
2434 #[test]
2436 fn auth_session_allowed_in_dev_mode() {
2437 let anon = AuthContext::anonymous();
2438 with_ctx(true, &anon, |ctx| {
2439 let (status, _body, _ct) = route(
2440 ctx,
2441 HttpMethod::Post,
2442 "/api/auth/session",
2443 r#"{"user_id":"alice"}"#,
2444 None,
2445 );
2446 assert_eq!(status, 201);
2447 });
2448 }
2449
2450 use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
2458
2459 struct CountingHooks {
2460 before_insert_calls: AtomicU32,
2461 after_insert_calls: AtomicU32,
2462 before_delete_calls: AtomicU32,
2463 reject_on_entity: Option<&'static str>,
2464 }
2465
2466 impl CountingHooks {
2467 fn new() -> Self {
2468 Self {
2469 before_insert_calls: AtomicU32::new(0),
2470 after_insert_calls: AtomicU32::new(0),
2471 before_delete_calls: AtomicU32::new(0),
2472 reject_on_entity: None,
2473 }
2474 }
2475 }
2476
2477 impl PluginHookOps for CountingHooks {
2478 fn before_insert(
2479 &self,
2480 entity: &str,
2481 _data: &mut serde_json::Value,
2482 _auth: &AuthContext,
2483 ) -> Result<(), (u16, String, String)> {
2484 self.before_insert_calls.fetch_add(1, Ordering::SeqCst);
2485 if self.reject_on_entity == Some(entity) {
2486 return Err((422, "VALIDATION".into(), "rejected by plugin".into()));
2487 }
2488 Ok(())
2489 }
2490 fn after_insert(
2491 &self,
2492 _entity: &str,
2493 _id: &str,
2494 _data: &serde_json::Value,
2495 _auth: &AuthContext,
2496 ) {
2497 self.after_insert_calls.fetch_add(1, Ordering::SeqCst);
2498 }
2499 fn before_update(
2500 &self,
2501 _entity: &str,
2502 _id: &str,
2503 _data: &mut serde_json::Value,
2504 _auth: &AuthContext,
2505 ) -> Result<(), (u16, String, String)> {
2506 Ok(())
2507 }
2508 fn after_update(
2509 &self,
2510 _entity: &str,
2511 _id: &str,
2512 _data: &serde_json::Value,
2513 _auth: &AuthContext,
2514 ) {
2515 }
2516 fn before_delete(
2517 &self,
2518 _entity: &str,
2519 _id: &str,
2520 _auth: &AuthContext,
2521 ) -> Result<(), (u16, String, String)> {
2522 self.before_delete_calls.fetch_add(1, Ordering::SeqCst);
2523 Ok(())
2524 }
2525 fn after_delete(&self, _entity: &str, _id: &str, _auth: &AuthContext) {}
2526 }
2527
2528 #[test]
2529 fn plugin_hooks_fire_on_entity_post() {
2530 let admin = AuthContext::admin();
2533 let hooks = CountingHooks::new();
2534 with_ctx_hooks(true, &admin, &hooks, |ctx| {
2535 let (status, _body, _ct) = route(
2536 ctx,
2537 HttpMethod::Post,
2538 "/api/entities/User",
2539 r#"{"email":"a@b"}"#,
2540 None,
2541 );
2542 assert_eq!(status, 201);
2543 });
2544 assert_eq!(hooks.before_insert_calls.load(Ordering::SeqCst), 1);
2545 assert_eq!(hooks.after_insert_calls.load(Ordering::SeqCst), 1);
2546 }
2547
2548 #[test]
2549 fn plugin_before_insert_rejection_short_circuits_write() {
2550 let admin = AuthContext::admin();
2553 let rejector = CountingHooks {
2554 reject_on_entity: Some("User"),
2555 ..CountingHooks::new()
2556 };
2557 with_ctx_hooks(true, &admin, &rejector, |ctx| {
2558 let (status, body, _ct) =
2559 route(ctx, HttpMethod::Post, "/api/entities/User", r#"{}"#, None);
2560 assert_eq!(status, 422);
2561 assert!(body.contains("VALIDATION"));
2562 });
2563 assert_eq!(rejector.before_insert_calls.load(Ordering::SeqCst), 1);
2564 assert_eq!(rejector.after_insert_calls.load(Ordering::SeqCst), 0);
2566 }
2567
2568 #[test]
2574 fn gdpr_export_requires_admin() {
2575 let anon = AuthContext::anonymous();
2576 with_ctx(false, &anon, |ctx| {
2577 let (status, body, _ct) = route(
2578 ctx,
2579 HttpMethod::Post,
2580 "/api/admin/users/alice/export",
2581 "",
2582 None,
2583 );
2584 assert_eq!(status, 403);
2585 assert!(body.contains("FORBIDDEN"));
2586 });
2587 }
2588
2589 #[test]
2590 fn gdpr_purge_requires_admin() {
2591 let anon = AuthContext::anonymous();
2592 with_ctx(false, &anon, |ctx| {
2593 let (status, _body, _ct) = route(
2594 ctx,
2595 HttpMethod::Delete,
2596 "/api/admin/users/alice/purge",
2597 "",
2598 None,
2599 );
2600 assert_eq!(status, 403);
2601 });
2602 }
2603
2604 #[test]
2605 fn gdpr_export_returns_envelope_for_admin() {
2606 let admin = AuthContext::admin();
2607 with_ctx(true, &admin, |ctx| {
2608 let (status, body, _ct) = route(
2609 ctx,
2610 HttpMethod::Post,
2611 "/api/admin/users/alice/export",
2612 "",
2613 None,
2614 );
2615 assert_eq!(status, 200);
2616 let v: serde_json::Value = serde_json::from_str(&body).unwrap();
2617 assert_eq!(v["user_id"], "alice");
2618 assert!(v["entities"].is_object());
2619 assert!(v["exported_at"].is_string());
2620 });
2621 }
2622
2623 #[test]
2624 fn plugin_hooks_fire_on_entity_delete() {
2625 let admin = AuthContext::admin();
2626 let hooks = CountingHooks::new();
2627 with_ctx_hooks(true, &admin, &hooks, |ctx| {
2628 let (status, _body, _ct) = route(
2629 ctx,
2630 HttpMethod::Delete,
2631 "/api/entities/User/stub-id",
2632 "",
2633 None,
2634 );
2635 assert_eq!(status, 200);
2636 });
2637 assert_eq!(hooks.before_delete_calls.load(Ordering::SeqCst), 1);
2638 }
2639
2640 #[test]
2650 fn webhook_returns_503_when_functions_unavailable() {
2651 let anon = AuthContext::anonymous();
2652 with_ctx(true, &anon, |ctx| {
2653 let (status, body, _ct) = route(
2654 ctx,
2655 HttpMethod::Post,
2656 "/api/webhooks/stripe_handler",
2657 "{}",
2658 None,
2659 );
2660 assert_eq!(status, 503);
2661 assert!(body.contains("FUNCTIONS_NOT_AVAILABLE"));
2662 });
2663 }
2664
2665 #[test]
2666 fn webhook_accepts_any_http_method() {
2667 let anon = AuthContext::anonymous();
2670 with_ctx(true, &anon, |ctx| {
2671 for method in [
2672 HttpMethod::Get,
2673 HttpMethod::Post,
2674 HttpMethod::Put,
2675 HttpMethod::Patch,
2676 HttpMethod::Delete,
2677 ] {
2678 let (status, _body, _ct) = route(ctx, method, "/api/webhooks/any_name", "", None);
2679 assert_ne!(status, 405);
2682 }
2683 });
2684 }
2685
2686 #[derive(Clone, Copy)]
2704 enum Expect {
2705 Eq(u16),
2707 Rejected,
2709 ReachedHandler,
2712 }
2713
2714 fn assert_expect(actual: u16, want: Expect, label: &str) {
2715 match want {
2716 Expect::Eq(s) => assert_eq!(actual, s, "{label}: expected status {s}, got {actual}"),
2717 Expect::Rejected => assert!(
2718 actual == 401 || actual == 403,
2719 "{label}: expected 401 or 403, got {actual}"
2720 ),
2721 Expect::ReachedHandler => assert_ne!(
2722 actual, 405,
2723 "{label}: route should accept this method, got 405"
2724 ),
2725 }
2726 }
2727
2728 fn matrix_check(
2733 method: HttpMethod,
2734 url: &str,
2735 body: &str,
2736 expect_anon: Expect,
2737 expect_guest: Expect,
2738 expect_user: Expect,
2739 ) {
2740 let anon = AuthContext::anonymous();
2741 let guest = AuthContext::guest("guest-1".into());
2742 let user = AuthContext::authenticated("u-1".into());
2743
2744 for (auth, want, who) in [
2745 (&anon, expect_anon, "anon"),
2746 (&guest, expect_guest, "guest"),
2747 (&user, expect_user, "user"),
2748 ] {
2749 with_ctx(false, auth, |ctx| {
2750 let (status, _body, _ct) = route(ctx, method, url, body, None);
2751 assert_expect(status, want, &format!("{who} {method:?} {url}"));
2752 });
2753 }
2754 }
2755
2756 fn matrix_check_with_deny_policy(
2762 deny_entity: &str,
2763 method: HttpMethod,
2764 url: &str,
2765 body: &str,
2766 expect_anon: Expect,
2767 expect_guest: Expect,
2768 expect_user: Expect,
2769 ) {
2770 use pylon_kernel::{AppManifest, ManifestPolicy, MANIFEST_VERSION};
2771 let anon = AuthContext::anonymous();
2772 let guest = AuthContext::guest("guest-1".into());
2773 let user = AuthContext::authenticated("u-1".into());
2774
2775 let manifest = AppManifest {
2776 manifest_version: MANIFEST_VERSION,
2777 name: "test".into(),
2778 version: "0.1.0".into(),
2779 entities: vec![],
2780 routes: vec![],
2781 queries: vec![],
2782 actions: vec![],
2783 policies: vec![ManifestPolicy {
2784 name: "denyAll".into(),
2785 entity: Some(deny_entity.into()),
2786 allow_read: Some("false".into()),
2787 allow_update: Some("false".into()),
2788 ..Default::default()
2789 }],
2790 auth: Default::default(),
2791 };
2792 let store = StubDataStore {
2793 manifest: manifest.clone(),
2794 };
2795 let session_store = SessionStore::new();
2796 let magic_codes = MagicCodeStore::new();
2797 let oauth_state = OAuthStateStore::new();
2798 let account_store = pylon_auth::AccountStore::new();
2799 let api_keys = pylon_auth::api_key::ApiKeyStore::new();
2800 let orgs = pylon_auth::org::OrgStore::new();
2801 let siwe = pylon_auth::siwe::NonceStore::new();
2802 let phone_codes = pylon_auth::phone::PhoneCodeStore::new();
2803 let passkeys = pylon_auth::webauthn::PasskeyStore::new();
2804 let policy_engine = PolicyEngine::from_manifest(&manifest);
2805 let change_log = ChangeLog::new();
2806 let notifier = NoopNotifier;
2807 let rooms = StubRooms;
2808 let cache = StubCache;
2809 let pubsub = StubPubSub;
2810 let jobs = StubJobs;
2811 let scheduler = StubScheduler;
2812 let workflows = StubWorkflows;
2813 let files = StubFiles;
2814 let openapi = StubOpenApi;
2815 let email = StubEmail;
2816 let cookie_config = CookieConfig::from_env(&CookieConfig::default_name_for("test"));
2817
2818 for (auth, want, who) in [
2819 (&anon, expect_anon, "anon"),
2820 (&guest, expect_guest, "guest"),
2821 (&user, expect_user, "user"),
2822 ] {
2823 let ctx = RouterContext {
2824 store: &store,
2825 session_store: &session_store,
2826 magic_codes: &magic_codes,
2827 oauth_state: &oauth_state,
2828 account_store: &account_store,
2829 api_keys: &api_keys,
2830 orgs: &orgs,
2831 siwe: &siwe,
2832 phone_codes: &phone_codes,
2833 passkeys: &passkeys,
2834 trusted_origins: &[],
2835 policy_engine: &policy_engine,
2836 change_log: &change_log,
2837 notifier: ¬ifier,
2838 rooms: &rooms,
2839 cache: &cache,
2840 pubsub: &pubsub,
2841 jobs: &jobs,
2842 scheduler: &scheduler,
2843 workflows: &workflows,
2844 files: &files,
2845 openapi: &openapi,
2846 functions: None,
2847 email: &email,
2848 shards: None,
2849 plugin_hooks: &NoopPluginHooks,
2850 auth_ctx: auth,
2851 is_dev: false,
2852 request_headers: &[],
2853 peer_ip: "127.0.0.1",
2854 cookie_config: &cookie_config,
2855 response_headers: RefCell::new(Vec::new()),
2856 };
2857 let (status, _body, _ct) = route(&ctx, method, url, body, None);
2858 assert_expect(status, want, &format!("{who} {method:?} {url}"));
2859 }
2860 }
2861
2862 #[test]
2863 fn matrix_cache_admin_only() {
2864 matrix_check(
2865 HttpMethod::Get,
2866 "/api/cache/anykey",
2867 "",
2868 Expect::Rejected,
2869 Expect::Rejected,
2870 Expect::Rejected,
2871 );
2872 matrix_check(
2873 HttpMethod::Post,
2874 "/api/cache",
2875 r#"{"op":"get","key":"x"}"#,
2876 Expect::Rejected,
2877 Expect::Rejected,
2878 Expect::Rejected,
2879 );
2880 matrix_check(
2881 HttpMethod::Delete,
2882 "/api/cache/anykey",
2883 "",
2884 Expect::Rejected,
2885 Expect::Rejected,
2886 Expect::Rejected,
2887 );
2888 }
2889
2890 #[test]
2891 fn matrix_pubsub_admin_only() {
2892 matrix_check(
2893 HttpMethod::Post,
2894 "/api/pubsub/publish",
2895 r#"{"channel":"x","message":"y"}"#,
2896 Expect::Rejected,
2897 Expect::Rejected,
2898 Expect::Rejected,
2899 );
2900 matrix_check(
2901 HttpMethod::Get,
2902 "/api/pubsub/channels",
2903 "",
2904 Expect::Rejected,
2905 Expect::Rejected,
2906 Expect::Rejected,
2907 );
2908 matrix_check(
2909 HttpMethod::Get,
2910 "/api/pubsub/history/some-channel",
2911 "",
2912 Expect::Rejected,
2913 Expect::Rejected,
2914 Expect::Rejected,
2915 );
2916 }
2917
2918 #[test]
2919 fn matrix_jobs_read_admin_only() {
2920 matrix_check(
2921 HttpMethod::Get,
2922 "/api/jobs/stats",
2923 "",
2924 Expect::Rejected,
2925 Expect::Rejected,
2926 Expect::Rejected,
2927 );
2928 matrix_check(
2929 HttpMethod::Get,
2930 "/api/jobs/dead",
2931 "",
2932 Expect::Rejected,
2933 Expect::Rejected,
2934 Expect::Rejected,
2935 );
2936 matrix_check(
2937 HttpMethod::Get,
2938 "/api/jobs",
2939 "",
2940 Expect::Rejected,
2941 Expect::Rejected,
2942 Expect::Rejected,
2943 );
2944 matrix_check(
2945 HttpMethod::Get,
2946 "/api/jobs/some-job-id",
2947 "",
2948 Expect::Rejected,
2949 Expect::Rejected,
2950 Expect::Rejected,
2951 );
2952 }
2953
2954 #[test]
2955 fn matrix_workflows_read_admin_only() {
2956 matrix_check(
2957 HttpMethod::Get,
2958 "/api/workflows/definitions",
2959 "",
2960 Expect::Rejected,
2961 Expect::Rejected,
2962 Expect::Rejected,
2963 );
2964 matrix_check(
2965 HttpMethod::Get,
2966 "/api/workflows",
2967 "",
2968 Expect::Rejected,
2969 Expect::Rejected,
2970 Expect::Rejected,
2971 );
2972 matrix_check(
2973 HttpMethod::Get,
2974 "/api/workflows/some-id",
2975 "",
2976 Expect::Rejected,
2977 Expect::Rejected,
2978 Expect::Rejected,
2979 );
2980 }
2981
2982 #[test]
2983 fn matrix_files_download_requires_auth() {
2984 matrix_check(
2987 HttpMethod::Get,
2988 "/api/files/some-file-id",
2989 "",
2990 Expect::Eq(401),
2991 Expect::ReachedHandler,
2992 Expect::ReachedHandler,
2993 );
2994 }
2995
2996 #[test]
2997 fn matrix_crdt_push_respects_update_policy() {
2998 matrix_check_with_deny_policy(
3003 "Doc",
3004 HttpMethod::Post,
3005 "/api/crdt/Doc/some-row",
3006 r#"{"update":"00"}"#,
3007 Expect::Eq(401),
3008 Expect::Rejected,
3009 Expect::Rejected,
3010 );
3011 }
3012
3013 #[test]
3014 fn matrix_filtered_query_respects_read_policy() {
3015 matrix_check_with_deny_policy(
3016 "Secret",
3017 HttpMethod::Post,
3018 "/api/query/Secret",
3019 r#"{"where":{}}"#,
3020 Expect::Rejected,
3021 Expect::Rejected,
3022 Expect::Rejected,
3023 );
3024 }
3025
3026 #[test]
3027 fn matrix_cursor_pagination_respects_read_policy() {
3028 matrix_check_with_deny_policy(
3029 "Secret",
3030 HttpMethod::Get,
3031 "/api/entities/Secret/cursor?limit=10",
3032 "",
3033 Expect::Rejected,
3034 Expect::Rejected,
3035 Expect::Rejected,
3036 );
3037 }
3038
3039 #[test]
3052 fn matrix_admin_get_routes_audit() {
3053 let admin_get_routes: &[(&str, &str)] = &[
3054 ("/api/scheduler", "list scheduled tasks"),
3055 ("/api/fn", "enumerate registered functions"),
3056 ("/api/fn/traces", "function execution traces"),
3057 ("/api/shards", "shard topology + subscriber counts"),
3058 ("/api/cache/anykey", "raw cache read"),
3059 ("/api/pubsub/channels", "list pub/sub channels"),
3060 ("/api/pubsub/history/anychannel", "channel retained history"),
3061 ("/api/jobs/stats", "job queue stats"),
3062 ("/api/jobs/dead", "dead-letter queue"),
3063 ("/api/jobs", "job list with payloads"),
3064 ("/api/jobs/some-id", "single job detail"),
3065 ("/api/workflows/definitions", "workflow definitions"),
3066 ("/api/workflows", "workflow instance list"),
3067 ("/api/workflows/some-id", "workflow instance detail"),
3068 ];
3069 for (url, label) in admin_get_routes {
3070 matrix_check(
3071 HttpMethod::Get,
3072 url,
3073 "",
3074 Expect::Rejected,
3075 Expect::Rejected,
3076 Expect::Rejected,
3077 );
3078 let _ = label;
3081 }
3082 }
3083
3084 #[allow(dead_code)]
3086 const _TOUCH_ATOMIC_BOOL: AtomicBool = AtomicBool::new(false);
3087
3088 #[test]
3093 fn redact_email_keeps_two_chars_and_domain() {
3094 assert_eq!(super::redact_email("alice@acme.com"), "al***@acme.com");
3095 assert_eq!(super::redact_email("a@b.io"), "a***@b.io");
3096 assert_eq!(super::redact_email("ab@x.io"), "ab***@x.io");
3097 assert_eq!(super::redact_email("not-an-email"), "***");
3099 assert_eq!(super::redact_email(""), "***");
3100 assert_eq!(super::redact_email("éric@x.io"), "ér***@x.io");
3102 }
3103
3104 #[test]
3105 fn public_manifest_strips_policy_expressions() {
3106 use pylon_kernel::{AppManifest, ManifestPolicy, MANIFEST_VERSION};
3107 let m = AppManifest {
3108 manifest_version: MANIFEST_VERSION,
3109 name: "t".into(),
3110 version: "0.0.0".into(),
3111 entities: vec![],
3112 routes: vec![],
3113 queries: vec![],
3114 actions: vec![],
3115 policies: vec![ManifestPolicy {
3116 name: "ownerOnly".into(),
3117 entity: Some("Todo".into()),
3118 allow_read: Some("auth.userId == data.ownerId".into()),
3119 allow_update: Some("auth.userId == data.ownerId".into()),
3120 ..Default::default()
3121 }],
3122 auth: Default::default(),
3123 };
3124 let pub_m = super::public_manifest(&m);
3125 let p = &pub_m.policies[0];
3126 assert_eq!(p.name, "ownerOnly");
3129 assert_eq!(p.entity.as_deref(), Some("Todo"));
3130 assert_eq!(p.allow, "");
3132 assert!(p.allow_read.is_none());
3133 assert!(p.allow_update.is_none());
3134 assert_eq!(
3137 m.policies[0].allow_read.as_deref(),
3138 Some("auth.userId == data.ownerId")
3139 );
3140 }
3141}