1use pylon_auth::{AuthContext, CookieConfig, MagicCodeStore, OAuthStateStore, SessionStore};
9use pylon_http::{DataError, DataStore, HttpMethod};
10use pylon_policy::PolicyEngine;
11use pylon_sync::{ChangeKind, ChangeLog};
12use std::cell::RefCell;
13
14mod routes;
15
/// Sink for the notifications the router emits after data changes.
///
/// Implementors must be `Send + Sync`.
pub trait ChangeNotifier: Send + Sync {
    /// Delivers one change event.
    fn notify(&self, event: &pylon_sync::ChangeEvent);
    /// Delivers a presence payload. `json` is presumably an already-serialized
    /// JSON document — confirm against implementors.
    fn notify_presence(&self, json: &str);

    /// Delivers a CRDT snapshot for a single row. The default body is empty,
    /// so implementors that do not handle CRDT data need not override it.
    fn notify_crdt(&self, _entity: &str, _row_id: &str, _snapshot: &[u8]) {}
}
38
/// A [`ChangeNotifier`] that discards every notification.
pub struct NoopNotifier;

impl ChangeNotifier for NoopNotifier {
    // Both required methods are intentionally empty; `notify_crdt` keeps the
    // trait's (also empty) default.
    fn notify(&self, _event: &pylon_sync::ChangeEvent) {}
    fn notify_presence(&self, _json: &str) {}
}
46
/// Frame-type tag for a CRDT snapshot frame; passed as `frame_type` to
/// `encode_crdt_frame`, which writes it as the first byte of the frame.
pub const CRDT_FRAME_SNAPSHOT: u8 = 0x10;
/// Frame-type tag for a CRDT update frame (same position in the frame as
/// `CRDT_FRAME_SNAPSHOT`).
pub const CRDT_FRAME_UPDATE: u8 = 0x11;
69
/// Reasons a CRDT frame cannot be encoded: one of the length-prefixed header
/// fields does not fit in its `u16` length prefix.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FrameEncodeError {
    /// The entity name is `len` bytes long, which exceeds `u16::MAX`.
    EntityTooLong { len: usize },
    /// The row id is `len` bytes long, which exceeds `u16::MAX`.
    RowIdTooLong { len: usize },
}

impl std::fmt::Display for FrameEncodeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Both messages quote the same limit; name it once.
        let limit = u16::MAX;
        match *self {
            FrameEncodeError::EntityTooLong { len } => f.write_fmt(format_args!(
                "CRDT frame: entity name {len} bytes exceeds u16 length limit ({})",
                limit
            )),
            FrameEncodeError::RowIdTooLong { len } => f.write_fmt(format_args!(
                "CRDT frame: row_id {len} bytes exceeds u16 length limit ({})",
                limit
            )),
        }
    }
}

impl std::error::Error for FrameEncodeError {}
105
106pub fn encode_crdt_frame(
113 frame_type: u8,
114 entity: &str,
115 row_id: &str,
116 payload: &[u8],
117) -> Result<Vec<u8>, FrameEncodeError> {
118 let entity_bytes = entity.as_bytes();
119 let row_id_bytes = row_id.as_bytes();
120 if entity_bytes.len() > u16::MAX as usize {
121 return Err(FrameEncodeError::EntityTooLong {
122 len: entity_bytes.len(),
123 });
124 }
125 if row_id_bytes.len() > u16::MAX as usize {
126 return Err(FrameEncodeError::RowIdTooLong {
127 len: row_id_bytes.len(),
128 });
129 }
130 let entity_len = entity_bytes.len() as u16;
131 let row_id_len = row_id_bytes.len() as u16;
132 let mut out =
133 Vec::with_capacity(1 + 2 + entity_bytes.len() + 2 + row_id_bytes.len() + payload.len());
134 out.push(frame_type);
135 out.extend_from_slice(&entity_len.to_be_bytes());
136 out.extend_from_slice(entity_bytes);
137 out.extend_from_slice(&row_id_len.to_be_bytes());
138 out.extend_from_slice(row_id_bytes);
139 out.extend_from_slice(payload);
140 Ok(out)
141}
142
// Unit tests for `encode_crdt_frame` and `decode_hex`.
#[cfg(test)]
mod crdt_frame_tests {
    use super::*;

    // Pins the byte layout: type tag, u16 BE entity length, entity,
    // u16 BE row_id length, row_id, then the raw payload.
    #[test]
    fn roundtrip_header_layout() {
        let frame = encode_crdt_frame(
            CRDT_FRAME_SNAPSHOT,
            "Message",
            "msg_123",
            &[0xab, 0xcd, 0xef],
        )
        .unwrap();
        assert_eq!(frame[0], 0x10);
        // Entity length prefix (7, big-endian).
        assert_eq!(&frame[1..3], &[0, 7]);
        assert_eq!(&frame[3..10], b"Message");
        // Row id length prefix (7, big-endian).
        assert_eq!(&frame[10..12], &[0, 7]);
        assert_eq!(&frame[12..19], b"msg_123");
        // Payload is appended without a length prefix.
        assert_eq!(&frame[19..], &[0xab, 0xcd, 0xef]);
    }

    // An empty payload still yields the full header and nothing after it.
    #[test]
    fn empty_payload_still_carries_headers() {
        let frame = encode_crdt_frame(CRDT_FRAME_UPDATE, "X", "y", &[]).unwrap();
        assert_eq!(frame[0], 0x11);
        assert_eq!(&frame[1..3], &[0, 1]);
        assert_eq!(&frame[3..4], b"X");
        assert_eq!(&frame[4..6], &[0, 1]);
        assert_eq!(&frame[6..7], b"y");
        assert_eq!(frame.len(), 7);
    }

    // `decode_hex` accepts empty input and both lower- and upper-case digits.
    #[test]
    fn hex_roundtrip() {
        assert_eq!(super::decode_hex(""), Some(vec![]));
        assert_eq!(super::decode_hex("00"), Some(vec![0x00]));
        assert_eq!(super::decode_hex("ab"), Some(vec![0xab]));
        assert_eq!(super::decode_hex("AB"), Some(vec![0xab]));
        assert_eq!(
            super::decode_hex("DEADBEEF"),
            Some(vec![0xde, 0xad, 0xbe, 0xef])
        );
    }

    // Malformed input is rejected as a whole.
    #[test]
    fn hex_rejects_malformed() {
        // Odd number of digits.
        assert_eq!(super::decode_hex("a"), None);
        // Non-hex characters.
        assert_eq!(super::decode_hex("xy"), None);
        // Embedded whitespace.
        assert_eq!(super::decode_hex("ab cd"), None);
    }

    // One byte past the u16 limit must be rejected for the entity name.
    #[test]
    fn entity_too_long_errors() {
        let huge_entity = "x".repeat(u16::MAX as usize + 1);
        let err = encode_crdt_frame(CRDT_FRAME_SNAPSHOT, &huge_entity, "y", &[])
            .expect_err("entity > u16::MAX must reject");
        assert!(matches!(err, FrameEncodeError::EntityTooLong { .. }));
    }

    // One byte past the u16 limit must be rejected for the row id.
    #[test]
    fn row_id_too_long_errors() {
        let huge_row_id = "x".repeat(u16::MAX as usize + 1);
        let err = encode_crdt_frame(CRDT_FRAME_SNAPSHOT, "X", &huge_row_id, &[])
            .expect_err("row_id > u16::MAX must reject");
        assert!(matches!(err, FrameEncodeError::RowIdTooLong { .. }));
    }
}
213
/// Cache operations exposed to the router.
///
/// Each method returns a `(u16, String)` pair — presumably (HTTP status,
/// response body), matching the handler convention used in this file.
pub trait CacheOps: Send + Sync {
    /// Handles a cache command described by the raw request `body`.
    fn handle_command(&self, body: &str) -> (u16, String);
    /// Handles a read of `key`.
    fn handle_get(&self, key: &str) -> (u16, String);
    /// Handles removal of `key`.
    fn handle_delete(&self, key: &str) -> (u16, String);
}
225
/// Pub/sub operations exposed to the router.
///
/// Each method returns a `(u16, String)` pair — presumably (HTTP status,
/// response body).
pub trait PubSubOps: Send + Sync {
    /// Handles a publish request described by the raw request `body`.
    fn handle_publish(&self, body: &str) -> (u16, String);
    /// Handles a request for the channel list.
    fn handle_channels(&self) -> (u16, String);
    /// Handles a history request for `channel`; `url` is passed through,
    /// presumably so the implementation can read query parameters.
    fn handle_history(&self, channel: &str, url: &str) -> (u16, String);
}
232
/// Room / presence operations exposed to the router.
///
/// The meaning of the returned JSON values is defined by the implementor and
/// is not visible from this file.
pub trait RoomOps: Send + Sync {
    /// Adds `user_id` to `room` with optional presence `data`.
    /// Returns two JSON values on success, or a `DataError`.
    fn join(
        &self,
        room: &str,
        user_id: &str,
        data: Option<serde_json::Value>,
    ) -> Result<(serde_json::Value, serde_json::Value), DataError>;
    /// Removes `user_id` from `room`; may return a JSON value.
    fn leave(&self, room: &str, user_id: &str) -> Option<serde_json::Value>;
    /// Replaces the presence `data` of `user_id` in `room`.
    fn set_presence(
        &self,
        room: &str,
        user_id: &str,
        data: serde_json::Value,
    ) -> Option<serde_json::Value>;
    /// Sends `data` on `topic` to `room`, optionally attributed to `sender`.
    fn broadcast(
        &self,
        room: &str,
        sender: Option<&str>,
        topic: &str,
        data: serde_json::Value,
    ) -> Option<serde_json::Value>;
    /// Names of the known rooms.
    fn list_rooms(&self) -> Vec<String>;
    /// Size of the room called `name`.
    fn room_size(&self, name: &str) -> usize;
    /// Members of the room called `name`, as JSON values.
    fn members(&self, name: &str) -> Vec<serde_json::Value>;
}
259
/// Background-job queue operations exposed to the router.
pub trait JobOps: Send + Sync {
    /// Enqueues a job and returns a `String` — presumably the new job's id;
    /// confirm against implementors.
    fn enqueue(
        &self,
        name: &str,
        payload: serde_json::Value,
        priority: &str,
        delay_secs: u64,
        max_retries: u32,
        queue: &str,
    ) -> String;
    /// Queue statistics as JSON.
    fn stats(&self) -> serde_json::Value;
    /// Dead-lettered jobs as JSON.
    fn dead_letters(&self) -> serde_json::Value;
    /// Retries the dead-lettered job `id`; the `bool` presumably reports
    /// whether such a job was found.
    fn retry_dead(&self, id: &str) -> bool;
    /// Lists jobs, optionally filtered by `status` and `queue`, bounded by
    /// `limit`.
    fn list_jobs(
        &self,
        status: Option<&str>,
        queue: Option<&str>,
        limit: usize,
    ) -> serde_json::Value;
    /// Looks up a single job by `id`.
    fn get_job(&self, id: &str) -> Option<serde_json::Value>;
}
282
/// Scheduler operations exposed to the router.
pub trait SchedulerOps: Send + Sync {
    /// Scheduled tasks as JSON.
    fn list_tasks(&self) -> serde_json::Value;
    /// Triggers the task called `name`; the `bool` presumably reports whether
    /// the task exists.
    fn trigger(&self, name: &str) -> bool;
}
288
/// Workflow-engine operations exposed to the router.
///
/// Fallible methods report failures as a plain `String` message.
pub trait WorkflowOps: Send + Sync {
    /// Registered workflow definitions as JSON.
    fn definitions(&self) -> serde_json::Value;
    /// Starts workflow `name` with `input`; the `Ok` string is presumably the
    /// new instance id.
    fn start(&self, name: &str, input: serde_json::Value) -> Result<String, String>;
    /// Lists workflow instances, optionally filtered by status.
    fn list(&self, status_filter: Option<&str>) -> serde_json::Value;
    /// Looks up a single workflow instance by `id`.
    fn get(&self, id: &str) -> Option<serde_json::Value>;
    /// Advances instance `id`; the meaning of the `Ok` string is
    /// implementor-defined.
    fn advance(&self, id: &str) -> Result<String, String>;
    /// Delivers `event` with `data` to instance `id`.
    fn send_event(&self, id: &str, event: &str, data: serde_json::Value) -> Result<(), String>;
    /// Cancels instance `id`.
    fn cancel(&self, id: &str) -> Result<(), String>;
}
299
/// File storage operations exposed to the router.
///
/// Each method returns a `(u16, String)` pair — presumably (HTTP status,
/// response body).
pub trait FileOps: Send + Sync {
    /// Handles an upload described by the raw request `body`.
    fn upload(&self, body: &str) -> (u16, String);
    /// Handles retrieval of the file identified by `id`.
    fn get_file(&self, id: &str) -> (u16, String);
}
305
/// Outbound email transport used by the router.
pub trait EmailSender: Send + Sync {
    /// Sends one message; failures are reported as a plain `String`.
    fn send(&self, to: &str, subject: &str, body: &str) -> Result<(), String>;
}
310
/// Access to realtime shards.
pub trait ShardOps: Send + Sync {
    /// Returns a shared handle to shard `id`, or `None` when there is none.
    fn get_shard(&self, id: &str) -> Option<std::sync::Arc<dyn pylon_realtime::DynShard>>;

    /// Shard identifiers as strings.
    fn list_shards(&self) -> Vec<String>;

    /// Number of shards.
    fn shard_count(&self) -> usize;
}
322
/// Produces the OpenAPI document served at `/api/openapi.json`.
pub trait OpenApiGenerator: Send + Sync {
    /// Generates the document as a string. The router in this file passes an
    /// empty `base_url`.
    fn generate(&self, base_url: &str) -> String;
}
327
/// Plugin hooks run around entity writes.
///
/// `before_*` hooks may reject the operation by returning
/// `Err((status, code, message))`; the CRUD handlers in this file turn that
/// tuple into the response `(status, json_error(code, message))`.
/// `before_insert` and `before_update` receive the payload mutably, so a hook
/// can rewrite it before it reaches the store. `after_*` hooks return nothing.
pub trait PluginHookOps: Send + Sync {
    /// Runs before a row is inserted into `entity`.
    fn before_insert(
        &self,
        entity: &str,
        data: &mut serde_json::Value,
        auth: &AuthContext,
    ) -> Result<(), (u16, String, String)>;

    /// Runs after a row with `id` was inserted into `entity`.
    fn after_insert(&self, entity: &str, id: &str, data: &serde_json::Value, auth: &AuthContext);

    /// Runs before row `id` of `entity` is updated.
    fn before_update(
        &self,
        entity: &str,
        id: &str,
        data: &mut serde_json::Value,
        auth: &AuthContext,
    ) -> Result<(), (u16, String, String)>;

    /// Runs after row `id` of `entity` was updated.
    fn after_update(&self, entity: &str, id: &str, data: &serde_json::Value, auth: &AuthContext);

    /// Runs before row `id` of `entity` is deleted.
    fn before_delete(
        &self,
        entity: &str,
        id: &str,
        auth: &AuthContext,
    ) -> Result<(), (u16, String, String)>;

    /// Runs after row `id` of `entity` was deleted.
    fn after_delete(&self, entity: &str, id: &str, auth: &AuthContext);
}
369
/// A [`PluginHookOps`] implementation with no plugins: every `before_*` hook
/// accepts the operation unchanged and every `after_*` hook does nothing.
pub struct NoopPluginHooks;

impl PluginHookOps for NoopPluginHooks {
    // Always allows the insert and leaves `_data` untouched.
    fn before_insert(
        &self,
        _entity: &str,
        _data: &mut serde_json::Value,
        _auth: &AuthContext,
    ) -> Result<(), (u16, String, String)> {
        Ok(())
    }
    fn after_insert(
        &self,
        _entity: &str,
        _id: &str,
        _data: &serde_json::Value,
        _auth: &AuthContext,
    ) {
    }
    // Always allows the update and leaves `_data` untouched.
    fn before_update(
        &self,
        _entity: &str,
        _id: &str,
        _data: &mut serde_json::Value,
        _auth: &AuthContext,
    ) -> Result<(), (u16, String, String)> {
        Ok(())
    }
    fn after_update(
        &self,
        _entity: &str,
        _id: &str,
        _data: &serde_json::Value,
        _auth: &AuthContext,
    ) {
    }
    // Always allows the delete.
    fn before_delete(
        &self,
        _entity: &str,
        _id: &str,
        _auth: &AuthContext,
    ) -> Result<(), (u16, String, String)> {
        Ok(())
    }
    fn after_delete(&self, _entity: &str, _id: &str, _auth: &AuthContext) {}
}
417
/// Access to user-defined functions (registry lookup, invocation, traces).
pub trait FnOps: Send + Sync {
    /// Looks up the definition of function `name`.
    fn get_fn(&self, name: &str) -> Option<pylon_functions::registry::FnDef>;

    /// All registered function definitions.
    fn list_fns(&self) -> Vec<pylon_functions::registry::FnDef>;

    /// Invokes `fn_name` with `args` on behalf of `auth`.
    ///
    /// `on_stream`, when provided, is a callback receiving `&str` chunks —
    /// presumably streamed output; confirm against the runner. `request`
    /// optionally carries request metadata.
    ///
    /// Returns the function's JSON result together with its trace, or a
    /// `FnCallError`.
    fn call(
        &self,
        fn_name: &str,
        args: serde_json::Value,
        auth: pylon_functions::protocol::AuthInfo,
        on_stream: Option<Box<dyn FnMut(&str) + Send>>,
        request: Option<pylon_functions::protocol::RequestInfo>,
    ) -> Result<
        (serde_json::Value, pylon_functions::trace::FnTrace),
        pylon_functions::runner::FnCallError,
    >;

    /// Up to `limit` recent traces.
    fn recent_traces(&self, limit: usize) -> Vec<pylon_functions::trace::FnTrace>;

    /// Rate-limit check for `_fn_name` / `_identity`. The default never
    /// limits. NOTE(review): the `u64` in `Err` is presumably a retry-after
    /// value — confirm the unit with implementors.
    fn check_rate_limit(&self, _fn_name: &str, _fn_identity_unused: &str) -> Result<(), u64> {
        Ok(())
    }
}
464
/// Everything a route handler needs for one request: borrowed handles to the
/// stores and subsystems, the caller's auth context, request metadata, and a
/// buffer for extra response headers.
pub struct RouterContext<'a> {
    // --- data and auth stores ---
    pub store: &'a dyn DataStore,
    pub session_store: &'a SessionStore,
    pub magic_codes: &'a MagicCodeStore,
    pub oauth_state: &'a OAuthStateStore,
    pub account_store: &'a pylon_auth::AccountStore,
    pub api_keys: &'a pylon_auth::api_key::ApiKeyStore,
    pub orgs: &'a pylon_auth::org::OrgStore,
    pub siwe: &'a pylon_auth::siwe::NonceStore,
    pub phone_codes: &'a pylon_auth::phone::PhoneCodeStore,
    pub passkeys: &'a pylon_auth::webauthn::PasskeyStore,
    pub verification: &'a pylon_auth::verification::VerificationStore,
    pub audit: &'a pylon_auth::audit::AuditStore,
    // --- policy, sync and subsystem handles ---
    pub policy_engine: &'a PolicyEngine,
    pub change_log: &'a ChangeLog,
    pub notifier: &'a dyn ChangeNotifier,
    pub rooms: &'a dyn RoomOps,
    pub cache: &'a dyn CacheOps,
    pub pubsub: &'a dyn PubSubOps,
    pub jobs: &'a dyn JobOps,
    pub scheduler: &'a dyn SchedulerOps,
    pub workflows: &'a dyn WorkflowOps,
    pub files: &'a dyn FileOps,
    pub openapi: &'a dyn OpenApiGenerator,
    // Optional: `None` when the functions subsystem is not available.
    pub functions: Option<&'a dyn FnOps>,
    pub email: &'a dyn EmailSender,
    // Optional: `None` when the shards subsystem is not available.
    pub shards: Option<&'a dyn ShardOps>,
    pub plugin_hooks: &'a dyn PluginHookOps,
    // --- per-request state ---
    pub auth_ctx: &'a AuthContext,
    pub trusted_origins: &'a [String],
    // Enables the dev-only OAuth path in `complete_oauth_login_pkce`.
    pub is_dev: bool,
    // Request headers as (name, value) pairs; `request_origin` looks names up
    // case-insensitively.
    pub request_headers: &'a [(String, String)],
    pub peer_ip: &'a str,
    pub cookie_config: &'a CookieConfig,
    // Wrapped in a `RefCell` so handlers holding `&RouterContext` can append
    // headers via `add_response_header`.
    pub response_headers: RefCell<Vec<(String, String)>>,
}
550
551impl<'a> RouterContext<'a> {
552 pub fn add_response_header(&self, name: impl Into<String>, value: impl Into<String>) {
554 self.response_headers
555 .borrow_mut()
556 .push((name.into(), value.into()));
557 }
558
559 pub fn take_response_headers(&self) -> Vec<(String, String)> {
562 std::mem::take(&mut *self.response_headers.borrow_mut())
563 }
564
565 pub fn request_origin(&self) -> Option<&str> {
569 self.request_headers
570 .iter()
571 .find(|(k, _)| k.eq_ignore_ascii_case("origin"))
572 .map(|(_, v)| v.as_str())
573 }
574
575 pub fn maybe_set_session_cookie(&self, token: &str) {
581 if self.request_origin().is_some() {
582 self.add_response_header("Set-Cookie", self.cookie_config.set_value(token));
583 }
584 }
585}
586
/// Failure result of the OAuth login flow (`complete_oauth_login_pkce`).
pub(crate) struct OAuthError {
    /// Numeric status for the failure (the flow uses 400, 404, 500 and 502).
    pub(crate) status: u16,
    /// Stable machine-readable error code, e.g. `"PROVIDER_NOT_FOUND"`.
    pub(crate) code: &'static str,
    /// Human-readable description of the failure.
    pub(crate) message: String,
}
596
/// Caps `s` at 240 bytes. Longer input is cut at the nearest UTF-8 character
/// boundary that leaves room for a trailing `…`, which is then appended.
/// Input of 240 bytes or fewer is returned unchanged.
fn truncate_for_redirect(s: &str) -> String {
    const MAX: usize = 240;
    const ELLIPSIS: &str = "…";

    if s.len() <= MAX {
        return s.to_owned();
    }

    // Walk back from the byte budget until the cut does not split a
    // multi-byte character.
    let mut cut = MAX - ELLIPSIS.len();
    while !(cut == 0 || s.is_char_boundary(cut)) {
        cut -= 1;
    }

    let mut shortened = String::with_capacity(cut + ELLIPSIS.len());
    shortened.push_str(&s[..cut]);
    shortened.push_str(ELLIPSIS);
    shortened
}
617
/// Completes an OAuth login and returns `(user_id, session)`.
///
/// Steps:
/// 1. Obtain user info and tokens: with a `code`, by exchanging it (plus the
///    optional PKCE verifier) with the configured provider; without a `code`
///    and only when `ctx.is_dev` is set, by synthesizing them from
///    `dev_email` / `dev_name`.
/// 2. Resolve the user: an existing provider account wins, then an existing
///    user row with the same email, otherwise a new user row is inserted.
/// 3. Upsert the provider account and create a session for the user.
///
/// # Errors
///
/// Returns an [`OAuthError`] when the provider is not configured (404), the
/// token exchange or userinfo fetch fails (502), a required field is missing
/// (400), or the user row cannot be resolved or created (500).
pub(crate) fn complete_oauth_login_pkce(
    ctx: &RouterContext,
    provider: &str,
    code: Option<&str>,
    pkce_verifier: Option<&str>,
    dev_email: Option<&str>,
    dev_name: Option<&str>,
) -> Result<(String, pylon_auth::Session), OAuthError> {
    let (userinfo, tokens) = if let Some(code) = code {
        // Real flow: look up the provider config and talk to the provider.
        let registry = pylon_auth::OAuthRegistry::shared();
        let config = registry.get(provider).cloned().ok_or_else(|| OAuthError {
            status: 404,
            code: "PROVIDER_NOT_FOUND",
            message: format!("OAuth provider \"{provider}\" not configured"),
        })?;
        let tokens = config
            .exchange_code_full_pkce(code, pkce_verifier)
            .map_err(|err| OAuthError {
                status: 502,
                code: "OAUTH_TOKEN_EXCHANGE_FAILED",
                // Messages are length-capped via `truncate_for_redirect`.
                message: truncate_for_redirect(&format!("token exchange failed: {err}")),
            })?;
        let info = config
            .fetch_userinfo_with_id_token(&tokens.access_token, tokens.id_token.as_deref())
            .map_err(|err| OAuthError {
                status: 502,
                // NOTE(review): a userinfo failure reuses the token-exchange
                // error code; only the message tells the two apart.
                code: "OAUTH_TOKEN_EXCHANGE_FAILED",
                message: truncate_for_redirect(&format!("userinfo fetch failed: {err}")),
            })?;
        (info, tokens)
    } else if ctx.is_dev {
        // Dev-only shortcut: no provider round-trip, identity comes from the
        // caller-supplied email.
        let email = dev_email.ok_or_else(|| OAuthError {
            status: 400,
            code: "MISSING_FIELD",
            message: "OAuth callback requires `code` (or `email` in dev mode)".into(),
        })?;
        let info = pylon_auth::UserInfo {
            provider: provider.to_string(),
            provider_account_id: format!("dev:{email}"),
            email: email.to_string(),
            name: dev_name.map(String::from),
        };
        let tokens = pylon_auth::TokenSet {
            access_token: "dev_access_token".into(),
            refresh_token: None,
            id_token: None,
            expires_at: None,
            scope: None,
        };
        (info, tokens)
    } else {
        return Err(OAuthError {
            status: 400,
            code: "MISSING_FIELD",
            message: "OAuth callback requires an authorization `code` from the provider".into(),
        });
    };

    let now = chrono_now_iso();

    let user_id = if let Some(existing) = ctx
        .account_store
        .find_by_provider(provider, &userinfo.provider_account_id)
    {
        // Returning user: refresh the stored account with the new tokens but
        // keep its original creation timestamp.
        let mut refreshed = pylon_auth::Account::new(existing.user_id.clone(), &userinfo, &tokens);
        refreshed.created_at = existing.created_at;
        ctx.account_store.upsert(&refreshed);
        existing.user_id
    } else if let Ok(Some(row)) = ctx.store.lookup(
        &ctx.store.manifest().auth.user.entity,
        "email",
        &userinfo.email,
    ) {
        // Existing user with the same email: link this provider account to it.
        // NOTE(review): linking is based on email equality alone; nothing
        // visible here checks that the provider verified the address —
        // confirm that `fetch_userinfo_with_id_token` guarantees it.
        let id = row["id"].as_str().unwrap_or("").to_string();
        if id.is_empty() {
            return Err(OAuthError {
                status: 500,
                code: "USER_LOOKUP_INVALID",
                message: "User row matched by email but had no id field".into(),
            });
        }
        if row.get("emailVerified").map_or(true, |v| v.is_null()) {
            // Best effort: the update result is deliberately discarded, so a
            // failure here does not abort the login.
            let _ = ctx.store.update(
                &ctx.store.manifest().auth.user.entity,
                &id,
                &serde_json::json!({ "emailVerified": now }),
            );
        }
        ctx.account_store
            .upsert(&pylon_auth::Account::new(id.clone(), &userinfo, &tokens));
        id
    } else {
        // First login: create the user row, falling back to the email as the
        // display name when the provider supplied none.
        let display_name = userinfo.name.as_deref().unwrap_or(&userinfo.email);
        let user_entity = ctx.store.manifest().auth.user.entity.clone();
        let id = ctx
            .store
            .insert(
                &user_entity,
                &serde_json::json!({
                    "email": userinfo.email,
                    "displayName": display_name,
                    "emailVerified": now,
                    "createdAt": now,
                }),
            )
            .map_err(|e| OAuthError {
                status: 500,
                code: "USER_CREATE_FAILED",
                message: format!(
                    "failed to create User row for OAuth signup ({}): {}",
                    e.code, e.message
                ),
            })?;
        ctx.account_store
            .upsert(&pylon_auth::Account::new(id.clone(), &userinfo, &tokens));
        id
    };
    let session = ctx.session_store.create(user_id.clone());
    Ok((user_id, session))
}
793
794pub(crate) fn parse_query(q: &str) -> std::collections::HashMap<String, String> {
800 let mut out = std::collections::HashMap::new();
801 for pair in q.split('&') {
802 if pair.is_empty() {
803 continue;
804 }
805 let (k, v) = pair.split_once('=').unwrap_or((pair, ""));
806 out.insert(query_decode(k), query_decode(v));
807 }
808 out
809}
810
/// Percent-decodes one query-string component.
///
/// Calls `percent_decode` with `plus_is_space = false`, so a literal `+` is
/// kept as `+` rather than turned into a space.
fn query_decode(s: &str) -> String {
    percent_decode(s, false)
}
819
/// Decodes `%XX` escapes in `s`.
///
/// * A `%` followed by exactly two hex digits (`0-9`, `a-f`, `A-F`) becomes
///   the corresponding byte.
/// * Any other `%` (truncated or non-hex escape) is copied through literally.
/// * `+` becomes a space only when `plus_is_space` is `true`.
///
/// The decoded bytes are converted with `String::from_utf8_lossy`, so invalid
/// UTF-8 sequences turn into U+FFFD instead of failing.
#[allow(dead_code)]
fn percent_decode(s: &str, plus_is_space: bool) -> String {
    // Value of a single hex digit. Validating each digit separately is what
    // rejects escapes like `%+f`: `u8::from_str_radix` would accept the
    // leading `+` sign and decode that to 0x0F.
    fn hex_value(b: u8) -> Option<u8> {
        // `to_digit(16)` yields 0..=15, so the narrowing cast is lossless.
        (b as char).to_digit(16).map(|d| d as u8)
    }

    let bytes = s.as_bytes();
    let mut out = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'+' if plus_is_space => {
                out.push(b' ');
                i += 1;
            }
            b'%' => {
                let decoded = match (bytes.get(i + 1), bytes.get(i + 2)) {
                    (Some(&hi), Some(&lo)) => hex_value(hi).zip(hex_value(lo)),
                    _ => None,
                };
                if let Some((hi, lo)) = decoded {
                    out.push((hi << 4) | lo);
                    i += 3;
                } else {
                    // Malformed or truncated escape: keep the `%` as-is.
                    out.push(b'%');
                    i += 1;
                }
            }
            b => {
                out.push(b);
                i += 1;
            }
        }
    }
    String::from_utf8_lossy(&out).into_owned()
}
852
/// Masks an email address for logging: keeps at most the first two characters
/// of the part before the first `@`, then `***`, then the `@` and everything
/// after it. Input without an `@` is replaced entirely by `***`.
pub(crate) fn redact_email(email: &str) -> String {
    let Some(at) = email.find('@') else {
        return "***".to_string();
    };
    let local = &email[..at];
    // `domain` keeps its leading `@`.
    let domain = &email[at..];

    let mut masked: String = local.chars().take(2).collect();
    masked.push_str("***");
    masked.push_str(domain);
    masked
}
868
869fn public_manifest(m: &pylon_kernel::AppManifest) -> pylon_kernel::AppManifest {
875 let mut out = m.clone();
876 for p in out.policies.iter_mut() {
877 p.allow = String::new();
878 p.allow_read = None;
879 p.allow_insert = None;
880 p.allow_update = None;
881 p.allow_delete = None;
882 }
883 out
884}
885
/// Percent-encodes `s` byte by byte. ASCII letters, digits and `-`, `_`, `.`,
/// `~` pass through unchanged; every other byte becomes `%XX` with upper-case
/// hex digits.
pub(crate) fn url_encode(s: &str) -> String {
    const HEX_UPPER: &[u8; 16] = b"0123456789ABCDEF";

    let mut encoded = String::with_capacity(s.len());
    for byte in s.bytes() {
        let unreserved =
            byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~');
        if unreserved {
            encoded.push(char::from(byte));
        } else {
            encoded.push('%');
            encoded.push(char::from(HEX_UPPER[usize::from(byte >> 4)]));
            encoded.push(char::from(HEX_UPPER[usize::from(byte & 0x0F)]));
        }
    }
    encoded
}
898
/// Public entry point of the router.
///
/// Delegates to `route_inner` and returns `(status, body, content_type)`.
/// The content type is always `"application/json"`.
pub fn route(
    ctx: &RouterContext,
    method: HttpMethod,
    url: &str,
    body: &str,
    auth_token: Option<&str>,
) -> (u16, String, &'static str) {
    let (status, body) = route_inner(ctx, method, url, body, auth_token);
    (status, body, "application/json")
}
916
/// Dispatches one request and returns `(status, body)`.
///
/// Order of evaluation:
/// 1. Any `OPTIONS` request gets an empty 204.
/// 2. `GET /api/manifest` and `GET /api/openapi.json` are answered inline.
/// 3. Each `routes::*` module is asked in turn; the first one that returns
///    `Some` wins, so the order of the calls below is significant.
/// 4. If nothing matched, a 404 with a hint listing the endpoints is returned.
fn route_inner(
    ctx: &RouterContext,
    method: HttpMethod,
    url: &str,
    body: &str,
    auth_token: Option<&str>,
) -> (u16, String) {
    if method == HttpMethod::Options {
        return (204, String::new());
    }

    if url.starts_with("/api/manifest") && method == HttpMethod::Get {
        // Compare the path without the query string, so `/api/manifestX`
        // falls through to the handlers below.
        let path = url.split('?').next().unwrap_or(url);
        if path == "/api/manifest" {
            let want_full = query_param(url, "full")
                .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
                .unwrap_or(false);
            let manifest = ctx.store.manifest();
            // The unredacted manifest is only served to admins who asked for
            // it; everyone else gets the copy produced by `public_manifest`.
            let body = if want_full && ctx.auth_ctx.is_admin {
                serde_json::to_string(manifest).unwrap_or_else(|_| "{}".into())
            } else {
                serde_json::to_string(&public_manifest(manifest)).unwrap_or_else(|_| "{}".into())
            };
            return (200, body);
        }
    }

    // NOTE(review): exact comparison — a URL carrying a query string
    // (e.g. `/api/openapi.json?x=1`) does not match here.
    if url == "/api/openapi.json" && method == HttpMethod::Get {
        return (200, ctx.openapi.generate(""));
    }

    if let Some(r) = routes::auth::handle(ctx, method, url, body, auth_token) {
        return r;
    }

    if let Some(r) = routes::sync::handle(ctx, method, url, body, auth_token) {
        return r;
    }

    if let Some(r) = routes::rooms::handle(ctx, method, url, body, auth_token) {
        return r;
    }

    if let Some(r) = routes::links::handle(ctx, method, url, body, auth_token) {
        return r;
    }
    if let Some(r) = routes::files::handle(ctx, method, url, body, auth_token) {
        return r;
    }
    if let Some(r) = routes::crdt::handle(ctx, method, url, body, auth_token) {
        return r;
    }

    if let Some(r) = routes::queries::handle(ctx, method, url, body, auth_token) {
        return r;
    }
    if let Some(r) = routes::actions::handle(ctx, method, url, body, auth_token) {
        return r;
    }
    if let Some(r) = routes::admin_data::handle(ctx, method, url, body, auth_token) {
        return r;
    }
    if let Some(r) = routes::auth_admin::handle(ctx, method, url, body, auth_token) {
        return r;
    }
    if let Some(r) = routes::ops_admin::handle(ctx, method, url, body, auth_token) {
        return r;
    }
    if let Some(r) = routes::search::handle(ctx, method, url, body, auth_token) {
        return r;
    }

    if let Some(r) = routes::entities::handle(ctx, method, url, body, auth_token) {
        return r;
    }

    if let Some(r) = routes::infra::handle(ctx, method, url, body, auth_token) {
        return r;
    }
    if let Some(r) = routes::functions::handle(ctx, method, url, body, auth_token) {
        return r;
    }
    if let Some(r) = routes::shards::handle(ctx, method, url, body, auth_token) {
        return r;
    }
    if let Some(r) = routes::workflows::handle(ctx, method, url, body, auth_token) {
        return r;
    }
    if let Some(r) = routes::ai::handle(ctx, method, url, body, auth_token) {
        return r;
    }

    // No handler claimed the request.
    (
        404,
        json_error_with_hint(
            "NOT_FOUND",
            &format!("No API route matches {url}"),
            "Available endpoints: /api/entities/<entity>, /api/actions/<name>, /api/query, /api/auth/*, /api/sync/*, /api/files/*, /api/cache, /api/pubsub/*, /api/jobs, /api/scheduler, /api/workflows, /api/ai/*, /studio",
        ),
    )
}
1069
1070pub(crate) fn handle_list(store: &dyn DataStore, entity: &str, url: &str) -> (u16, String) {
1075 let limit: Option<usize> = url
1076 .split("limit=")
1077 .nth(1)
1078 .and_then(|s| s.split('&').next())
1079 .and_then(|s| s.parse().ok());
1080 let offset: usize = url
1081 .split("offset=")
1082 .nth(1)
1083 .and_then(|s| s.split('&').next())
1084 .and_then(|s| s.parse().ok())
1085 .unwrap_or(0);
1086
1087 let mut filter = serde_json::Map::new();
1090 if let Some(l) = limit {
1091 filter.insert("$limit".into(), serde_json::json!(l));
1092 }
1093 if offset > 0 {
1094 filter.insert("$offset".into(), serde_json::json!(offset));
1095 }
1096 let filter = serde_json::Value::Object(filter);
1097
1098 match store.query_filtered(entity, &filter) {
1099 Ok(rows) => {
1100 let count = rows.len();
1105 (
1106 200,
1107 serde_json::json!({
1108 "data": rows,
1109 "count": count,
1110 "offset": offset,
1111 "limit": limit,
1112 })
1113 .to_string(),
1114 )
1115 }
1116 Err(e) => (400, json_error(&e.code, &e.message)),
1117 }
1118}
1119
1120pub(crate) fn handle_get(store: &dyn DataStore, entity: &str, id: &str) -> (u16, String) {
1121 match store.get_by_id(entity, id) {
1122 Ok(Some(row)) => (
1123 200,
1124 serde_json::to_string(&row).unwrap_or_else(|_| "{}".into()),
1125 ),
1126 Ok(None) => (
1127 404,
1128 json_error("NOT_FOUND", &format!("{entity} with id \"{id}\" not found")),
1129 ),
1130 Err(e) => (400, json_error(&e.code, &e.message)),
1131 }
1132}
1133
1134pub(crate) fn handle_insert(ctx: &RouterContext, entity: &str, body: &str) -> (u16, String) {
1135 let mut data: serde_json::Value = match serde_json::from_str(body) {
1136 Ok(v) => v,
1137 Err(e) => {
1138 return (
1139 400,
1140 json_error_safe(
1141 "INVALID_JSON",
1142 "Invalid request body",
1143 &format!("Invalid JSON: {e}"),
1144 ),
1145 )
1146 }
1147 };
1148 if let Err((status, code, msg)) =
1152 ctx.plugin_hooks
1153 .before_insert(entity, &mut data, ctx.auth_ctx)
1154 {
1155 return (status, json_error(&code, &msg));
1156 }
1157 match ctx.store.insert(entity, &data) {
1158 Ok(id) => {
1159 let seq = ctx
1160 .change_log
1161 .append(entity, &id, ChangeKind::Insert, Some(data.clone()));
1162 broadcast_change_with_crdt(
1163 ctx.notifier,
1164 ctx.store,
1165 seq,
1166 entity,
1167 &id,
1168 ChangeKind::Insert,
1169 Some(&data),
1170 );
1171 ctx.plugin_hooks
1172 .after_insert(entity, &id, &data, ctx.auth_ctx);
1173 (201, serde_json::json!({"id": id}).to_string())
1174 }
1175 Err(e) => (400, json_error(&e.code, &e.message)),
1176 }
1177}
1178
1179pub(crate) fn handle_update(
1180 ctx: &RouterContext,
1181 entity: &str,
1182 id: &str,
1183 body: &str,
1184) -> (u16, String) {
1185 let mut data: serde_json::Value = match serde_json::from_str(body) {
1186 Ok(v) => v,
1187 Err(e) => {
1188 return (
1189 400,
1190 json_error_safe(
1191 "INVALID_JSON",
1192 "Invalid request body",
1193 &format!("Invalid JSON: {e}"),
1194 ),
1195 )
1196 }
1197 };
1198 if let Err((status, code, msg)) =
1199 ctx.plugin_hooks
1200 .before_update(entity, id, &mut data, ctx.auth_ctx)
1201 {
1202 return (status, json_error(&code, &msg));
1203 }
1204 match ctx.store.update(entity, id, &data) {
1205 Ok(true) => {
1206 let seq = ctx
1207 .change_log
1208 .append(entity, id, ChangeKind::Update, Some(data.clone()));
1209 broadcast_change_with_crdt(
1210 ctx.notifier,
1211 ctx.store,
1212 seq,
1213 entity,
1214 id,
1215 ChangeKind::Update,
1216 Some(&data),
1217 );
1218 ctx.plugin_hooks
1219 .after_update(entity, id, &data, ctx.auth_ctx);
1220 (200, serde_json::json!({"updated": true}).to_string())
1221 }
1222 Ok(false) => (
1223 404,
1224 json_error("NOT_FOUND", &format!("{entity}/{id} not found")),
1225 ),
1226 Err(e) => (400, json_error(&e.code, &e.message)),
1227 }
1228}
1229
1230pub(crate) fn handle_delete(ctx: &RouterContext, entity: &str, id: &str) -> (u16, String) {
1231 if let Err((status, code, msg)) = ctx.plugin_hooks.before_delete(entity, id, ctx.auth_ctx) {
1232 return (status, json_error(&code, &msg));
1233 }
1234 match ctx.store.delete(entity, id) {
1235 Ok(true) => {
1236 let seq = ctx.change_log.append(entity, id, ChangeKind::Delete, None);
1237 broadcast_change(ctx.notifier, seq, entity, id, ChangeKind::Delete, None);
1238 ctx.plugin_hooks.after_delete(entity, id, ctx.auth_ctx);
1239 (200, serde_json::json!({"deleted": true}).to_string())
1240 }
1241 Ok(false) => (
1242 404,
1243 json_error("NOT_FOUND", &format!("{entity}/{id} not found")),
1244 ),
1245 Err(e) => (400, json_error(&e.code, &e.message)),
1246 }
1247}
1248
1249pub(crate) fn broadcast_change(
1254 notifier: &dyn ChangeNotifier,
1255 seq: u64,
1256 entity: &str,
1257 row_id: &str,
1258 kind: ChangeKind,
1259 data: Option<&serde_json::Value>,
1260) {
1261 let event = pylon_sync::ChangeEvent {
1262 seq,
1263 entity: entity.to_string(),
1264 row_id: row_id.to_string(),
1265 kind,
1266 data: data.cloned(),
1267 timestamp: String::new(),
1268 };
1269 notifier.notify(&event);
1270}
1271
1272pub fn broadcast_change_with_crdt(
1282 notifier: &dyn ChangeNotifier,
1283 store: &dyn DataStore,
1284 seq: u64,
1285 entity: &str,
1286 row_id: &str,
1287 kind: ChangeKind,
1288 data: Option<&serde_json::Value>,
1289) {
1290 broadcast_change(notifier, seq, entity, row_id, kind.clone(), data);
1291 if matches!(kind, ChangeKind::Delete) {
1292 return;
1293 }
1294 if let Ok(Some(snapshot)) = store.crdt_snapshot(entity, row_id) {
1295 notifier.notify_crdt(entity, row_id, &snapshot);
1296 }
1297}
1298
1299pub(crate) fn decode_hex(s: &str) -> Option<Vec<u8>> {
1304 if s.len() % 2 != 0 {
1305 return None;
1306 }
1307 let mut out = Vec::with_capacity(s.len() / 2);
1308 let bytes = s.as_bytes();
1309 let mut i = 0;
1310 while i < bytes.len() {
1311 let hi = hex_nibble(bytes[i])?;
1312 let lo = hex_nibble(bytes[i + 1])?;
1313 out.push((hi << 4) | lo);
1314 i += 2;
1315 }
1316 Some(out)
1317}
1318
/// Value (0–15) of one ASCII hex digit, accepting `0-9`, `a-f` and `A-F`;
/// `None` for any other byte.
fn hex_nibble(b: u8) -> Option<u8> {
    // `to_digit(16)` yields 0..=15, so the narrowing cast is lossless.
    char::from(b).to_digit(16).map(|digit| digit as u8)
}
1327
/// Serializes an error response body of the shape
/// `{"error": {"code": ..., "message": ...}}`.
pub fn json_error(code: &str, message: &str) -> String {
    serde_json::json!({"error": {"code": code, "message": message}}).to_string()
}
1331
/// Field names treated as "this row belongs to a user" by the GDPR export and
/// purge routines. Both camelCase and snake_case spellings are listed; an
/// entity is considered user-owned when it has a field with one of these
/// names.
const USER_REF_FIELDS: &[&str] = &[
    "userId",
    "user_id",
    "authorId",
    "author_id",
    "ownerId",
    "owner_id",
    "createdBy",
    "created_by",
];
1349
1350pub(crate) fn gdpr_export(ctx: &RouterContext, user_id: &str) -> (u16, String) {
1355 let manifest = ctx.store.manifest();
1356 let mut entities = serde_json::Map::new();
1357
1358 if let Ok(Some(user_row)) = ctx.store.get_by_id("User", user_id) {
1360 entities.insert("User".to_string(), serde_json::json!([user_row]));
1361 }
1362
1363 for ent in &manifest.entities {
1365 if ent.name == "User" {
1366 continue; }
1368 let user_field = ent
1369 .fields
1370 .iter()
1371 .find(|f| USER_REF_FIELDS.contains(&f.name.as_str()));
1372 let Some(field) = user_field else { continue };
1373 let filter = serde_json::json!({ &field.name: user_id });
1374 match ctx.store.query_filtered(&ent.name, &filter) {
1375 Ok(rows) if !rows.is_empty() => {
1376 entities.insert(ent.name.clone(), serde_json::Value::Array(rows));
1377 }
1378 Ok(_) => {}
1379 Err(e) => {
1380 tracing::warn!("[gdpr] export: query {} failed: {}", ent.name, e.message);
1381 }
1382 }
1383 }
1384
1385 let envelope = serde_json::json!({
1386 "user_id": user_id,
1387 "exported_at": pylon_kernel::util::now_iso(),
1388 "entities": entities,
1389 });
1390 (200, envelope.to_string())
1391}
1392
1393pub(crate) fn gdpr_purge(ctx: &RouterContext, user_id: &str) -> (u16, String) {
1397 let manifest = ctx.store.manifest();
1398 let mut deleted: u64 = 0;
1399 let mut errors: Vec<String> = Vec::new();
1400
1401 if let Ok(true) = ctx.store.delete("User", user_id) {
1403 deleted += 1;
1404 let seq = ctx
1406 .change_log
1407 .append("User", user_id, ChangeKind::Delete, None);
1408 broadcast_change(ctx.notifier, seq, "User", user_id, ChangeKind::Delete, None);
1409 }
1410
1411 for ent in &manifest.entities {
1413 if ent.name == "User" {
1414 continue;
1415 }
1416 let Some(field) = ent
1417 .fields
1418 .iter()
1419 .find(|f| USER_REF_FIELDS.contains(&f.name.as_str()))
1420 else {
1421 continue;
1422 };
1423 let filter = serde_json::json!({ &field.name: user_id });
1424 let rows = match ctx.store.query_filtered(&ent.name, &filter) {
1425 Ok(r) => r,
1426 Err(e) => {
1427 errors.push(format!("query {}: {}", ent.name, e.message));
1428 continue;
1429 }
1430 };
1431 for row in rows {
1432 let Some(id) = row.get("id").and_then(|v| v.as_str()) else {
1433 continue;
1434 };
1435 match ctx.store.delete(&ent.name, id) {
1436 Ok(true) => {
1437 deleted += 1;
1438 let seq = ctx
1439 .change_log
1440 .append(&ent.name, id, ChangeKind::Delete, None);
1441 broadcast_change(ctx.notifier, seq, &ent.name, id, ChangeKind::Delete, None);
1442 }
1443 Ok(false) => {}
1444 Err(e) => errors.push(format!("delete {}/{}: {}", ent.name, id, e.message)),
1445 }
1446 }
1447 }
1448
1449 let revoked = ctx.session_store.revoke_all_for_user(user_id);
1452
1453 let resp = serde_json::json!({
1454 "user_id": user_id,
1455 "rows_deleted": deleted,
1456 "sessions_revoked": revoked,
1457 "errors": errors,
1458 "purged_at": pylon_kernel::util::now_iso(),
1459 });
1460 (200, resp.to_string())
1461}
1462
1463pub(crate) fn require_admin(ctx: &RouterContext) -> Option<(u16, String)> {
1468 if ctx.auth_ctx.is_admin {
1469 None
1470 } else {
1471 Some((
1472 403,
1473 json_error(
1474 "FORBIDDEN",
1475 "this endpoint requires admin auth (PYLON_ADMIN_TOKEN)",
1476 ),
1477 ))
1478 }
1479}
1480
1481pub(crate) fn require_auth(ctx: &RouterContext) -> Option<(u16, String)> {
1486 if ctx.auth_ctx.is_admin || ctx.auth_ctx.user_id.is_some() {
1487 None
1488 } else {
1489 Some((
1490 401,
1491 json_error("AUTH_REQUIRED", "authenticated session required"),
1492 ))
1493 }
1494}
1495
/// Like `json_error`, with an additional `hint` field inside the `error`
/// object: `{"error": {"code": ..., "message": ..., "hint": ...}}`.
pub fn json_error_with_hint(code: &str, message: &str, hint: &str) -> String {
    serde_json::json!({"error": {"code": code, "message": message, "hint": hint}}).to_string()
}
1499
1500pub fn json_error_safe(code: &str, user_message: &str, internal: &str) -> String {
1501 tracing::warn!("[error] {code}: {internal}");
1502 json_error(code, user_message)
1503}
1504
1505pub(crate) fn parse_json(body: &str) -> Result<serde_json::Value, (u16, String)> {
1507 serde_json::from_str(body).map_err(|e| {
1508 (
1509 400,
1510 json_error_safe(
1511 "INVALID_JSON",
1512 "Invalid request body",
1513 &format!("Invalid JSON: {e}"),
1514 ),
1515 )
1516 })
1517}
1518
/// Returns the raw (not percent-decoded) value of query parameter `key`.
///
/// Only the query portion of `url` (everything after the first `?`) is
/// searched; when `url` contains no `?`, the whole string is treated as a
/// query string. Pairs are split on `&` and `key` must equal a pair's name
/// exactly, so looking up `id` does not match `user_id=...` or text in the
/// path. The first matching pair wins.
///
/// Returns `None` when no pair is named `key`. A pair with an empty value
/// (`key=`) yields `Some("")`.
pub(crate) fn query_param<'a>(url: &'a str, key: &str) -> Option<&'a str> {
    let query = url.split_once('?').map_or(url, |(_, rest)| rest);
    query
        .split('&')
        // `strip_prefix(key)` then `strip_prefix('=')` accepts only `key=value`
        // pairs whose name is exactly `key`.
        .find_map(|pair| pair.strip_prefix(key)?.strip_prefix('='))
}
1524
1525pub(crate) fn chrono_now_iso() -> String {
1526 pylon_kernel::util::now_iso()
1527}
1528
1529#[cfg(test)]
1539mod auth_gate_tests {
1540 use super::*;
1541 use pylon_auth::{AuthContext, CookieConfig, MagicCodeStore, OAuthStateStore, SessionStore};
1542 use pylon_kernel::{AppManifest, MANIFEST_VERSION};
1543 use pylon_policy::PolicyEngine;
1544 use pylon_sync::ChangeLog;
1545
1546 struct StubDataStore {
1552 manifest: AppManifest,
1553 }
1554 impl pylon_http::DataStore for StubDataStore {
1555 fn manifest(&self) -> &AppManifest {
1556 &self.manifest
1557 }
1558 fn insert(
1559 &self,
1560 _entity: &str,
1561 _data: &serde_json::Value,
1562 ) -> Result<String, pylon_http::DataError> {
1563 Ok("stub-id".to_string())
1564 }
1565 fn get_by_id(
1566 &self,
1567 _entity: &str,
1568 _id: &str,
1569 ) -> Result<Option<serde_json::Value>, pylon_http::DataError> {
1570 Ok(None)
1571 }
1572 fn list(&self, _entity: &str) -> Result<Vec<serde_json::Value>, pylon_http::DataError> {
1573 Ok(Vec::new())
1574 }
1575 fn list_after(
1576 &self,
1577 _entity: &str,
1578 _after: Option<&str>,
1579 _limit: usize,
1580 ) -> Result<Vec<serde_json::Value>, pylon_http::DataError> {
1581 Ok(Vec::new())
1582 }
1583 fn update(
1584 &self,
1585 _entity: &str,
1586 _id: &str,
1587 _data: &serde_json::Value,
1588 ) -> Result<bool, pylon_http::DataError> {
1589 Ok(true)
1590 }
1591 fn delete(&self, _entity: &str, _id: &str) -> Result<bool, pylon_http::DataError> {
1592 Ok(true)
1593 }
1594 fn lookup(
1595 &self,
1596 _entity: &str,
1597 _field: &str,
1598 _value: &str,
1599 ) -> Result<Option<serde_json::Value>, pylon_http::DataError> {
1600 Ok(None)
1601 }
1602 fn link(
1603 &self,
1604 _entity: &str,
1605 _id: &str,
1606 _relation: &str,
1607 _target_id: &str,
1608 ) -> Result<bool, pylon_http::DataError> {
1609 Ok(true)
1610 }
1611 fn unlink(
1612 &self,
1613 _entity: &str,
1614 _id: &str,
1615 _relation: &str,
1616 ) -> Result<bool, pylon_http::DataError> {
1617 Ok(true)
1618 }
1619 fn query_filtered(
1620 &self,
1621 _entity: &str,
1622 _filter: &serde_json::Value,
1623 ) -> Result<Vec<serde_json::Value>, pylon_http::DataError> {
1624 Ok(Vec::new())
1625 }
1626 fn query_graph(
1627 &self,
1628 _query: &serde_json::Value,
1629 ) -> Result<serde_json::Value, pylon_http::DataError> {
1630 Ok(serde_json::json!({}))
1631 }
1632 fn transact(
1633 &self,
1634 _ops: &[serde_json::Value],
1635 ) -> Result<(bool, Vec<serde_json::Value>), pylon_http::DataError> {
1636 Ok((true, Vec::new()))
1637 }
1638 }
1639
    // Declares a unit struct named `$name`. The `$trait` argument is accepted
    // but not used in the expansion; at the call sites it names the trait the
    // stub receives a hand-written impl for further down.
    macro_rules! stub_ops {
        ($name:ident, $trait:path) => {
            struct $name;
        };
    }

    // One unit struct per subsystem trait used by the router tests.
    stub_ops!(StubRooms, RoomOps);
    stub_ops!(StubCache, CacheOps);
    stub_ops!(StubPubSub, PubSubOps);
    stub_ops!(StubJobs, JobOps);
    stub_ops!(StubScheduler, SchedulerOps);
    stub_ops!(StubWorkflows, WorkflowOps);
    stub_ops!(StubFiles, FileOps);
    stub_ops!(StubOpenApi, OpenApiGenerator);
    stub_ops!(StubEmail, EmailSender);
1655
1656 impl RoomOps for StubRooms {
1657 fn join(
1658 &self,
1659 _room: &str,
1660 _user_id: &str,
1661 _data: Option<serde_json::Value>,
1662 ) -> Result<(serde_json::Value, serde_json::Value), pylon_http::DataError> {
1663 Ok((serde_json::json!({}), serde_json::json!({})))
1664 }
1665 fn leave(&self, _room: &str, _user_id: &str) -> Option<serde_json::Value> {
1666 None
1667 }
1668 fn set_presence(
1669 &self,
1670 _room: &str,
1671 _user_id: &str,
1672 _data: serde_json::Value,
1673 ) -> Option<serde_json::Value> {
1674 None
1675 }
1676 fn broadcast(
1677 &self,
1678 _room: &str,
1679 _sender: Option<&str>,
1680 _topic: &str,
1681 _data: serde_json::Value,
1682 ) -> Option<serde_json::Value> {
1683 None
1684 }
1685 fn list_rooms(&self) -> Vec<String> {
1686 vec![]
1687 }
1688 fn room_size(&self, _name: &str) -> usize {
1689 0
1690 }
1691 fn members(&self, _name: &str) -> Vec<serde_json::Value> {
1692 vec![]
1693 }
1694 }
1695 impl CacheOps for StubCache {
1696 fn handle_command(&self, _body: &str) -> (u16, String) {
1697 (200, "{}".into())
1698 }
1699 fn handle_get(&self, _key: &str) -> (u16, String) {
1700 (404, "{}".into())
1701 }
1702 fn handle_delete(&self, _key: &str) -> (u16, String) {
1703 (200, "{}".into())
1704 }
1705 }
1706 impl PubSubOps for StubPubSub {
1707 fn handle_publish(&self, _body: &str) -> (u16, String) {
1708 (200, "{}".into())
1709 }
1710 fn handle_channels(&self) -> (u16, String) {
1711 (200, "[]".into())
1712 }
1713 fn handle_history(&self, _channel: &str, _url: &str) -> (u16, String) {
1714 (200, "[]".into())
1715 }
1716 }
1717 impl JobOps for StubJobs {
1718 fn enqueue(
1719 &self,
1720 _name: &str,
1721 _payload: serde_json::Value,
1722 _priority: &str,
1723 _delay_secs: u64,
1724 _max_retries: u32,
1725 _queue: &str,
1726 ) -> String {
1727 "job-id".into()
1728 }
1729 fn stats(&self) -> serde_json::Value {
1730 serde_json::json!({})
1731 }
1732 fn dead_letters(&self) -> serde_json::Value {
1733 serde_json::json!([])
1734 }
1735 fn retry_dead(&self, _id: &str) -> bool {
1736 false
1737 }
1738 fn list_jobs(
1739 &self,
1740 _status: Option<&str>,
1741 _queue: Option<&str>,
1742 _limit: usize,
1743 ) -> serde_json::Value {
1744 serde_json::json!([])
1745 }
1746 fn get_job(&self, _id: &str) -> Option<serde_json::Value> {
1747 None
1748 }
1749 }
1750 impl SchedulerOps for StubScheduler {
1751 fn list_tasks(&self) -> serde_json::Value {
1752 serde_json::json!([])
1753 }
1754 fn trigger(&self, _name: &str) -> bool {
1755 false
1756 }
1757 }
1758 impl WorkflowOps for StubWorkflows {
1759 fn definitions(&self) -> serde_json::Value {
1760 serde_json::json!([])
1761 }
1762 fn start(&self, _name: &str, _input: serde_json::Value) -> Result<String, String> {
1763 Ok("wf-id".into())
1764 }
1765 fn list(&self, _status: Option<&str>) -> serde_json::Value {
1766 serde_json::json!([])
1767 }
1768 fn get(&self, _id: &str) -> Option<serde_json::Value> {
1769 None
1770 }
1771 fn advance(&self, _id: &str) -> Result<String, String> {
1772 Ok("running".into())
1773 }
1774 fn send_event(
1775 &self,
1776 _id: &str,
1777 _event: &str,
1778 _data: serde_json::Value,
1779 ) -> Result<(), String> {
1780 Ok(())
1781 }
1782 fn cancel(&self, _id: &str) -> Result<(), String> {
1783 Ok(())
1784 }
1785 }
1786 impl FileOps for StubFiles {
1787 fn upload(&self, _body: &str) -> (u16, String) {
1788 (501, "{}".into())
1789 }
1790 fn get_file(&self, _id: &str) -> (u16, String) {
1791 (404, "{}".into())
1792 }
1793 }
1794 impl OpenApiGenerator for StubOpenApi {
1795 fn generate(&self, _base: &str) -> String {
1796 "{}".into()
1797 }
1798 }
1799 impl EmailSender for StubEmail {
1800 fn send(&self, _to: &str, _subject: &str, _body: &str) -> Result<(), String> {
1801 Ok(())
1802 }
1803 }
1804
1805 fn empty_manifest() -> AppManifest {
1806 AppManifest {
1807 manifest_version: MANIFEST_VERSION,
1808 name: "test".into(),
1809 version: "0.1.0".into(),
1810 entities: vec![],
1811 routes: vec![],
1812 queries: vec![],
1813 actions: vec![],
1814 policies: vec![],
1815 auth: Default::default(),
1816 }
1817 }
1818
1819 fn with_ctx<F>(is_dev: bool, auth: &AuthContext, f: F)
1821 where
1822 F: FnOnce(&RouterContext),
1823 {
1824 with_ctx_hooks(is_dev, auth, &NoopPluginHooks, f);
1825 }
1826
1827 fn with_ctx_hooks<F>(is_dev: bool, auth: &AuthContext, hooks: &dyn PluginHookOps, f: F)
1828 where
1829 F: FnOnce(&RouterContext),
1830 {
1831 let manifest = empty_manifest();
1832 let store = StubDataStore {
1833 manifest: manifest.clone(),
1834 };
1835 let session_store = SessionStore::new();
1836 let magic_codes = MagicCodeStore::new();
1837 let oauth_state = OAuthStateStore::new();
1838 let account_store = pylon_auth::AccountStore::new();
1839 let api_keys = pylon_auth::api_key::ApiKeyStore::new();
1840 let orgs = pylon_auth::org::OrgStore::new();
1841 let siwe = pylon_auth::siwe::NonceStore::new();
1842 let phone_codes = pylon_auth::phone::PhoneCodeStore::new();
1843 let passkeys = pylon_auth::webauthn::PasskeyStore::new();
1844 let verification = pylon_auth::verification::VerificationStore::new();
1845 let audit = pylon_auth::audit::AuditStore::new();
1846 let policy_engine = PolicyEngine::from_manifest(&manifest);
1847 let change_log = ChangeLog::new();
1848 let notifier = NoopNotifier;
1849 let rooms = StubRooms;
1850 let cache = StubCache;
1851 let pubsub = StubPubSub;
1852 let jobs = StubJobs;
1853 let scheduler = StubScheduler;
1854 let workflows = StubWorkflows;
1855 let files = StubFiles;
1856 let openapi = StubOpenApi;
1857 let email = StubEmail;
1858 let cookie_config = CookieConfig::from_env(&CookieConfig::default_name_for("test"));
1859
1860 let ctx = RouterContext {
1861 store: &store,
1862 session_store: &session_store,
1863 magic_codes: &magic_codes,
1864 oauth_state: &oauth_state,
1865 account_store: &account_store,
1866 api_keys: &api_keys,
1867 orgs: &orgs,
1868 siwe: &siwe,
1869 phone_codes: &phone_codes,
1870 passkeys: &passkeys,
1871 verification: &verification,
1872 audit: &audit,
1873 trusted_origins: &[],
1874 policy_engine: &policy_engine,
1875 change_log: &change_log,
1876 notifier: ¬ifier,
1877 rooms: &rooms,
1878 cache: &cache,
1879 pubsub: &pubsub,
1880 jobs: &jobs,
1881 scheduler: &scheduler,
1882 workflows: &workflows,
1883 files: &files,
1884 openapi: &openapi,
1885 functions: None,
1886 email: &email,
1887 shards: None,
1888 plugin_hooks: hooks,
1889 auth_ctx: auth,
1890 is_dev,
1891 request_headers: &[],
1892 peer_ip: "127.0.0.1",
1893 cookie_config: &cookie_config,
1894 response_headers: RefCell::new(Vec::new()),
1895 };
1896 f(&ctx);
1897 }
1898
1899 #[test]
1906 fn auth_session_refuses_non_admin_in_prod() {
1907 let anon = AuthContext::anonymous();
1908 with_ctx(false, &anon, |ctx| {
1909 let (status, body, _ct) = route(
1910 ctx,
1911 HttpMethod::Post,
1912 "/api/auth/session",
1913 r#"{"user_id":"victim"}"#,
1914 None,
1915 );
1916 assert_eq!(status, 403);
1917 assert!(body.contains("FORBIDDEN"));
1918 });
1919 }
1920
1921 #[test]
1922 fn auth_session_allowed_for_admin_in_prod() {
1923 let admin = AuthContext::admin();
1924 with_ctx(false, &admin, |ctx| {
1925 let (status, _body, _ct) = route(
1926 ctx,
1927 HttpMethod::Post,
1928 "/api/auth/session",
1929 r#"{"user_id":"alice"}"#,
1930 None,
1931 );
1932 assert_eq!(status, 201);
1933 });
1934 }
1935
1936 #[test]
1940 fn oauth_callback_refuses_missing_code_in_prod() {
1941 let anon = AuthContext::anonymous();
1942 with_ctx(false, &anon, |ctx| {
1943 let state = ctx
1946 .oauth_state
1947 .create("google", "https://app/cb", "https://app/cb");
1948 let body = format!(r#"{{"state":"{state}","email":"victim@example.com"}}"#);
1949 let (status, resp, _ct) = route(
1950 ctx,
1951 HttpMethod::Post,
1952 "/api/auth/callback/google",
1953 &body,
1954 None,
1955 );
1956 assert_eq!(status, 400);
1957 assert!(resp.contains("authorization") || resp.contains("code"));
1958 });
1959 }
1960
1961 #[test]
1963 fn sync_push_requires_admin() {
1964 let anon = AuthContext::anonymous();
1965 with_ctx(false, &anon, |ctx| {
1966 let (status, body, _ct) = route(
1967 ctx,
1968 HttpMethod::Post,
1969 "/api/sync/push",
1970 r#"{"changes":[]}"#,
1971 None,
1972 );
1973 assert_eq!(status, 403);
1974 assert!(body.contains("FORBIDDEN"));
1975 });
1976 }
1977
1978 #[test]
1980 fn transact_requires_admin() {
1981 let anon = AuthContext::anonymous();
1982 with_ctx(false, &anon, |ctx| {
1983 let (status, _body, _ct) = route(
1984 ctx,
1985 HttpMethod::Post,
1986 "/api/transact",
1987 r#"{"ops":[]}"#,
1988 None,
1989 );
1990 assert_eq!(status, 403);
1991 });
1992 }
1993
1994 #[test]
1996 fn workflow_start_requires_admin() {
1997 let anon = AuthContext::anonymous();
1998 with_ctx(false, &anon, |ctx| {
1999 let (status, _body, _ct) = route(
2000 ctx,
2001 HttpMethod::Post,
2002 "/api/workflows/start",
2003 r#"{"name":"x"}"#,
2004 None,
2005 );
2006 assert_eq!(status, 403);
2007 });
2008 }
2009
2010 #[test]
2012 fn jobs_enqueue_requires_admin() {
2013 let anon = AuthContext::anonymous();
2014 with_ctx(false, &anon, |ctx| {
2015 let (status, _body, _ct) = route(
2016 ctx,
2017 HttpMethod::Post,
2018 "/api/jobs",
2019 r#"{"name":"x","payload":{}}"#,
2020 None,
2021 );
2022 assert_eq!(status, 403);
2023 });
2024 }
2025
2026 fn assert_route_doesnt_panic(ctx: &RouterContext, method: HttpMethod, url: &str, body: &str) {
2037 let (_status, _body, _ct) = route(ctx, method, url, body, None);
2041 }
2042
2043 #[test]
2044 fn fuzz_malformed_json_bodies_never_panic() {
2045 let admin = AuthContext::admin();
2046 with_ctx(true, &admin, |ctx| {
2047 let samples = [
2048 "",
2049 "not json",
2050 "{",
2051 "}",
2052 "{\"",
2053 "{\"key\":",
2054 "[]",
2055 "null",
2056 "true",
2057 "\"string\"",
2058 "{\"changes\":\"not an array\"}",
2059 &format!("{{\"deeply\":{}}}", "{".repeat(1000)),
2060 "{\"unicode\":\"\\u0000\"}",
2061 "{\"numbers\":1e308}",
2062 "{\"negative\":-999999999999999}",
2063 ];
2064 for body in &samples {
2065 for url in &[
2066 "/api/sync/push",
2067 "/api/transact",
2068 "/api/import",
2069 "/api/batch",
2070 "/api/jobs",
2071 "/api/auth/session",
2072 "/api/auth/magic/send",
2073 ] {
2074 assert_route_doesnt_panic(ctx, HttpMethod::Post, url, body);
2075 }
2076 }
2077 });
2078 }
2079
2080 #[test]
2081 fn fuzz_weird_urls_never_panic() {
2082 let admin = AuthContext::admin();
2083 with_ctx(true, &admin, |ctx| {
2084 let samples = [
2085 "/",
2086 "/api",
2087 "/api/",
2088 "/api/entities/",
2089 "/api/entities//",
2090 "/api/entities/%00",
2091 "/api/entities/../escape",
2092 "/api/entities/User?garbage=\x01",
2093 "/api/entities/User?$limit=abc&$order=garbage",
2094 &format!("/api/entities/{}", "a".repeat(10_000)),
2095 "/api/fn/",
2096 "/api/fn/traces",
2097 "/api/shards/id/connect",
2098 "/api/workflows/definitions",
2099 "/api/workflows/nonexistent/advance",
2100 "/api/rooms/",
2101 "/api/rooms/%20",
2102 ];
2103 for url in &samples {
2104 assert_route_doesnt_panic(ctx, HttpMethod::Get, url, "");
2105 assert_route_doesnt_panic(ctx, HttpMethod::Post, url, "{}");
2106 assert_route_doesnt_panic(ctx, HttpMethod::Delete, url, "");
2107 }
2108 });
2109 }
2110
2111 #[test]
2112 fn fuzz_deeply_nested_json_dont_stack_overflow() {
2113 let admin = AuthContext::admin();
2116 with_ctx(true, &admin, |ctx| {
2117 let depth = 300;
2118 let body = format!("{}{}", "[".repeat(depth), "]".repeat(depth),);
2119 let (status, _body, _ct) = route(ctx, HttpMethod::Post, "/api/sync/push", &body, None);
2120 assert!(status >= 200 && status < 600);
2123 });
2124 }
2125
2126 #[test]
2127 fn fuzz_unusual_http_methods_gracefully() {
2128 let admin = AuthContext::admin();
2129 with_ctx(true, &admin, |ctx| {
2130 for method in [
2131 HttpMethod::Get,
2132 HttpMethod::Post,
2133 HttpMethod::Put,
2134 HttpMethod::Patch,
2135 HttpMethod::Delete,
2136 HttpMethod::Options,
2137 HttpMethod::Head,
2138 ] {
2139 let (_status, _body, _ct) = route(ctx, method, "/api/entities/User", "{}", None);
2140 }
2141 });
2142 }
2143
2144 struct UserStubStore {
2154 manifest: AppManifest,
2155 last_update: std::sync::Mutex<Option<(String, String, serde_json::Value)>>,
2156 }
2157 impl pylon_http::DataStore for UserStubStore {
2158 fn manifest(&self) -> &AppManifest {
2159 &self.manifest
2160 }
2161 fn insert(
2162 &self,
2163 _e: &str,
2164 _d: &serde_json::Value,
2165 ) -> Result<String, pylon_http::DataError> {
2166 Ok("u-1".into())
2167 }
2168 fn get_by_id(
2169 &self,
2170 entity: &str,
2171 id: &str,
2172 ) -> Result<Option<serde_json::Value>, pylon_http::DataError> {
2173 if entity == "User" && id == "u-1" {
2174 return Ok(Some(serde_json::json!({
2175 "id": "u-1",
2176 "email": "alice@example.com",
2177 "displayName": "Alice",
2178 })));
2179 }
2180 Ok(None)
2181 }
2182 fn list(&self, _e: &str) -> Result<Vec<serde_json::Value>, pylon_http::DataError> {
2183 Ok(vec![])
2184 }
2185 fn list_after(
2186 &self,
2187 _e: &str,
2188 _a: Option<&str>,
2189 _l: usize,
2190 ) -> Result<Vec<serde_json::Value>, pylon_http::DataError> {
2191 Ok(vec![])
2192 }
2193 fn update(
2194 &self,
2195 entity: &str,
2196 id: &str,
2197 data: &serde_json::Value,
2198 ) -> Result<bool, pylon_http::DataError> {
2199 *self.last_update.lock().unwrap() = Some((entity.into(), id.into(), data.clone()));
2200 Ok(true)
2201 }
2202 fn delete(&self, _e: &str, _i: &str) -> Result<bool, pylon_http::DataError> {
2203 Ok(true)
2204 }
2205 fn lookup(
2206 &self,
2207 _e: &str,
2208 _f: &str,
2209 _v: &str,
2210 ) -> Result<Option<serde_json::Value>, pylon_http::DataError> {
2211 Ok(None)
2212 }
2213 fn link(
2214 &self,
2215 _e: &str,
2216 _i: &str,
2217 _r: &str,
2218 _t: &str,
2219 ) -> Result<bool, pylon_http::DataError> {
2220 Ok(true)
2221 }
2222 fn unlink(&self, _e: &str, _i: &str, _r: &str) -> Result<bool, pylon_http::DataError> {
2223 Ok(true)
2224 }
2225 fn query_filtered(
2226 &self,
2227 _e: &str,
2228 _f: &serde_json::Value,
2229 ) -> Result<Vec<serde_json::Value>, pylon_http::DataError> {
2230 Ok(vec![])
2231 }
2232 fn query_graph(
2233 &self,
2234 _q: &serde_json::Value,
2235 ) -> Result<serde_json::Value, pylon_http::DataError> {
2236 Ok(serde_json::json!({}))
2237 }
2238 fn aggregate(
2239 &self,
2240 _e: &str,
2241 _s: &serde_json::Value,
2242 ) -> Result<serde_json::Value, pylon_http::DataError> {
2243 Ok(serde_json::json!({}))
2244 }
2245 fn transact(
2246 &self,
2247 _o: &[serde_json::Value],
2248 ) -> Result<(bool, Vec<serde_json::Value>), pylon_http::DataError> {
2249 Ok((true, vec![]))
2250 }
2251 fn search(
2252 &self,
2253 _e: &str,
2254 _q: &serde_json::Value,
2255 ) -> Result<serde_json::Value, pylon_http::DataError> {
2256 Ok(serde_json::json!({}))
2257 }
2258 }
2259
2260 struct CaptureEmail {
2264 sent: std::sync::Mutex<Vec<(String, String, String)>>,
2265 }
2266 impl EmailSender for CaptureEmail {
2267 fn send(&self, to: &str, subject: &str, body: &str) -> Result<(), String> {
2268 self.sent
2269 .lock()
2270 .unwrap()
2271 .push((to.into(), subject.into(), body.into()));
2272 Ok(())
2273 }
2274 }
2275
2276 fn with_user_ctx<F>(is_dev: bool, auth: &AuthContext, f: F)
2277 where
2278 F: FnOnce(&RouterContext, &UserStubStore, &CaptureEmail, &MagicCodeStore),
2279 {
2280 let manifest = empty_manifest();
2281 let store = UserStubStore {
2282 manifest: manifest.clone(),
2283 last_update: std::sync::Mutex::new(None),
2284 };
2285 let session_store = SessionStore::new();
2286 let magic_codes = MagicCodeStore::new();
2287 let oauth_state = OAuthStateStore::new();
2288 let account_store = pylon_auth::AccountStore::new();
2289 let api_keys = pylon_auth::api_key::ApiKeyStore::new();
2290 let orgs = pylon_auth::org::OrgStore::new();
2291 let siwe = pylon_auth::siwe::NonceStore::new();
2292 let phone_codes = pylon_auth::phone::PhoneCodeStore::new();
2293 let passkeys = pylon_auth::webauthn::PasskeyStore::new();
2294 let verification = pylon_auth::verification::VerificationStore::new();
2295 let audit = pylon_auth::audit::AuditStore::new();
2296 let policy_engine = PolicyEngine::from_manifest(&manifest);
2297 let change_log = ChangeLog::new();
2298 let notifier = NoopNotifier;
2299 let rooms = StubRooms;
2300 let cache = StubCache;
2301 let pubsub = StubPubSub;
2302 let jobs = StubJobs;
2303 let scheduler = StubScheduler;
2304 let workflows = StubWorkflows;
2305 let files = StubFiles;
2306 let openapi = StubOpenApi;
2307 let email = CaptureEmail {
2308 sent: std::sync::Mutex::new(vec![]),
2309 };
2310 let hooks = NoopPluginHooks;
2311 let cookie_config = CookieConfig::from_env(&CookieConfig::default_name_for("test"));
2312
2313 let ctx = RouterContext {
2314 store: &store,
2315 session_store: &session_store,
2316 magic_codes: &magic_codes,
2317 oauth_state: &oauth_state,
2318 account_store: &account_store,
2319 api_keys: &api_keys,
2320 orgs: &orgs,
2321 siwe: &siwe,
2322 phone_codes: &phone_codes,
2323 passkeys: &passkeys,
2324 verification: &verification,
2325 audit: &audit,
2326 trusted_origins: &[],
2327 policy_engine: &policy_engine,
2328 change_log: &change_log,
2329 notifier: ¬ifier,
2330 rooms: &rooms,
2331 cache: &cache,
2332 pubsub: &pubsub,
2333 jobs: &jobs,
2334 scheduler: &scheduler,
2335 workflows: &workflows,
2336 files: &files,
2337 openapi: &openapi,
2338 functions: None,
2339 email: &email,
2340 shards: None,
2341 plugin_hooks: &hooks,
2342 auth_ctx: auth,
2343 is_dev,
2344 request_headers: &[],
2345 peer_ip: "127.0.0.1",
2346 cookie_config: &cookie_config,
2347 response_headers: RefCell::new(Vec::new()),
2348 };
2349 f(&ctx, &store, &email, &magic_codes);
2350 }
2351
2352 #[test]
2353 fn email_send_verification_requires_auth() {
2354 let anon = AuthContext::anonymous();
2355 with_user_ctx(true, &anon, |ctx, _, _, _| {
2356 let (status, body, _) = route(
2357 ctx,
2358 HttpMethod::Post,
2359 "/api/auth/email/send-verification",
2360 "{}",
2361 None,
2362 );
2363 assert_eq!(status, 401);
2364 assert!(body.contains("UNAUTHORIZED"));
2365 });
2366 }
2367
2368 #[test]
2369 fn email_verify_requires_auth() {
2370 let anon = AuthContext::anonymous();
2371 with_user_ctx(true, &anon, |ctx, _, _, _| {
2372 let (status, body, _) = route(
2373 ctx,
2374 HttpMethod::Post,
2375 "/api/auth/email/verify",
2376 r#"{"code":"123456"}"#,
2377 None,
2378 );
2379 assert_eq!(status, 401);
2380 assert!(body.contains("UNAUTHORIZED"));
2381 });
2382 }
2383
2384 #[test]
2385 fn email_send_verification_uses_session_email_not_body() {
2386 let alice = AuthContext::authenticated("u-1".into());
2391 with_user_ctx(true, &alice, |ctx, _, email, _| {
2392 let (status, body, _) = route(
2393 ctx,
2394 HttpMethod::Post,
2395 "/api/auth/email/send-verification",
2396 r#"{"email":"victim@example.com"}"#,
2397 None,
2398 );
2399 assert_eq!(status, 200);
2400 let sent = email.sent.lock().unwrap();
2403 assert_eq!(sent.len(), 1);
2404 assert_eq!(sent[0].0, "alice@example.com");
2405 assert!(body.contains("alice@example.com"));
2406 assert!(!body.contains("victim@example.com"));
2407 });
2408 }
2409
2410 #[test]
2411 fn email_verify_happy_path_stamps_email_verified() {
2412 let alice = AuthContext::authenticated("u-1".into());
2413 with_user_ctx(true, &alice, |ctx, store, _, magic_codes| {
2414 let code = magic_codes.try_create("alice@example.com").unwrap();
2417 let body = format!(r#"{{"code":"{code}"}}"#);
2418 let (status, resp, _) =
2419 route(ctx, HttpMethod::Post, "/api/auth/email/verify", &body, None);
2420 assert_eq!(status, 200);
2421 assert!(resp.contains("\"verified\":true"));
2422 let last = store.last_update.lock().unwrap();
2424 let (entity, id, data) = last.as_ref().expect("update should have fired");
2425 assert_eq!(entity, "User");
2426 assert_eq!(id, "u-1");
2427 assert!(data.get("emailVerified").is_some());
2428 });
2429 }
2430
2431 #[test]
2432 fn email_verify_rejects_wrong_code() {
2433 let alice = AuthContext::authenticated("u-1".into());
2434 with_user_ctx(true, &alice, |ctx, store, _, magic_codes| {
2435 let _ = magic_codes.try_create("alice@example.com").unwrap();
2436 let (status, body, _) = route(
2437 ctx,
2438 HttpMethod::Post,
2439 "/api/auth/email/verify",
2440 r#"{"code":"999999"}"#,
2441 None,
2442 );
2443 assert_eq!(status, 401);
2444 assert!(body.contains("INVALID_CODE"));
2445 assert!(store.last_update.lock().unwrap().is_none());
2447 });
2448 }
2449
2450 #[test]
2452 fn auth_session_allowed_in_dev_mode() {
2453 let anon = AuthContext::anonymous();
2454 with_ctx(true, &anon, |ctx| {
2455 let (status, _body, _ct) = route(
2456 ctx,
2457 HttpMethod::Post,
2458 "/api/auth/session",
2459 r#"{"user_id":"alice"}"#,
2460 None,
2461 );
2462 assert_eq!(status, 201);
2463 });
2464 }
2465
2466 use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
2474
2475 struct CountingHooks {
2476 before_insert_calls: AtomicU32,
2477 after_insert_calls: AtomicU32,
2478 before_delete_calls: AtomicU32,
2479 reject_on_entity: Option<&'static str>,
2480 }
2481
2482 impl CountingHooks {
2483 fn new() -> Self {
2484 Self {
2485 before_insert_calls: AtomicU32::new(0),
2486 after_insert_calls: AtomicU32::new(0),
2487 before_delete_calls: AtomicU32::new(0),
2488 reject_on_entity: None,
2489 }
2490 }
2491 }
2492
2493 impl PluginHookOps for CountingHooks {
2494 fn before_insert(
2495 &self,
2496 entity: &str,
2497 _data: &mut serde_json::Value,
2498 _auth: &AuthContext,
2499 ) -> Result<(), (u16, String, String)> {
2500 self.before_insert_calls.fetch_add(1, Ordering::SeqCst);
2501 if self.reject_on_entity == Some(entity) {
2502 return Err((422, "VALIDATION".into(), "rejected by plugin".into()));
2503 }
2504 Ok(())
2505 }
2506 fn after_insert(
2507 &self,
2508 _entity: &str,
2509 _id: &str,
2510 _data: &serde_json::Value,
2511 _auth: &AuthContext,
2512 ) {
2513 self.after_insert_calls.fetch_add(1, Ordering::SeqCst);
2514 }
2515 fn before_update(
2516 &self,
2517 _entity: &str,
2518 _id: &str,
2519 _data: &mut serde_json::Value,
2520 _auth: &AuthContext,
2521 ) -> Result<(), (u16, String, String)> {
2522 Ok(())
2523 }
2524 fn after_update(
2525 &self,
2526 _entity: &str,
2527 _id: &str,
2528 _data: &serde_json::Value,
2529 _auth: &AuthContext,
2530 ) {
2531 }
2532 fn before_delete(
2533 &self,
2534 _entity: &str,
2535 _id: &str,
2536 _auth: &AuthContext,
2537 ) -> Result<(), (u16, String, String)> {
2538 self.before_delete_calls.fetch_add(1, Ordering::SeqCst);
2539 Ok(())
2540 }
2541 fn after_delete(&self, _entity: &str, _id: &str, _auth: &AuthContext) {}
2542 }
2543
2544 #[test]
2545 fn plugin_hooks_fire_on_entity_post() {
2546 let admin = AuthContext::admin();
2549 let hooks = CountingHooks::new();
2550 with_ctx_hooks(true, &admin, &hooks, |ctx| {
2551 let (status, _body, _ct) = route(
2552 ctx,
2553 HttpMethod::Post,
2554 "/api/entities/User",
2555 r#"{"email":"a@b"}"#,
2556 None,
2557 );
2558 assert_eq!(status, 201);
2559 });
2560 assert_eq!(hooks.before_insert_calls.load(Ordering::SeqCst), 1);
2561 assert_eq!(hooks.after_insert_calls.load(Ordering::SeqCst), 1);
2562 }
2563
2564 #[test]
2565 fn plugin_before_insert_rejection_short_circuits_write() {
2566 let admin = AuthContext::admin();
2569 let rejector = CountingHooks {
2570 reject_on_entity: Some("User"),
2571 ..CountingHooks::new()
2572 };
2573 with_ctx_hooks(true, &admin, &rejector, |ctx| {
2574 let (status, body, _ct) =
2575 route(ctx, HttpMethod::Post, "/api/entities/User", r#"{}"#, None);
2576 assert_eq!(status, 422);
2577 assert!(body.contains("VALIDATION"));
2578 });
2579 assert_eq!(rejector.before_insert_calls.load(Ordering::SeqCst), 1);
2580 assert_eq!(rejector.after_insert_calls.load(Ordering::SeqCst), 0);
2582 }
2583
2584 #[test]
2590 fn gdpr_export_requires_admin() {
2591 let anon = AuthContext::anonymous();
2592 with_ctx(false, &anon, |ctx| {
2593 let (status, body, _ct) = route(
2594 ctx,
2595 HttpMethod::Post,
2596 "/api/admin/users/alice/export",
2597 "",
2598 None,
2599 );
2600 assert_eq!(status, 403);
2601 assert!(body.contains("FORBIDDEN"));
2602 });
2603 }
2604
2605 #[test]
2606 fn gdpr_purge_requires_admin() {
2607 let anon = AuthContext::anonymous();
2608 with_ctx(false, &anon, |ctx| {
2609 let (status, _body, _ct) = route(
2610 ctx,
2611 HttpMethod::Delete,
2612 "/api/admin/users/alice/purge",
2613 "",
2614 None,
2615 );
2616 assert_eq!(status, 403);
2617 });
2618 }
2619
2620 #[test]
2621 fn gdpr_export_returns_envelope_for_admin() {
2622 let admin = AuthContext::admin();
2623 with_ctx(true, &admin, |ctx| {
2624 let (status, body, _ct) = route(
2625 ctx,
2626 HttpMethod::Post,
2627 "/api/admin/users/alice/export",
2628 "",
2629 None,
2630 );
2631 assert_eq!(status, 200);
2632 let v: serde_json::Value = serde_json::from_str(&body).unwrap();
2633 assert_eq!(v["user_id"], "alice");
2634 assert!(v["entities"].is_object());
2635 assert!(v["exported_at"].is_string());
2636 });
2637 }
2638
2639 #[test]
2640 fn plugin_hooks_fire_on_entity_delete() {
2641 let admin = AuthContext::admin();
2642 let hooks = CountingHooks::new();
2643 with_ctx_hooks(true, &admin, &hooks, |ctx| {
2644 let (status, _body, _ct) = route(
2645 ctx,
2646 HttpMethod::Delete,
2647 "/api/entities/User/stub-id",
2648 "",
2649 None,
2650 );
2651 assert_eq!(status, 200);
2652 });
2653 assert_eq!(hooks.before_delete_calls.load(Ordering::SeqCst), 1);
2654 }
2655
2656 #[test]
2666 fn webhook_returns_503_when_functions_unavailable() {
2667 let anon = AuthContext::anonymous();
2668 with_ctx(true, &anon, |ctx| {
2669 let (status, body, _ct) = route(
2670 ctx,
2671 HttpMethod::Post,
2672 "/api/webhooks/stripe_handler",
2673 "{}",
2674 None,
2675 );
2676 assert_eq!(status, 503);
2677 assert!(body.contains("FUNCTIONS_NOT_AVAILABLE"));
2678 });
2679 }
2680
2681 #[test]
2682 fn webhook_accepts_any_http_method() {
2683 let anon = AuthContext::anonymous();
2686 with_ctx(true, &anon, |ctx| {
2687 for method in [
2688 HttpMethod::Get,
2689 HttpMethod::Post,
2690 HttpMethod::Put,
2691 HttpMethod::Patch,
2692 HttpMethod::Delete,
2693 ] {
2694 let (status, _body, _ct) = route(ctx, method, "/api/webhooks/any_name", "", None);
2695 assert_ne!(status, 405);
2698 }
2699 });
2700 }
2701
2702 #[derive(Clone, Copy)]
2720 enum Expect {
2721 Eq(u16),
2723 Rejected,
2725 ReachedHandler,
2728 }
2729
2730 fn assert_expect(actual: u16, want: Expect, label: &str) {
2731 match want {
2732 Expect::Eq(s) => assert_eq!(actual, s, "{label}: expected status {s}, got {actual}"),
2733 Expect::Rejected => assert!(
2734 actual == 401 || actual == 403,
2735 "{label}: expected 401 or 403, got {actual}"
2736 ),
2737 Expect::ReachedHandler => assert_ne!(
2738 actual, 405,
2739 "{label}: route should accept this method, got 405"
2740 ),
2741 }
2742 }
2743
2744 fn matrix_check(
2749 method: HttpMethod,
2750 url: &str,
2751 body: &str,
2752 expect_anon: Expect,
2753 expect_guest: Expect,
2754 expect_user: Expect,
2755 ) {
2756 let anon = AuthContext::anonymous();
2757 let guest = AuthContext::guest("guest-1".into());
2758 let user = AuthContext::authenticated("u-1".into());
2759
2760 for (auth, want, who) in [
2761 (&anon, expect_anon, "anon"),
2762 (&guest, expect_guest, "guest"),
2763 (&user, expect_user, "user"),
2764 ] {
2765 with_ctx(false, auth, |ctx| {
2766 let (status, _body, _ct) = route(ctx, method, url, body, None);
2767 assert_expect(status, want, &format!("{who} {method:?} {url}"));
2768 });
2769 }
2770 }
2771
2772 fn matrix_check_with_deny_policy(
2778 deny_entity: &str,
2779 method: HttpMethod,
2780 url: &str,
2781 body: &str,
2782 expect_anon: Expect,
2783 expect_guest: Expect,
2784 expect_user: Expect,
2785 ) {
2786 use pylon_kernel::{AppManifest, ManifestPolicy, MANIFEST_VERSION};
2787 let anon = AuthContext::anonymous();
2788 let guest = AuthContext::guest("guest-1".into());
2789 let user = AuthContext::authenticated("u-1".into());
2790
2791 let manifest = AppManifest {
2792 manifest_version: MANIFEST_VERSION,
2793 name: "test".into(),
2794 version: "0.1.0".into(),
2795 entities: vec![],
2796 routes: vec![],
2797 queries: vec![],
2798 actions: vec![],
2799 policies: vec![ManifestPolicy {
2800 name: "denyAll".into(),
2801 entity: Some(deny_entity.into()),
2802 allow_read: Some("false".into()),
2803 allow_update: Some("false".into()),
2804 ..Default::default()
2805 }],
2806 auth: Default::default(),
2807 };
2808 let store = StubDataStore {
2809 manifest: manifest.clone(),
2810 };
2811 let session_store = SessionStore::new();
2812 let magic_codes = MagicCodeStore::new();
2813 let oauth_state = OAuthStateStore::new();
2814 let account_store = pylon_auth::AccountStore::new();
2815 let api_keys = pylon_auth::api_key::ApiKeyStore::new();
2816 let orgs = pylon_auth::org::OrgStore::new();
2817 let siwe = pylon_auth::siwe::NonceStore::new();
2818 let phone_codes = pylon_auth::phone::PhoneCodeStore::new();
2819 let passkeys = pylon_auth::webauthn::PasskeyStore::new();
2820 let verification = pylon_auth::verification::VerificationStore::new();
2821 let audit = pylon_auth::audit::AuditStore::new();
2822 let policy_engine = PolicyEngine::from_manifest(&manifest);
2823 let change_log = ChangeLog::new();
2824 let notifier = NoopNotifier;
2825 let rooms = StubRooms;
2826 let cache = StubCache;
2827 let pubsub = StubPubSub;
2828 let jobs = StubJobs;
2829 let scheduler = StubScheduler;
2830 let workflows = StubWorkflows;
2831 let files = StubFiles;
2832 let openapi = StubOpenApi;
2833 let email = StubEmail;
2834 let cookie_config = CookieConfig::from_env(&CookieConfig::default_name_for("test"));
2835
2836 for (auth, want, who) in [
2837 (&anon, expect_anon, "anon"),
2838 (&guest, expect_guest, "guest"),
2839 (&user, expect_user, "user"),
2840 ] {
2841 let ctx = RouterContext {
2842 store: &store,
2843 session_store: &session_store,
2844 magic_codes: &magic_codes,
2845 oauth_state: &oauth_state,
2846 account_store: &account_store,
2847 api_keys: &api_keys,
2848 orgs: &orgs,
2849 siwe: &siwe,
2850 phone_codes: &phone_codes,
2851 passkeys: &passkeys,
2852 verification: &verification,
2853 audit: &audit,
2854 trusted_origins: &[],
2855 policy_engine: &policy_engine,
2856 change_log: &change_log,
2857 notifier: ¬ifier,
2858 rooms: &rooms,
2859 cache: &cache,
2860 pubsub: &pubsub,
2861 jobs: &jobs,
2862 scheduler: &scheduler,
2863 workflows: &workflows,
2864 files: &files,
2865 openapi: &openapi,
2866 functions: None,
2867 email: &email,
2868 shards: None,
2869 plugin_hooks: &NoopPluginHooks,
2870 auth_ctx: auth,
2871 is_dev: false,
2872 request_headers: &[],
2873 peer_ip: "127.0.0.1",
2874 cookie_config: &cookie_config,
2875 response_headers: RefCell::new(Vec::new()),
2876 };
2877 let (status, _body, _ct) = route(&ctx, method, url, body, None);
2878 assert_expect(status, want, &format!("{who} {method:?} {url}"));
2879 }
2880 }
2881
/// Cache endpoints (keyed GET, POST op, keyed DELETE) must yield
/// `Expect::Rejected` for the anonymous, guest, and authenticated callers
/// that `matrix_check` exercises.
#[test]
fn matrix_cache_admin_only() {
    let probes = [
        (HttpMethod::Get, "/api/cache/anykey", ""),
        (HttpMethod::Post, "/api/cache", r#"{"op":"get","key":"x"}"#),
        (HttpMethod::Delete, "/api/cache/anykey", ""),
    ];
    for (method, url, body) in probes {
        matrix_check(
            method,
            url,
            body,
            Expect::Rejected,
            Expect::Rejected,
            Expect::Rejected,
        );
    }
}
2909
/// Pub/sub publish, channel listing, and channel history must yield
/// `Expect::Rejected` for the anonymous, guest, and authenticated callers
/// that `matrix_check` exercises.
#[test]
fn matrix_pubsub_admin_only() {
    let probes = [
        (
            HttpMethod::Post,
            "/api/pubsub/publish",
            r#"{"channel":"x","message":"y"}"#,
        ),
        (HttpMethod::Get, "/api/pubsub/channels", ""),
        (HttpMethod::Get, "/api/pubsub/history/some-channel", ""),
    ];
    for (method, url, body) in probes {
        matrix_check(
            method,
            url,
            body,
            Expect::Rejected,
            Expect::Rejected,
            Expect::Rejected,
        );
    }
}
2937
/// Every job read endpoint (stats, dead-letter, list, single job) must yield
/// `Expect::Rejected` on GET for the anonymous, guest, and authenticated
/// callers that `matrix_check` exercises.
#[test]
fn matrix_jobs_read_admin_only() {
    let urls = [
        "/api/jobs/stats",
        "/api/jobs/dead",
        "/api/jobs",
        "/api/jobs/some-job-id",
    ];
    for url in urls {
        matrix_check(
            HttpMethod::Get,
            url,
            "",
            Expect::Rejected,
            Expect::Rejected,
            Expect::Rejected,
        );
    }
}
2973
/// Workflow definitions, the instance list, and a single instance must yield
/// `Expect::Rejected` on GET for the anonymous, guest, and authenticated
/// callers that `matrix_check` exercises.
#[test]
fn matrix_workflows_read_admin_only() {
    let urls = [
        "/api/workflows/definitions",
        "/api/workflows",
        "/api/workflows/some-id",
    ];
    for url in urls {
        matrix_check(
            HttpMethod::Get,
            url,
            "",
            Expect::Rejected,
            Expect::Rejected,
            Expect::Rejected,
        );
    }
}
3001
/// File download: the anonymous caller is held to `Expect::Eq(401)`, while
/// guest and authenticated callers only need to reach the handler, i.e. the
/// status must not be 405.
#[test]
fn matrix_files_download_requires_auth() {
    let url = "/api/files/some-file-id";
    let for_anon = Expect::Eq(401);
    let for_guest = Expect::ReachedHandler;
    let for_user = Expect::ReachedHandler;
    matrix_check(HttpMethod::Get, url, "", for_anon, for_guest, for_user);
}
3015
/// CRDT push against an entity whose update policy is `"false"`: the anonymous
/// caller is held to `Expect::Eq(401)`; guest and authenticated callers are
/// held to `Expect::Rejected`.
#[test]
fn matrix_crdt_push_respects_update_policy() {
    let entity = "Doc";
    let url = "/api/crdt/Doc/some-row";
    let payload = r#"{"update":"00"}"#;
    matrix_check_with_deny_policy(
        entity,
        HttpMethod::Post,
        url,
        payload,
        Expect::Eq(401),
        Expect::Rejected,
        Expect::Rejected,
    );
}
3032
/// Filtered query (POST) against an entity whose read policy is `"false"`
/// must yield `Expect::Rejected` for anonymous, guest, and authenticated
/// callers alike.
#[test]
fn matrix_filtered_query_respects_read_policy() {
    let entity = "Secret";
    let url = "/api/query/Secret";
    let filter = r#"{"where":{}}"#;
    matrix_check_with_deny_policy(
        entity,
        HttpMethod::Post,
        url,
        filter,
        Expect::Rejected,
        Expect::Rejected,
        Expect::Rejected,
    );
}
3045
/// Cursor pagination (GET) over an entity whose read policy is `"false"`
/// must yield `Expect::Rejected` for anonymous, guest, and authenticated
/// callers alike.
#[test]
fn matrix_cursor_pagination_respects_read_policy() {
    let entity = "Secret";
    let url = "/api/entities/Secret/cursor?limit=10";
    matrix_check_with_deny_policy(
        entity,
        HttpMethod::Get,
        url,
        "",
        Expect::Rejected,
        Expect::Rejected,
        Expect::Rejected,
    );
}
3058
/// Sweeps a table of GET routes and requires `Expect::Rejected` from each one
/// for anonymous, guest, and authenticated callers.
///
/// The second tuple element is a human-readable note on what the route
/// exposes; it documents the table and is not used by the check itself.
#[test]
fn matrix_admin_get_routes_audit() {
    let admin_get_routes: &[(&str, &str)] = &[
        ("/api/scheduler", "list scheduled tasks"),
        ("/api/fn", "enumerate registered functions"),
        ("/api/fn/traces", "function execution traces"),
        ("/api/shards", "shard topology + subscriber counts"),
        ("/api/cache/anykey", "raw cache read"),
        ("/api/pubsub/channels", "list pub/sub channels"),
        ("/api/pubsub/history/anychannel", "channel retained history"),
        ("/api/jobs/stats", "job queue stats"),
        ("/api/jobs/dead", "dead-letter queue"),
        ("/api/jobs", "job list with payloads"),
        ("/api/jobs/some-id", "single job detail"),
        ("/api/workflows/definitions", "workflow definitions"),
        ("/api/workflows", "workflow instance list"),
        ("/api/workflows/some-id", "workflow instance detail"),
    ];
    for &(url, _what_it_exposes) in admin_get_routes {
        matrix_check(
            HttpMethod::Get,
            url,
            "",
            Expect::Rejected,
            Expect::Rejected,
            Expect::Rejected,
        );
    }
}
3103
// NOTE(review): nothing in the visible tests reads this constant. Going by its
// name it presumably exists only to reference the `AtomicBool` import so that
// import is not reported as unused — confirm before removing. `dead_code` is
// allowed because the constant itself is intentionally never used.
#[allow(dead_code)]
const _TOUCH_ATOMIC_BOOL: AtomicBool = AtomicBool::new(false);
3107
/// Pins the output of `redact_email` for a set of representative inputs.
///
/// The cases show: at most the first two characters of the local part are
/// kept, followed by `***` and the `@domain`; shorter local parts are kept
/// whole; inputs without a usable address (`"not-an-email"`, `""`) collapse
/// to `***`; and the two-character cut counts characters, not bytes (`é`).
#[test]
fn redact_email_keeps_two_chars_and_domain() {
    let cases = [
        ("alice@acme.com", "al***@acme.com"),
        ("a@b.io", "a***@b.io"),
        ("ab@x.io", "ab***@x.io"),
        ("not-an-email", "***"),
        ("", "***"),
        ("éric@x.io", "ér***@x.io"),
    ];
    for (input, expected) in cases {
        assert_eq!(super::redact_email(input), expected);
    }
}
3123
/// `public_manifest` must keep a policy's name and entity but blank out its
/// expressions: `allow` becomes empty and `allow_read` / `allow_update`
/// become `None`. The input manifest must be left untouched.
#[test]
fn public_manifest_strips_policy_expressions() {
    use pylon_kernel::{AppManifest, ManifestPolicy, MANIFEST_VERSION};

    let owner_expr = "auth.userId == data.ownerId";
    let owner_only = ManifestPolicy {
        name: "ownerOnly".into(),
        entity: Some("Todo".into()),
        allow_read: Some(owner_expr.into()),
        allow_update: Some(owner_expr.into()),
        ..Default::default()
    };
    let original = AppManifest {
        manifest_version: MANIFEST_VERSION,
        name: "t".into(),
        version: "0.0.0".into(),
        entities: vec![],
        routes: vec![],
        queries: vec![],
        actions: vec![],
        policies: vec![owner_only],
        auth: Default::default(),
    };

    let redacted = super::public_manifest(&original);
    let policy = &redacted.policies[0];

    // Identifying fields survive.
    assert_eq!(policy.name, "ownerOnly");
    assert_eq!(policy.entity.as_deref(), Some("Todo"));

    // Expression fields are stripped.
    assert_eq!(policy.allow, "");
    assert!(policy.allow_read.is_none());
    assert!(policy.allow_update.is_none());

    // The source manifest still carries its original expression.
    assert_eq!(
        original.policies[0].allow_read.as_deref(),
        Some(owner_expr)
    );
}
3161}