// Source: pylon_router/lib.rs

//! Platform-agnostic HTTP router for pylon.
//!
//! This crate contains the pure routing logic that maps HTTP requests to
//! data store operations. It has no I/O dependencies — no `tiny_http`,
//! no `tungstenite`, no `rusqlite`. It works with any [`DataStore`]
//! implementation (SQLite Runtime, Cloudflare D1, etc.).
7
8use pylon_auth::{AuthContext, CookieConfig, MagicCodeStore, OAuthStateStore, SessionStore};
9use pylon_http::{DataError, DataStore, HttpMethod};
10use pylon_policy::PolicyEngine;
11use pylon_sync::{ChangeKind, ChangeLog};
12use std::cell::RefCell;
13
14mod routes;
15
// ---------------------------------------------------------------------------
// ChangeNotifier — abstraction over WS/SSE broadcast
// ---------------------------------------------------------------------------
19
20/// Receives change notifications for real-time sync.
21///
22/// On the self-hosted server this broadcasts to WebSocket + SSE hubs.
23/// On Workers this can be a no-op or post to a Durable Object.
24pub trait ChangeNotifier: Send + Sync {
25    fn notify(&self, event: &pylon_sync::ChangeEvent);
26    fn notify_presence(&self, json: &str);
27
28    /// Ship a binary CRDT update for one row to subscribed clients.
29    /// Called after every successful write to a CRDT-mode entity. The
30    /// payload is a Loro snapshot (or eventually an incremental delta);
31    /// the implementation owns wire-format framing — see
32    /// `encode_crdt_frame` for the canonical Pylon shape.
33    ///
34    /// Default impl is a no-op so backends without WebSocket support
35    /// (Workers, no-op notifier) compile without ceremony.
36    fn notify_crdt(&self, _entity: &str, _row_id: &str, _snapshot: &[u8]) {}
37}
38
/// No-op notifier for platforms without real-time push.
///
/// Zero-sized; every notification is discarded.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopNotifier;
41
42impl ChangeNotifier for NoopNotifier {
43    fn notify(&self, _event: &pylon_sync::ChangeEvent) {}
44    fn notify_presence(&self, _json: &str) {}
45}
46
// ---------------------------------------------------------------------------
// CRDT wire format
//
// Every CRDT broadcast frame is a single binary WebSocket message shaped:
//
//   [type: u8] [entity_len: u16 BE] [entity utf8] [row_id_len: u16 BE] [row_id utf8] [payload bytes]
//
// Type bytes (matching the Remboard pattern that proved out in production):
//
//   0x10 = full Loro snapshot (sent on subscribe / first writes)
//   0x11 = incremental Loro update (sent on subsequent writes)
//
// For the first slice we always send 0x10 — Loro's snapshots are bounded
// by internal compaction, so the bandwidth is fine. Switching to deltas
// is a non-breaking optimization (just flip the type byte and the
// payload encoding) once we have per-client version-vector tracking.
// ---------------------------------------------------------------------------
64
/// Frame type for a full CRDT snapshot.
pub const CRDT_FRAME_SNAPSHOT: u8 = 0x10;
/// Frame type for an incremental CRDT update (reserved — not yet emitted).
pub const CRDT_FRAME_UPDATE: u8 = 0x11;
69
/// Errors from [`encode_crdt_frame`]. Surfaced loud rather than silently
/// truncating so a pathological entity / row_id name (longer than
/// `u16::MAX` bytes) becomes an observable failure instead of a malformed
/// frame the client can't decode.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FrameEncodeError {
    /// Entity name exceeds the 16-bit length header. In practice every
    /// Pylon entity name is well under 100 bytes — hitting this means
    /// the caller is using the encoder for something it wasn't designed
    /// for. The bound is `u16::MAX = 65535` bytes (UTF-8 length).
    ///
    /// `len` is the offending UTF-8 byte length.
    EntityTooLong { len: usize },
    /// Row ID exceeds the 16-bit length header. Pylon-generated IDs are
    /// 40 hex chars; user-supplied IDs aren't validated up to this layer
    /// but are practically bounded by URL / SQL constraints elsewhere.
    ///
    /// `len` is the offending UTF-8 byte length.
    RowIdTooLong { len: usize },
}
86
87impl std::fmt::Display for FrameEncodeError {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        match self {
90            Self::EntityTooLong { len } => write!(
91                f,
92                "CRDT frame: entity name {len} bytes exceeds u16 length limit ({})",
93                u16::MAX
94            ),
95            Self::RowIdTooLong { len } => write!(
96                f,
97                "CRDT frame: row_id {len} bytes exceeds u16 length limit ({})",
98                u16::MAX
99            ),
100        }
101    }
102}
103
104impl std::error::Error for FrameEncodeError {}
105
106/// Encode a CRDT broadcast frame. Layout documented at the top of this
107/// module. Returns the encoded bytes on success; errors when entity /
108/// row_id can't fit the 16-bit length header (~65 KiB, never hit in
109/// practice).
110///
111/// Client decoders mirror this in `@pylonsync/loro/src/wire.ts`.
112pub fn encode_crdt_frame(
113    frame_type: u8,
114    entity: &str,
115    row_id: &str,
116    payload: &[u8],
117) -> Result<Vec<u8>, FrameEncodeError> {
118    let entity_bytes = entity.as_bytes();
119    let row_id_bytes = row_id.as_bytes();
120    if entity_bytes.len() > u16::MAX as usize {
121        return Err(FrameEncodeError::EntityTooLong {
122            len: entity_bytes.len(),
123        });
124    }
125    if row_id_bytes.len() > u16::MAX as usize {
126        return Err(FrameEncodeError::RowIdTooLong {
127            len: row_id_bytes.len(),
128        });
129    }
130    let entity_len = entity_bytes.len() as u16;
131    let row_id_len = row_id_bytes.len() as u16;
132    let mut out =
133        Vec::with_capacity(1 + 2 + entity_bytes.len() + 2 + row_id_bytes.len() + payload.len());
134    out.push(frame_type);
135    out.extend_from_slice(&entity_len.to_be_bytes());
136    out.extend_from_slice(entity_bytes);
137    out.extend_from_slice(&row_id_len.to_be_bytes());
138    out.extend_from_slice(row_id_bytes);
139    out.extend_from_slice(payload);
140    Ok(out)
141}
142
/// Unit tests for the CRDT frame encoder and the crate-level `decode_hex`
/// helper (defined elsewhere in this file).
#[cfg(test)]
mod crdt_frame_tests {
    use super::*;

    #[test]
    fn roundtrip_header_layout() {
        let frame = encode_crdt_frame(
            CRDT_FRAME_SNAPSHOT,
            "Message",
            "msg_123",
            &[0xab, 0xcd, 0xef],
        )
        .unwrap();
        assert_eq!(frame[0], 0x10);
        // entity_len = 7 ("Message") in BE
        assert_eq!(&frame[1..3], &[0, 7]);
        assert_eq!(&frame[3..10], b"Message");
        // row_id_len = 7 ("msg_123") in BE
        assert_eq!(&frame[10..12], &[0, 7]);
        assert_eq!(&frame[12..19], b"msg_123");
        // payload trails after both headers
        assert_eq!(&frame[19..], &[0xab, 0xcd, 0xef]);
    }

    #[test]
    fn empty_payload_still_carries_headers() {
        let frame = encode_crdt_frame(CRDT_FRAME_UPDATE, "X", "y", &[]).unwrap();
        assert_eq!(frame[0], 0x11);
        assert_eq!(&frame[1..3], &[0, 1]);
        assert_eq!(&frame[3..4], b"X");
        assert_eq!(&frame[4..6], &[0, 1]);
        assert_eq!(&frame[6..7], b"y");
        assert_eq!(frame.len(), 7);
    }

    #[test]
    fn hex_roundtrip() {
        assert_eq!(super::decode_hex(""), Some(vec![]));
        assert_eq!(super::decode_hex("00"), Some(vec![0x00]));
        assert_eq!(super::decode_hex("ab"), Some(vec![0xab]));
        assert_eq!(super::decode_hex("AB"), Some(vec![0xab]));
        assert_eq!(
            super::decode_hex("DEADBEEF"),
            Some(vec![0xde, 0xad, 0xbe, 0xef])
        );
    }

    #[test]
    fn hex_rejects_malformed() {
        assert_eq!(super::decode_hex("a"), None); // odd length
        assert_eq!(super::decode_hex("xy"), None); // non-hex
        assert_eq!(super::decode_hex("ab cd"), None); // space inside
    }

    #[test]
    fn entity_too_long_errors() {
        let huge_entity = "x".repeat(u16::MAX as usize + 1);
        let err = encode_crdt_frame(CRDT_FRAME_SNAPSHOT, &huge_entity, "y", &[])
            .expect_err("entity > u16::MAX must reject");
        // Pin the reported length too, not just the variant.
        assert_eq!(
            err,
            FrameEncodeError::EntityTooLong {
                len: u16::MAX as usize + 1
            }
        );
    }

    #[test]
    fn row_id_too_long_errors() {
        let huge_row_id = "x".repeat(u16::MAX as usize + 1);
        let err = encode_crdt_frame(CRDT_FRAME_SNAPSHOT, "X", &huge_row_id, &[])
            .expect_err("row_id > u16::MAX must reject");
        // Pin the reported length too, not just the variant.
        assert_eq!(
            err,
            FrameEncodeError::RowIdTooLong {
                len: u16::MAX as usize + 1
            }
        );
    }
}
213
// ---------------------------------------------------------------------------
// CacheOps / PubSubOps / JobOps / SchedulerOps / WorkflowOps
// — thin traits so the router doesn't depend on concrete impls
// ---------------------------------------------------------------------------
218
/// Cache operations used by the router.
///
/// Every method returns a `(u16, String)` pair — presumably
/// `(HTTP status, response body)`; confirm against the implementation.
pub trait CacheOps: Send + Sync {
    /// Handle a cache command described by the raw request `body`.
    fn handle_command(&self, body: &str) -> (u16, String);
    /// Handle a read of `key`.
    fn handle_get(&self, key: &str) -> (u16, String);
    /// Handle removal of `key`.
    fn handle_delete(&self, key: &str) -> (u16, String);
}
225
/// Pub/Sub operations used by the router.
///
/// Every method returns a `(u16, String)` pair — presumably
/// `(HTTP status, response body)`; confirm against the implementation.
pub trait PubSubOps: Send + Sync {
    /// Handle a publish request described by the raw request `body`.
    fn handle_publish(&self, body: &str) -> (u16, String);
    /// Handle a request to enumerate channels.
    fn handle_channels(&self) -> (u16, String);
    /// Handle a history request for `channel`. `url` is the request URL
    /// (presumably consulted for query parameters — TODO confirm).
    fn handle_history(&self, channel: &str, url: &str) -> (u16, String);
}
232
233/// Room operations used by the router.
234pub trait RoomOps: Send + Sync {
235    fn join(
236        &self,
237        room: &str,
238        user_id: &str,
239        data: Option<serde_json::Value>,
240    ) -> Result<(serde_json::Value, serde_json::Value), DataError>;
241    fn leave(&self, room: &str, user_id: &str) -> Option<serde_json::Value>;
242    fn set_presence(
243        &self,
244        room: &str,
245        user_id: &str,
246        data: serde_json::Value,
247    ) -> Option<serde_json::Value>;
248    fn broadcast(
249        &self,
250        room: &str,
251        sender: Option<&str>,
252        topic: &str,
253        data: serde_json::Value,
254    ) -> Option<serde_json::Value>;
255    fn list_rooms(&self) -> Vec<String>;
256    fn room_size(&self, name: &str) -> usize;
257    fn members(&self, name: &str) -> Vec<serde_json::Value>;
258}
259
260/// Job queue operations used by the router.
261pub trait JobOps: Send + Sync {
262    fn enqueue(
263        &self,
264        name: &str,
265        payload: serde_json::Value,
266        priority: &str,
267        delay_secs: u64,
268        max_retries: u32,
269        queue: &str,
270    ) -> String;
271    fn stats(&self) -> serde_json::Value;
272    fn dead_letters(&self) -> serde_json::Value;
273    fn retry_dead(&self, id: &str) -> bool;
274    fn list_jobs(
275        &self,
276        status: Option<&str>,
277        queue: Option<&str>,
278        limit: usize,
279    ) -> serde_json::Value;
280    fn get_job(&self, id: &str) -> Option<serde_json::Value>;
281}
282
283/// Scheduler operations used by the router.
284pub trait SchedulerOps: Send + Sync {
285    fn list_tasks(&self) -> serde_json::Value;
286    fn trigger(&self, name: &str) -> bool;
287}
288
289/// Workflow engine operations used by the router.
290pub trait WorkflowOps: Send + Sync {
291    fn definitions(&self) -> serde_json::Value;
292    fn start(&self, name: &str, input: serde_json::Value) -> Result<String, String>;
293    fn list(&self, status_filter: Option<&str>) -> serde_json::Value;
294    fn get(&self, id: &str) -> Option<serde_json::Value>;
295    fn advance(&self, id: &str) -> Result<String, String>;
296    fn send_event(&self, id: &str, event: &str, data: serde_json::Value) -> Result<(), String>;
297    fn cancel(&self, id: &str) -> Result<(), String>;
298}
299
/// File storage operations used by the router.
///
/// Both methods return a `(u16, String)` pair — presumably
/// `(HTTP status, response body)`; confirm against the implementation.
pub trait FileOps: Send + Sync {
    /// Handle an upload described by the raw request `body`.
    fn upload(&self, body: &str) -> (u16, String);
    /// Handle retrieval of the file `id`.
    fn get_file(&self, id: &str) -> (u16, String);
}
305
/// Sends emails (magic codes, invitations, etc.).
pub trait EmailSender: Send + Sync {
    /// Send one message to `to` with the given `subject` and `body`.
    /// Failure is reported as a plain `String` message.
    fn send(&self, to: &str, subject: &str, body: &str) -> Result<(), String>;
}
310
311/// Access to sharded real-time simulations (game matches, MMO zones, etc.).
312pub trait ShardOps: Send + Sync {
313    /// Look up an existing shard by ID.
314    fn get_shard(&self, id: &str) -> Option<std::sync::Arc<dyn pylon_realtime::DynShard>>;
315
316    /// List IDs of active shards.
317    fn list_shards(&self) -> Vec<String>;
318
319    /// Number of active shards.
320    fn shard_count(&self) -> usize;
321}
322
/// Generates the OpenAPI spec JSON string for the manifest.
pub trait OpenApiGenerator: Send + Sync {
    /// Produce the spec as a string. `base_url` is supplied by the caller
    /// (presumably used as the spec's server URL — TODO confirm).
    fn generate(&self, base_url: &str) -> String;
}
327
328/// Plugin CRUD lifecycle hooks used by the router.
329///
330/// Wired up so that `POST/PATCH/DELETE /api/entities/...` triggers registered
331/// plugin before_/after_ hooks (audit log, search indexing, webhooks,
332/// versioning, validation, timestamps, slugify). Previously the router only
333/// ran `on_request` + custom routes, silently bypassing CRUD hooks — which
334/// meant security-relevant plugins (validation, audit_log) didn't apply to
335/// the primary write path.
336///
337/// `before_insert/update` receive a mutable `data` so plugins can inject
338/// fields (timestamps, slugs). A returned `Err` rejects the write with the
339/// given status + message and no data is touched.
340pub trait PluginHookOps: Send + Sync {
341    fn before_insert(
342        &self,
343        entity: &str,
344        data: &mut serde_json::Value,
345        auth: &AuthContext,
346    ) -> Result<(), (u16, String, String)>;
347
348    fn after_insert(&self, entity: &str, id: &str, data: &serde_json::Value, auth: &AuthContext);
349
350    fn before_update(
351        &self,
352        entity: &str,
353        id: &str,
354        data: &mut serde_json::Value,
355        auth: &AuthContext,
356    ) -> Result<(), (u16, String, String)>;
357
358    fn after_update(&self, entity: &str, id: &str, data: &serde_json::Value, auth: &AuthContext);
359
360    fn before_delete(
361        &self,
362        entity: &str,
363        id: &str,
364        auth: &AuthContext,
365    ) -> Result<(), (u16, String, String)>;
366
367    fn after_delete(&self, entity: &str, id: &str, auth: &AuthContext);
368}
369
/// No-op plugin hooks for platforms or tests without a registry.
///
/// Zero-sized; every `before_*` hook accepts and every `after_*` hook
/// does nothing.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopPluginHooks;
372
373impl PluginHookOps for NoopPluginHooks {
374    fn before_insert(
375        &self,
376        _entity: &str,
377        _data: &mut serde_json::Value,
378        _auth: &AuthContext,
379    ) -> Result<(), (u16, String, String)> {
380        Ok(())
381    }
382    fn after_insert(
383        &self,
384        _entity: &str,
385        _id: &str,
386        _data: &serde_json::Value,
387        _auth: &AuthContext,
388    ) {
389    }
390    fn before_update(
391        &self,
392        _entity: &str,
393        _id: &str,
394        _data: &mut serde_json::Value,
395        _auth: &AuthContext,
396    ) -> Result<(), (u16, String, String)> {
397        Ok(())
398    }
399    fn after_update(
400        &self,
401        _entity: &str,
402        _id: &str,
403        _data: &serde_json::Value,
404        _auth: &AuthContext,
405    ) {
406    }
407    fn before_delete(
408        &self,
409        _entity: &str,
410        _id: &str,
411        _auth: &AuthContext,
412    ) -> Result<(), (u16, String, String)> {
413        Ok(())
414    }
415    fn after_delete(&self, _entity: &str, _id: &str, _auth: &AuthContext) {}
416}
417
418/// TypeScript function operations used by the router.
419///
420/// Implementations manage transaction semantics: mutations run under the
421/// write lock with BEGIN/COMMIT/ROLLBACK, queries use the read pool,
422/// actions run without transactions.
423pub trait FnOps: Send + Sync {
424    /// Look up a registered function.
425    fn get_fn(&self, name: &str) -> Option<pylon_functions::registry::FnDef>;
426
427    /// List all registered functions.
428    fn list_fns(&self) -> Vec<pylon_functions::registry::FnDef>;
429
430    /// Execute a function. For streaming responses, `on_stream` is called for
431    /// each chunk as it arrives from the function handler.
432    ///
433    /// `request` is populated when the function is invoked via a custom HTTP
434    /// route (`defineRoute` binding). It carries the raw request metadata
435    /// (method, path, headers, body bytes) so actions can verify webhook
436    /// signatures. Pass `None` for programmatic invocations (runAction,
437    /// scheduled jobs, admin dashboard).
438    fn call(
439        &self,
440        fn_name: &str,
441        args: serde_json::Value,
442        auth: pylon_functions::protocol::AuthInfo,
443        on_stream: Option<Box<dyn FnMut(&str) + Send>>,
444        request: Option<pylon_functions::protocol::RequestInfo>,
445    ) -> Result<
446        (serde_json::Value, pylon_functions::trace::FnTrace),
447        pylon_functions::runner::FnCallError,
448    >;
449
450    /// Recent traces for observability (newest first).
451    fn recent_traces(&self, limit: usize) -> Vec<pylon_functions::trace::FnTrace>;
452
453    /// Check whether the caller is allowed to invoke this function right now.
454    ///
455    /// Returns `Ok(())` if allowed, or `Err(retry_after_secs)` if the caller
456    /// is over the per-function quota. Default impl is permissive — backends
457    /// that don't enforce per-function limits don't need to implement it.
458    /// `identity` is a stable string for the caller (user id, session id, or
459    /// IP) used as the rate-limit key.
460    fn check_rate_limit(&self, _fn_name: &str, _identity: &str) -> Result<(), u64> {
461        Ok(())
462    }
463}
464
// ---------------------------------------------------------------------------
// RouterContext — bundles all dependencies for a single request
// ---------------------------------------------------------------------------
468
469pub struct RouterContext<'a> {
470    pub store: &'a dyn DataStore,
471    pub session_store: &'a SessionStore,
472    pub magic_codes: &'a MagicCodeStore,
473    pub oauth_state: &'a OAuthStateStore,
474    /// Persistent OAuth account links — better-auth's `account` table
475    /// equivalent. Used by the OAuth callback to look up + upsert the
476    /// `(provider, provider_account_id) → user_id` mapping plus the
477    /// access/refresh token bundle.
478    pub account_store: &'a pylon_auth::AccountStore,
479    /// Long-lived API keys — `pk.key_<id>.<secret>` bearer tokens that
480    /// resolve to a user_id with optional scopes/expiry. Created via
481    /// `POST /api/auth/api-keys`, listed/revoked from the same path.
482    pub api_keys: &'a pylon_auth::api_key::ApiKeyStore,
483    /// Organizations + memberships + invites — multi-tenant team
484    /// management. Endpoints under `/api/auth/orgs/...`.
485    pub orgs: &'a pylon_auth::org::OrgStore,
486    /// Per-address pending SIWE nonces. Issued at
487    /// `/api/auth/siwe/nonce`, consumed at `/api/auth/siwe/verify`.
488    pub siwe: &'a pylon_auth::siwe::NonceStore,
489    /// Phone-number magic codes. Endpoints under `/api/auth/phone/...`.
490    pub phone_codes: &'a pylon_auth::phone::PhoneCodeStore,
491    /// WebAuthn / passkey credentials + per-user challenge stash.
492    /// Endpoints under `/api/auth/passkey/...`.
493    pub passkeys: &'a pylon_auth::webauthn::PasskeyStore,
494    /// Single-use email-delivered tokens (password reset, email
495    /// change, magic-link sign-in). Endpoints under
496    /// `/api/auth/{password/reset,email/change,magic-link}/...`.
497    pub verification: &'a pylon_auth::verification::VerificationStore,
498    /// Append-only audit log for security-relevant events. Endpoints
499    /// `/api/auth/audit` (current user) + `/api/auth/audit/tenant`
500    /// (active tenant; admin-gated by your policy layer).
501    pub audit: &'a pylon_auth::audit::AuditStore,
502    pub policy_engine: &'a PolicyEngine,
503    pub change_log: &'a ChangeLog,
504    pub notifier: &'a dyn ChangeNotifier,
505    pub rooms: &'a dyn RoomOps,
506    pub cache: &'a dyn CacheOps,
507    pub pubsub: &'a dyn PubSubOps,
508    pub jobs: &'a dyn JobOps,
509    pub scheduler: &'a dyn SchedulerOps,
510    pub workflows: &'a dyn WorkflowOps,
511    pub files: &'a dyn FileOps,
512    pub openapi: &'a dyn OpenApiGenerator,
513    pub functions: Option<&'a dyn FnOps>,
514    pub email: &'a dyn EmailSender,
515    pub shards: Option<&'a dyn ShardOps>,
516    pub plugin_hooks: &'a dyn PluginHookOps,
517    pub auth_ctx: &'a AuthContext,
518    /// Allowlist of origins (`scheme://host[:port]`) that the OAuth
519    /// start endpoint will accept as `?callback=` / `?error_callback=`
520    /// targets. Sourced from `PYLON_TRUSTED_ORIGINS` (comma-separated)
521    /// at server boot. Borrowed from better-auth's `trustedOrigins`
522    /// model — explicit allowlist, no implicit "same-origin trust" or
523    /// env-var magic. Open redirects via OAuth are an easy bug to
524    /// ship by accident; this list is the only thing standing between
525    /// a misconfigured frontend and an attacker-controlled redirect.
526    pub trusted_origins: &'a [String],
527    pub is_dev: bool,
528    /// Raw HTTP request headers (lowercased names). Used by the webhook
529    /// action endpoint to pass the exact signing-relevant headers through
530    /// to TypeScript actions. Empty slice on platforms that don't forward
531    /// headers (e.g. internal calls).
532    pub request_headers: &'a [(String, String)],
533    /// Client IP as the runtime resolved it from the socket. Used as
534    /// the rate-limit bucket key for unauthenticated callers — the
535    /// alternative ("anon" string) puts every unauth request worldwide
536    /// into one shared bucket, which lets one attacker starve every
537    /// other anonymous caller. Empty string on platforms that don't
538    /// expose a peer address.
539    pub peer_ip: &'a str,
540    /// Session cookie shape (name, domain, attrs). Handlers use this to
541    /// emit Set-Cookie headers via [`RouterContext::add_response_header`]
542    /// when they want a browser-bound session.
543    pub cookie_config: &'a CookieConfig,
544    /// Extra response headers handlers want to attach (e.g. Set-Cookie,
545    /// Location). The runtime drains this after `route()` returns and
546    /// merges them into the outgoing response. Interior mutability so
547    /// handlers don't need a `&mut` ctx.
548    pub response_headers: RefCell<Vec<(String, String)>>,
549}
550
551impl<'a> RouterContext<'a> {
552    /// Queue a header to be added to the response built from this request.
553    pub fn add_response_header(&self, name: impl Into<String>, value: impl Into<String>) {
554        self.response_headers
555            .borrow_mut()
556            .push((name.into(), value.into()));
557    }
558
559    /// Drain the queued response headers. Runtime calls this once after
560    /// `route()` returns, before constructing the wire response.
561    pub fn take_response_headers(&self) -> Vec<(String, String)> {
562        std::mem::take(&mut *self.response_headers.borrow_mut())
563    }
564
565    /// Read the request's `Origin` header, if any. Browsers always send
566    /// Origin on cross-origin XHR/fetch and on POSTs; non-browser
567    /// callers (CLI, server-to-server) typically don't.
568    pub fn request_origin(&self) -> Option<&str> {
569        self.request_headers
570            .iter()
571            .find(|(k, _)| k.eq_ignore_ascii_case("origin"))
572            .map(|(_, v)| v.as_str())
573    }
574
575    /// Emit a session cookie when the request looks like it came from a
576    /// browser (i.e. carries Origin). Non-browser callers still receive
577    /// the JSON token in the body and ignore the missing cookie.
578    /// Origin allowlisting is enforced at the runtime CSRF layer for
579    /// state-changing methods, so handlers don't need to re-check here.
580    pub fn maybe_set_session_cookie(&self, token: &str) {
581        if self.request_origin().is_some() {
582            self.add_response_header("Set-Cookie", self.cookie_config.set_value(token));
583        }
584    }
585}
586
// ---------------------------------------------------------------------------
// OAuth callback shared logic (POST returns JSON, GET 302s with cookie)
// ---------------------------------------------------------------------------
590
/// Structured failure from the shared OAuth completion logic.
#[derive(Debug)]
pub(crate) struct OAuthError {
    /// HTTP status code to answer with (e.g. 400, 404, 500, 502).
    pub(crate) status: u16,
    /// Stable machine-readable error code, e.g. `"MISSING_FIELD"`.
    pub(crate) code: &'static str,
    /// Human-readable detail.
    pub(crate) message: String,
}
596
/// Truncate (and elide) error strings before they end up in
/// `oauth_error_message` redirect URLs. Provider error bodies can be
/// huge or contain echoed-back request fields — keep the redirect
/// short and safe to log.
///
/// Strings of at most `MAX` (240) bytes are returned unchanged. Longer
/// ones are cut at the last UTF-8 character boundary that fits and get
/// a trailing `…`.
///
/// MAX is the budget for the *output*, including the ellipsis (3
/// bytes), so the slice itself caps at MAX-3.
fn truncate_for_redirect(s: &str) -> String {
    const MAX: usize = 240;
    const ELLIPSIS: &str = "…";

    if s.len() <= MAX {
        return s.to_string();
    }

    let budget = MAX - ELLIPSIS.len();
    // Largest char boundary <= budget, so a multi-byte character is never
    // split. Index 0 is always a boundary, so the fallback is never taken
    // in practice.
    let cut = (0..=budget)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    format!("{}{}", &s[..cut], ELLIPSIS)
}
617
/// Shared OAuth code-for-session exchange. Returns the user_id + minted
/// session, or a structured error suitable for both JSON (POST) and
/// 302-redirect-with-error-param (GET) responses.
///
/// Caller must have ALREADY validated the state token via
/// `ctx.oauth_state.validate(...)`. The validated `OAuthState` record is
/// not passed in: calling this function is itself the caller's assertion
/// that validation succeeded. State validation lives at the call site (not
/// here) because the GET /api/auth/callback/:provider handler needs
/// the validated record's callback URLs to know where to redirect on
/// both success and failure — and validate is single-use, so it can
/// only be called once per token.
628pub(crate) fn complete_oauth_login_pkce(
629    ctx: &RouterContext,
630    provider: &str,
631    code: Option<&str>,
632    pkce_verifier: Option<&str>,
633    dev_email: Option<&str>,
634    dev_name: Option<&str>,
635) -> Result<(String, pylon_auth::Session), OAuthError> {
636    let (userinfo, tokens) = if let Some(code) = code {
637        let registry = pylon_auth::OAuthRegistry::shared();
638        let config = registry.get(provider).cloned().ok_or_else(|| OAuthError {
639            status: 404,
640            code: "PROVIDER_NOT_FOUND",
641            message: format!("OAuth provider \"{provider}\" not configured"),
642        })?;
643        let tokens = config
644            .exchange_code_full_pkce(code, pkce_verifier)
645            .map_err(|err| OAuthError {
646                status: 502,
647                code: "OAUTH_TOKEN_EXCHANGE_FAILED",
648                // Sanitize: providers like to echo back the request body
649                // on auth failures. The auth layer already redacts known
650                // sensitive fields, but cap the length to keep stray
651                // tokens out of redirect URLs.
652                message: truncate_for_redirect(&format!("token exchange failed: {err}")),
653            })?;
654        // Apple identity lives in id_token, not a userinfo endpoint.
655        // Pass both — the auth layer routes by spec.
656        let info = config
657            .fetch_userinfo_with_id_token(&tokens.access_token, tokens.id_token.as_deref())
658            .map_err(|err| OAuthError {
659                status: 502,
660                code: "OAUTH_TOKEN_EXCHANGE_FAILED",
661                message: truncate_for_redirect(&format!("userinfo fetch failed: {err}")),
662            })?;
663        (info, tokens)
664    } else if ctx.is_dev {
665        let email = dev_email.ok_or_else(|| OAuthError {
666            status: 400,
667            code: "MISSING_FIELD",
668            message: "OAuth callback requires `code` (or `email` in dev mode)".into(),
669        })?;
670        // Dev path needs a stable provider_account_id so repeat
671        // sign-ins land on the same Account row. Use the email itself
672        // — predictable for tests, and a real provider would never
673        // reuse an email as a sub.
674        let info = pylon_auth::UserInfo {
675            provider: provider.to_string(),
676            provider_account_id: format!("dev:{email}"),
677            email: email.to_string(),
678            name: dev_name.map(String::from),
679        };
680        let tokens = pylon_auth::TokenSet {
681            access_token: "dev_access_token".into(),
682            refresh_token: None,
683            id_token: None,
684            expires_at: None,
685            scope: None,
686        };
687        (info, tokens)
688    } else {
689        return Err(OAuthError {
690            status: 400,
691            code: "MISSING_FIELD",
692            message: "OAuth callback requires an authorization `code` from the provider".into(),
693        });
694    };
695
696    // Real-world bug this replaces: the previous formatter produced
697    // strings like "1761811234Z" (epoch-seconds with a stray Z) that
698    // SQLite happily stored as TEXT but PostgreSQL rejected as
699    // invalid TIMESTAMPTZ — every Google sign-up against pylon-cloud
700    // failed with USER_CREATE_FAILED. Use the kernel's ISO 8601
701    // formatter for a value both backends parse cleanly.
702    let now = chrono_now_iso();
703
704    // Resolve user_id in priority order:
705    //   1. Existing account link by (provider, provider_account_id) — the
706    //      stable identity. Survives email changes on the provider side.
707    //   2. Existing User row by email — account-linking-by-email. The
708    //      classic "you signed up with email/password and now you're
709    //      adding Google" flow.
710    //   3. Create a new User.
711    //
712    // Crucially: every step that can fail (store.insert, store.update)
713    // returns its error rather than silently using the email as user_id.
714    // That swallow caused the "session for nonexistent user" bug — the
715    // OAuth flow looked successful but the User row was never created
716    // and /api/auth/me would resolve to a phantom identity.
717    let user_id = if let Some(existing) = ctx
718        .account_store
719        .find_by_provider(provider, &userinfo.provider_account_id)
720    {
721        // Returning user via the same provider — refresh the token
722        // bundle and reuse the linked user_id.
723        let mut refreshed = pylon_auth::Account::new(existing.user_id.clone(), &userinfo, &tokens);
724        refreshed.created_at = existing.created_at;
725        ctx.account_store.upsert(&refreshed);
726        existing.user_id
727    } else if let Ok(Some(row)) = ctx.store.lookup(
728        &ctx.store.manifest().auth.user.entity,
729        "email",
730        &userinfo.email,
731    ) {
732        // First-time link of this provider to an existing user (matched
733        // by email). Stamp emailVerified opportunistically since the
734        // provider just vouched for the address.
735        let id = row["id"].as_str().unwrap_or("").to_string();
736        if id.is_empty() {
737            return Err(OAuthError {
738                status: 500,
739                code: "USER_LOOKUP_INVALID",
740                message: "User row matched by email but had no id field".into(),
741            });
742        }
743        if row.get("emailVerified").map_or(true, |v| v.is_null()) {
744            // Best-effort — schemas without the field silently drop the
745            // update. We do NOT bail on this error since the user
746            // already existed and OAuth still succeeded.
747            let _ = ctx.store.update(
748                &ctx.store.manifest().auth.user.entity,
749                &id,
750                &serde_json::json!({ "emailVerified": now }),
751            );
752        }
753        ctx.account_store
754            .upsert(&pylon_auth::Account::new(id.clone(), &userinfo, &tokens));
755        id
756    } else {
757        // Brand-new user. Create the User row + the Account link. Both
758        // fail loudly — a silent failure here is what produced the
759        // "session for nonexistent user" bug.
760        let display_name = userinfo.name.as_deref().unwrap_or(&userinfo.email);
761        let user_entity = ctx.store.manifest().auth.user.entity.clone();
762        let id = ctx
763            .store
764            .insert(
765                &user_entity,
766                &serde_json::json!({
767                    "email": userinfo.email,
768                    "displayName": display_name,
769                    "emailVerified": now,
770                    "createdAt": now,
771                }),
772            )
773            .map_err(|e| OAuthError {
774                status: 500,
775                code: "USER_CREATE_FAILED",
776                // Preserve the full upstream code/message — a failed insert
777                // is almost always "the User entity in your manifest has
778                // a field this OAuth handler doesn't set" (NOT NULL
779                // violation), and the operator needs to see exactly which
780                // column.
781                message: format!(
782                    "failed to create User row for OAuth signup ({}): {}",
783                    e.code, e.message
784                ),
785            })?;
786        ctx.account_store
787            .upsert(&pylon_auth::Account::new(id.clone(), &userinfo, &tokens));
788        id
789    };
790    let session = ctx.session_store.create(user_id.clone());
791    Ok((user_id, session))
792}
793
794/// Parse a `key=value&key=value` query string into a map. Uses
795/// `query_decode` (NOT form_decode) — RFC 3986 says `+` is a literal
796/// in URI query strings; only `application/x-www-form-urlencoded`
797/// bodies decode `+` as space. OAuth state tokens that happen to
798/// contain `+` (e.g. base64-with-padding) round-trip cleanly here.
799pub(crate) fn parse_query(q: &str) -> std::collections::HashMap<String, String> {
800    let mut out = std::collections::HashMap::new();
801    for pair in q.split('&') {
802        if pair.is_empty() {
803            continue;
804        }
805        let (k, v) = pair.split_once('=').unwrap_or((pair, ""));
806        out.insert(query_decode(k), query_decode(v));
807    }
808    out
809}
810
811/// Percent-decode a URI query-string segment. Treats `+` as a literal
812/// `+` character (per RFC 3986 §3.4) — the `+` → space convention
813/// only applies to `application/x-www-form-urlencoded` *bodies*, not
814/// to URI query strings. Inlined `percent_decode` because the form-
815/// body variant isn't used here.
816fn query_decode(s: &str) -> String {
817    percent_decode(s, false)
818}
819
/// Percent-decode `s`, turning every well-formed `%XX` escape into the
/// byte it encodes.
///
/// * `plus_is_space` — when `true`, a bare `+` becomes a space (the
///   form-body convention); when `false`, `+` is copied through.
///
/// A `%` that is not followed by exactly two hex digits (truncated at
/// the end of the input, or followed by non-hex characters) is copied
/// through unchanged. Both digits are validated individually: a
/// radix parser such as `u8::from_str_radix` must not be used here
/// because it accepts a leading sign, which would make `%+5` decode to
/// byte 0x05.
///
/// The decoded bytes are converted with `String::from_utf8_lossy`, so
/// escapes that do not form valid UTF-8 come out as U+FFFD.
#[allow(dead_code)]
fn percent_decode(s: &str, plus_is_space: bool) -> String {
    // `to_digit(16)` yields 0..=15, so the narrowing cast is lossless.
    let hex_value = |b: u8| char::from(b).to_digit(16).map(|d| d as u8);

    let bytes = s.as_bytes();
    let mut out = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'+' if plus_is_space => {
                out.push(b' ');
                i += 1;
            }
            b'%' => {
                let hi = bytes.get(i + 1).copied().and_then(hex_value);
                let lo = bytes.get(i + 2).copied().and_then(hex_value);
                match (hi, lo) {
                    (Some(hi), Some(lo)) => {
                        out.push((hi << 4) | lo);
                        i += 3;
                    }
                    // Malformed or truncated escape: keep the `%` literal.
                    _ => {
                        out.push(b'%');
                        i += 1;
                    }
                }
            }
            other => {
                out.push(other);
                i += 1;
            }
        }
    }
    String::from_utf8_lossy(&out).into_owned()
}
852
/// Mask an email address before it reaches a log line.
///
/// At most the first two characters of the local part are kept,
/// followed by `***` and the untouched domain, so `alice@acme.com`
/// is rendered as `al***@acme.com`. Input without an `@` collapses
/// to `***`. The split happens at the first `@`.
pub(crate) fn redact_email(email: &str) -> String {
    let Some((local, host)) = email.split_once('@') else {
        return "***".to_string();
    };
    let shown: String = local.chars().take(2).collect();
    format!("{shown}***@{host}")
}
868
869/// Build a redacted view of the manifest safe to serve to anonymous
870/// callers. Drops the body of every policy expression — `allow_read`,
871/// `allow_insert`, etc. — but keeps policy name + entity + action so
872/// client tooling can map a "policy denied: ownerReadTodos" error to
873/// the human label without seeing the raw rule.
874fn public_manifest(m: &pylon_kernel::AppManifest) -> pylon_kernel::AppManifest {
875    let mut out = m.clone();
876    for p in out.policies.iter_mut() {
877        p.allow = String::new();
878        p.allow_read = None;
879        p.allow_insert = None;
880        p.allow_update = None;
881        p.allow_delete = None;
882    }
883    out
884}
885
/// Percent-encode `s` byte by byte.
///
/// ASCII letters, digits and `-`, `_`, `.`, `~` are copied through;
/// every other byte is written as `%` followed by two uppercase hex
/// digits.
pub(crate) fn url_encode(s: &str) -> String {
    const UPPER_HEX: &[u8; 16] = b"0123456789ABCDEF";

    let mut encoded = String::with_capacity(s.len());
    for byte in s.bytes() {
        let unreserved =
            byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~');
        if unreserved {
            encoded.push(char::from(byte));
        } else {
            encoded.push('%');
            encoded.push(char::from(UPPER_HEX[usize::from(byte >> 4)]));
            encoded.push(char::from(UPPER_HEX[usize::from(byte & 0x0F)]));
        }
    }
    encoded
}
898
899// ---------------------------------------------------------------------------
900// route() — the platform-agnostic request router
901// ---------------------------------------------------------------------------
902
903/// Route an HTTP request to the appropriate handler.
904///
905/// Returns `(status_code, response_body, content_type)`.
906pub fn route(
907    ctx: &RouterContext,
908    method: HttpMethod,
909    url: &str,
910    body: &str,
911    auth_token: Option<&str>,
912) -> (u16, String, &'static str) {
913    let (status, body) = route_inner(ctx, method, url, body, auth_token);
914    (status, body, "application/json")
915}
916
917fn route_inner(
918    ctx: &RouterContext,
919    method: HttpMethod,
920    url: &str,
921    body: &str,
922    auth_token: Option<&str>,
923) -> (u16, String) {
924    // CORS preflight
925    if method == HttpMethod::Options {
926        return (204, String::new());
927    }
928
929    // GET /api/manifest
930    // Public manifest. Clients need entity/field/route shapes to call
931    // the API, but they do NOT need raw policy expressions — those are
932    // server-enforcement details, and exposing them ("auth.userId ==
933    // data.ownerId") tells an attacker exactly which condition to
934    // satisfy. Strip allow_* expressions; keep policy NAMES so client
935    // tooling can still surface "denied by ownerReadTodos" errors.
936    // Admins get the full thing for tooling via ?full=1.
937    if url.starts_with("/api/manifest") && method == HttpMethod::Get {
938        let path = url.split('?').next().unwrap_or(url);
939        if path == "/api/manifest" {
940            let want_full = query_param(url, "full")
941                .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
942                .unwrap_or(false);
943            let manifest = ctx.store.manifest();
944            let body = if want_full && ctx.auth_ctx.is_admin {
945                serde_json::to_string(manifest).unwrap_or_else(|_| "{}".into())
946            } else {
947                serde_json::to_string(&public_manifest(manifest)).unwrap_or_else(|_| "{}".into())
948            };
949            return (200, body);
950        }
951    }
952
953    // GET /api/openapi.json
954    if url == "/api/openapi.json" && method == HttpMethod::Get {
955        return (200, ctx.openapi.generate(""));
956    }
957
958    // -----------------------------------------------------------------------
959    // Auth routes — handled by crates/router/src/routes/auth.rs.
960    // /api/auth/* (sessions, OAuth, magic-link, password, email verify,
961    // /me, /providers, /sessions, refresh).
962    // -----------------------------------------------------------------------
963    if let Some(r) = routes::auth::handle(ctx, method, url, body, auth_token) {
964        return r;
965    }
966
967    // -----------------------------------------------------------------------
968    // Sync + GDPR — handled by crates/router/src/routes/sync.rs.
969    // /api/sync/{pull,push}, /api/admin/users/:id/{export,purge}.
970    // -----------------------------------------------------------------------
971    if let Some(r) = routes::sync::handle(ctx, method, url, body, auth_token) {
972        return r;
973    }
974
975    // -----------------------------------------------------------------------
976    // Rooms — handled by crates/router/src/routes/rooms.rs.
977    // /api/rooms/{join,leave,presence,broadcast}, /api/rooms[/<room>].
978    // -----------------------------------------------------------------------
979    if let Some(r) = routes::rooms::handle(ctx, method, url, body, auth_token) {
980        return r;
981    }
982
983    // -----------------------------------------------------------------------
984    // Link / Files / CRDT — handled by routes/{links,files,crdt}.rs.
985    // /api/{link,unlink}, /api/files/{upload,<id>}, /api/crdt/<entity>/<row>.
986    // -----------------------------------------------------------------------
987    if let Some(r) = routes::links::handle(ctx, method, url, body, auth_token) {
988        return r;
989    }
990    if let Some(r) = routes::files::handle(ctx, method, url, body, auth_token) {
991        return r;
992    }
993    if let Some(r) = routes::crdt::handle(ctx, method, url, body, auth_token) {
994        return r;
995    }
996
997    // -----------------------------------------------------------------------
998    // Queries / Actions / Admin data / Search — handled by:
999    //   routes/queries.rs   (transact, query/:e, lookup, aggregate, query)
1000    //   routes/actions.rs   (/api/actions/<name>)
1001    //   routes/admin_data.rs (export, import)
1002    //   routes/search.rs    (/api/search/<entity>)
1003    // -----------------------------------------------------------------------
1004    if let Some(r) = routes::queries::handle(ctx, method, url, body, auth_token) {
1005        return r;
1006    }
1007    if let Some(r) = routes::actions::handle(ctx, method, url, body, auth_token) {
1008        return r;
1009    }
1010    if let Some(r) = routes::admin_data::handle(ctx, method, url, body, auth_token) {
1011        return r;
1012    }
1013    if let Some(r) = routes::auth_admin::handle(ctx, method, url, body, auth_token) {
1014        return r;
1015    }
1016    if let Some(r) = routes::ops_admin::handle(ctx, method, url, body, auth_token) {
1017        return r;
1018    }
1019    if let Some(r) = routes::search::handle(ctx, method, url, body, auth_token) {
1020        return r;
1021    }
1022
1023    // -----------------------------------------------------------------------
1024    // Entity CRUD + cursor + batch — handled by routes/entities.rs.
1025    // /api/entities/<entity>[/<id>], /api/entities/<entity>/cursor,
1026    // /api/batch.
1027    // -----------------------------------------------------------------------
1028    if let Some(r) = routes::entities::handle(ctx, method, url, body, auth_token) {
1029        return r;
1030    }
1031
1032    // -----------------------------------------------------------------------
1033    // Infra / Functions / Shards / Workflows / AI — handled by:
1034    //   routes/infra.rs       (cache, pubsub, jobs, scheduler)
1035    //   routes/functions.rs   (/api/fn, /api/fn/traces, /api/webhooks/<>)
1036    //   routes/shards.rs      (/api/shards*)
1037    //   routes/workflows.rs   (/api/workflows*)
1038    //   routes/ai.rs          (/api/ai/complete shim — runtime owns stream)
1039    // -----------------------------------------------------------------------
1040    if let Some(r) = routes::infra::handle(ctx, method, url, body, auth_token) {
1041        return r;
1042    }
1043    if let Some(r) = routes::functions::handle(ctx, method, url, body, auth_token) {
1044        return r;
1045    }
1046    if let Some(r) = routes::shards::handle(ctx, method, url, body, auth_token) {
1047        return r;
1048    }
1049    if let Some(r) = routes::workflows::handle(ctx, method, url, body, auth_token) {
1050        return r;
1051    }
1052    if let Some(r) = routes::ai::handle(ctx, method, url, body, auth_token) {
1053        return r;
1054    }
1055
1056    // -----------------------------------------------------------------------
1057    // Fallback
1058    // -----------------------------------------------------------------------
1059
1060    (
1061        404,
1062        json_error_with_hint(
1063            "NOT_FOUND",
1064            &format!("No API route matches {url}"),
1065            "Available endpoints: /api/entities/<entity>, /api/actions/<name>, /api/query, /api/auth/*, /api/sync/*, /api/files/*, /api/cache, /api/pubsub/*, /api/jobs, /api/scheduler, /api/workflows, /api/ai/*, /studio",
1066        ),
1067    )
1068}
1069
1070// ---------------------------------------------------------------------------
1071// Entity CRUD helpers
1072// ---------------------------------------------------------------------------
1073
1074pub(crate) fn handle_list(store: &dyn DataStore, entity: &str, url: &str) -> (u16, String) {
1075    let limit: Option<usize> = url
1076        .split("limit=")
1077        .nth(1)
1078        .and_then(|s| s.split('&').next())
1079        .and_then(|s| s.parse().ok());
1080    let offset: usize = url
1081        .split("offset=")
1082        .nth(1)
1083        .and_then(|s| s.split('&').next())
1084        .and_then(|s| s.parse().ok())
1085        .unwrap_or(0);
1086
1087    // Push limit/offset down to SQL via query_filtered's $limit/$offset.
1088    // Skips full-table scans for large entities.
1089    let mut filter = serde_json::Map::new();
1090    if let Some(l) = limit {
1091        filter.insert("$limit".into(), serde_json::json!(l));
1092    }
1093    if offset > 0 {
1094        filter.insert("$offset".into(), serde_json::json!(offset));
1095    }
1096    let filter = serde_json::Value::Object(filter);
1097
1098    match store.query_filtered(entity, &filter) {
1099        Ok(rows) => {
1100            // For backwards compatibility we keep the response shape but
1101            // `total` here means "rows returned" not "total in table".
1102            // The cursor pagination endpoint at /api/entities/:e/cursor is
1103            // the right path for total counts at scale.
1104            let count = rows.len();
1105            (
1106                200,
1107                serde_json::json!({
1108                    "data": rows,
1109                    "count": count,
1110                    "offset": offset,
1111                    "limit": limit,
1112                })
1113                .to_string(),
1114            )
1115        }
1116        Err(e) => (400, json_error(&e.code, &e.message)),
1117    }
1118}
1119
1120pub(crate) fn handle_get(store: &dyn DataStore, entity: &str, id: &str) -> (u16, String) {
1121    match store.get_by_id(entity, id) {
1122        Ok(Some(row)) => (
1123            200,
1124            serde_json::to_string(&row).unwrap_or_else(|_| "{}".into()),
1125        ),
1126        Ok(None) => (
1127            404,
1128            json_error("NOT_FOUND", &format!("{entity} with id \"{id}\" not found")),
1129        ),
1130        Err(e) => (400, json_error(&e.code, &e.message)),
1131    }
1132}
1133
1134pub(crate) fn handle_insert(ctx: &RouterContext, entity: &str, body: &str) -> (u16, String) {
1135    let mut data: serde_json::Value = match serde_json::from_str(body) {
1136        Ok(v) => v,
1137        Err(e) => {
1138            return (
1139                400,
1140                json_error_safe(
1141                    "INVALID_JSON",
1142                    "Invalid request body",
1143                    &format!("Invalid JSON: {e}"),
1144                ),
1145            )
1146        }
1147    };
1148    // Run plugin `before_insert` hooks. Registered plugins (validation,
1149    // timestamps, slugify) mutate `data` here; a rejected hook aborts
1150    // the write with their status + error payload.
1151    if let Err((status, code, msg)) =
1152        ctx.plugin_hooks
1153            .before_insert(entity, &mut data, ctx.auth_ctx)
1154    {
1155        return (status, json_error(&code, &msg));
1156    }
1157    match ctx.store.insert(entity, &data) {
1158        Ok(id) => {
1159            let seq = ctx
1160                .change_log
1161                .append(entity, &id, ChangeKind::Insert, Some(data.clone()));
1162            broadcast_change_with_crdt(
1163                ctx.notifier,
1164                ctx.store,
1165                seq,
1166                entity,
1167                &id,
1168                ChangeKind::Insert,
1169                Some(&data),
1170            );
1171            ctx.plugin_hooks
1172                .after_insert(entity, &id, &data, ctx.auth_ctx);
1173            (201, serde_json::json!({"id": id}).to_string())
1174        }
1175        Err(e) => (400, json_error(&e.code, &e.message)),
1176    }
1177}
1178
1179pub(crate) fn handle_update(
1180    ctx: &RouterContext,
1181    entity: &str,
1182    id: &str,
1183    body: &str,
1184) -> (u16, String) {
1185    let mut data: serde_json::Value = match serde_json::from_str(body) {
1186        Ok(v) => v,
1187        Err(e) => {
1188            return (
1189                400,
1190                json_error_safe(
1191                    "INVALID_JSON",
1192                    "Invalid request body",
1193                    &format!("Invalid JSON: {e}"),
1194                ),
1195            )
1196        }
1197    };
1198    if let Err((status, code, msg)) =
1199        ctx.plugin_hooks
1200            .before_update(entity, id, &mut data, ctx.auth_ctx)
1201    {
1202        return (status, json_error(&code, &msg));
1203    }
1204    match ctx.store.update(entity, id, &data) {
1205        Ok(true) => {
1206            let seq = ctx
1207                .change_log
1208                .append(entity, id, ChangeKind::Update, Some(data.clone()));
1209            broadcast_change_with_crdt(
1210                ctx.notifier,
1211                ctx.store,
1212                seq,
1213                entity,
1214                id,
1215                ChangeKind::Update,
1216                Some(&data),
1217            );
1218            ctx.plugin_hooks
1219                .after_update(entity, id, &data, ctx.auth_ctx);
1220            (200, serde_json::json!({"updated": true}).to_string())
1221        }
1222        Ok(false) => (
1223            404,
1224            json_error("NOT_FOUND", &format!("{entity}/{id} not found")),
1225        ),
1226        Err(e) => (400, json_error(&e.code, &e.message)),
1227    }
1228}
1229
1230pub(crate) fn handle_delete(ctx: &RouterContext, entity: &str, id: &str) -> (u16, String) {
1231    if let Err((status, code, msg)) = ctx.plugin_hooks.before_delete(entity, id, ctx.auth_ctx) {
1232        return (status, json_error(&code, &msg));
1233    }
1234    match ctx.store.delete(entity, id) {
1235        Ok(true) => {
1236            let seq = ctx.change_log.append(entity, id, ChangeKind::Delete, None);
1237            broadcast_change(ctx.notifier, seq, entity, id, ChangeKind::Delete, None);
1238            ctx.plugin_hooks.after_delete(entity, id, ctx.auth_ctx);
1239            (200, serde_json::json!({"deleted": true}).to_string())
1240        }
1241        Ok(false) => (
1242            404,
1243            json_error("NOT_FOUND", &format!("{entity}/{id} not found")),
1244        ),
1245        Err(e) => (400, json_error(&e.code, &e.message)),
1246    }
1247}
1248
1249// ---------------------------------------------------------------------------
1250// Helpers
1251// ---------------------------------------------------------------------------
1252
1253pub(crate) fn broadcast_change(
1254    notifier: &dyn ChangeNotifier,
1255    seq: u64,
1256    entity: &str,
1257    row_id: &str,
1258    kind: ChangeKind,
1259    data: Option<&serde_json::Value>,
1260) {
1261    let event = pylon_sync::ChangeEvent {
1262        seq,
1263        entity: entity.to_string(),
1264        row_id: row_id.to_string(),
1265        kind,
1266        data: data.cloned(),
1267        timestamp: String::new(),
1268    };
1269    notifier.notify(&event);
1270}
1271
1272/// Convenience: emit BOTH the JSON change event AND the binary CRDT
1273/// snapshot frame after a successful insert/update on a CRDT-mode
1274/// entity. The CRDT snapshot is fetched via [`DataStore::crdt_snapshot`];
1275/// for `crdt: false` entities (the LWW opt-out) it returns `Ok(None)`
1276/// and we skip the binary broadcast cleanly.
1277///
1278/// Delete operations don't ship a CRDT frame — Loro doesn't have a
1279/// "row gone" concept; the JSON change event with `kind: "delete"` is
1280/// the canonical signal. Clients drop the LoroDoc on receipt.
1281pub fn broadcast_change_with_crdt(
1282    notifier: &dyn ChangeNotifier,
1283    store: &dyn DataStore,
1284    seq: u64,
1285    entity: &str,
1286    row_id: &str,
1287    kind: ChangeKind,
1288    data: Option<&serde_json::Value>,
1289) {
1290    broadcast_change(notifier, seq, entity, row_id, kind.clone(), data);
1291    if matches!(kind, ChangeKind::Delete) {
1292        return;
1293    }
1294    if let Ok(Some(snapshot)) = store.crdt_snapshot(entity, row_id) {
1295        notifier.notify_crdt(entity, row_id, &snapshot);
1296    }
1297}
1298
1299/// Tiny lowercase-hex decoder. Returns `None` on any malformed input
1300/// (odd length, non-hex character). Used by the CRDT push endpoint to
1301/// turn the JSON `{update: "<hex>"}` payload back into binary Loro
1302/// bytes without pulling in a base64 dep just for one route.
1303pub(crate) fn decode_hex(s: &str) -> Option<Vec<u8>> {
1304    if s.len() % 2 != 0 {
1305        return None;
1306    }
1307    let mut out = Vec::with_capacity(s.len() / 2);
1308    let bytes = s.as_bytes();
1309    let mut i = 0;
1310    while i < bytes.len() {
1311        let hi = hex_nibble(bytes[i])?;
1312        let lo = hex_nibble(bytes[i + 1])?;
1313        out.push((hi << 4) | lo);
1314        i += 2;
1315    }
1316    Some(out)
1317}
1318
/// Value (0–15) of one ASCII hex digit, accepting `0-9`, `a-f` and
/// `A-F`; any other byte yields `None`.
fn hex_nibble(b: u8) -> Option<u8> {
    // `to_digit(16)` yields 0..=15, so the narrowing cast is lossless.
    char::from(b).to_digit(16).map(|digit| digit as u8)
}
1327
1328pub fn json_error(code: &str, message: &str) -> String {
1329    serde_json::json!({"error": {"code": code, "message": message}}).to_string()
1330}
1331
/// Conventional field names that link a row to a user. Used by the GDPR
/// export/purge endpoints to discover referencing rows without requiring
/// app authors to annotate their schema. Matching is by exact,
/// case-sensitive field name, so apps that store user references under
/// other names must supply their own purge hook — the default sweep
/// won't find them.
///
/// NOTE(review): an earlier version of this comment said a warning is
/// logged on export for entities that reference users through names
/// not in this list; no such check is visible next to the export
/// code — confirm before relying on it.
const USER_REF_FIELDS: &[&str] = &[
    "userId",
    "user_id",
    "authorId",
    "author_id",
    "ownerId",
    "owner_id",
    "createdBy",
    "created_by",
];
1349
1350/// Build a data-subject export for `user_id`. Returns a JSON envelope with
1351/// every row referencing the user across every manifest entity, plus the
1352/// User row itself when present. Format is stable — clients can diff
1353/// exports across time to see what's changed.
1354pub(crate) fn gdpr_export(ctx: &RouterContext, user_id: &str) -> (u16, String) {
1355    let manifest = ctx.store.manifest();
1356    let mut entities = serde_json::Map::new();
1357
1358    // The User row itself (if the schema has a User entity).
1359    if let Ok(Some(user_row)) = ctx.store.get_by_id("User", user_id) {
1360        entities.insert("User".to_string(), serde_json::json!([user_row]));
1361    }
1362
1363    // Every entity that has a user-ref field: list all rows matching.
1364    for ent in &manifest.entities {
1365        if ent.name == "User" {
1366            continue; // Already captured above.
1367        }
1368        let user_field = ent
1369            .fields
1370            .iter()
1371            .find(|f| USER_REF_FIELDS.contains(&f.name.as_str()));
1372        let Some(field) = user_field else { continue };
1373        let filter = serde_json::json!({ &field.name: user_id });
1374        match ctx.store.query_filtered(&ent.name, &filter) {
1375            Ok(rows) if !rows.is_empty() => {
1376                entities.insert(ent.name.clone(), serde_json::Value::Array(rows));
1377            }
1378            Ok(_) => {}
1379            Err(e) => {
1380                tracing::warn!("[gdpr] export: query {} failed: {}", ent.name, e.message);
1381            }
1382        }
1383    }
1384
1385    let envelope = serde_json::json!({
1386        "user_id": user_id,
1387        "exported_at": pylon_kernel::util::now_iso(),
1388        "entities": entities,
1389    });
1390    (200, envelope.to_string())
1391}
1392
1393/// Hard-delete every row tied to `user_id` and revoke all active sessions.
1394/// Best-effort: a per-entity error is logged and counted but does not abort
1395/// the sweep — partial success is better than leaving half the footprint.
1396pub(crate) fn gdpr_purge(ctx: &RouterContext, user_id: &str) -> (u16, String) {
1397    let manifest = ctx.store.manifest();
1398    let mut deleted: u64 = 0;
1399    let mut errors: Vec<String> = Vec::new();
1400
1401    // Delete the User row itself first.
1402    if let Ok(true) = ctx.store.delete("User", user_id) {
1403        deleted += 1;
1404        // Synthetic change event so sync clients notice.
1405        let seq = ctx
1406            .change_log
1407            .append("User", user_id, ChangeKind::Delete, None);
1408        broadcast_change(ctx.notifier, seq, "User", user_id, ChangeKind::Delete, None);
1409    }
1410
1411    // Referencing rows.
1412    for ent in &manifest.entities {
1413        if ent.name == "User" {
1414            continue;
1415        }
1416        let Some(field) = ent
1417            .fields
1418            .iter()
1419            .find(|f| USER_REF_FIELDS.contains(&f.name.as_str()))
1420        else {
1421            continue;
1422        };
1423        let filter = serde_json::json!({ &field.name: user_id });
1424        let rows = match ctx.store.query_filtered(&ent.name, &filter) {
1425            Ok(r) => r,
1426            Err(e) => {
1427                errors.push(format!("query {}: {}", ent.name, e.message));
1428                continue;
1429            }
1430        };
1431        for row in rows {
1432            let Some(id) = row.get("id").and_then(|v| v.as_str()) else {
1433                continue;
1434            };
1435            match ctx.store.delete(&ent.name, id) {
1436                Ok(true) => {
1437                    deleted += 1;
1438                    let seq = ctx
1439                        .change_log
1440                        .append(&ent.name, id, ChangeKind::Delete, None);
1441                    broadcast_change(ctx.notifier, seq, &ent.name, id, ChangeKind::Delete, None);
1442                }
1443                Ok(false) => {}
1444                Err(e) => errors.push(format!("delete {}/{}: {}", ent.name, id, e.message)),
1445            }
1446        }
1447    }
1448
1449    // Invalidate every active session for the user. Even after the row is
1450    // gone, an in-flight session token would let a purged user keep acting.
1451    let revoked = ctx.session_store.revoke_all_for_user(user_id);
1452
1453    let resp = serde_json::json!({
1454        "user_id": user_id,
1455        "rows_deleted": deleted,
1456        "sessions_revoked": revoked,
1457        "errors": errors,
1458        "purged_at": pylon_kernel::util::now_iso(),
1459    });
1460    (200, resp.to_string())
1461}
1462
1463/// Gate a route behind admin auth. Returns `Some(error_response)` if the
1464/// caller is NOT admin, `None` if they are. Use at the top of any control-
1465/// plane handler (jobs, workflows, sync push, etc) that shouldn't be open
1466/// to arbitrary clients.
1467pub(crate) fn require_admin(ctx: &RouterContext) -> Option<(u16, String)> {
1468    if ctx.auth_ctx.is_admin {
1469        None
1470    } else {
1471        Some((
1472            403,
1473            json_error(
1474                "FORBIDDEN",
1475                "this endpoint requires admin auth (PYLON_ADMIN_TOKEN)",
1476            ),
1477        ))
1478    }
1479}
1480
1481/// Gate a route behind "any authenticated identity". Returns `Some(err)` when
1482/// the caller has neither a user session nor an admin token. Used for the
1483/// rooms API, which previously let unauthenticated clients enumerate rooms
1484/// and read membership rosters — a silent presence-data leak.
1485pub(crate) fn require_auth(ctx: &RouterContext) -> Option<(u16, String)> {
1486    if ctx.auth_ctx.is_admin || ctx.auth_ctx.user_id.is_some() {
1487        None
1488    } else {
1489        Some((
1490            401,
1491            json_error("AUTH_REQUIRED", "authenticated session required"),
1492        ))
1493    }
1494}
1495
1496pub fn json_error_with_hint(code: &str, message: &str, hint: &str) -> String {
1497    serde_json::json!({"error": {"code": code, "message": message, "hint": hint}}).to_string()
1498}
1499
1500pub fn json_error_safe(code: &str, user_message: &str, internal: &str) -> String {
1501    tracing::warn!("[error] {code}: {internal}");
1502    json_error(code, user_message)
1503}
1504
1505/// Parse a JSON request body, returning a 400 error tuple on failure.
1506pub(crate) fn parse_json(body: &str) -> Result<serde_json::Value, (u16, String)> {
1507    serde_json::from_str(body).map_err(|e| {
1508        (
1509            400,
1510            json_error_safe(
1511                "INVALID_JSON",
1512                "Invalid request body",
1513                &format!("Invalid JSON: {e}"),
1514            ),
1515        )
1516    })
1517}
1518
/// Extract the raw (still percent-encoded) value of query parameter
/// `key` from a URL string.
///
/// Only the part after the first `?` is searched; when `url` has no
/// `?` the whole string is treated as the query, so a bare
/// `a=1&b=2` works too. The key must match a parameter name exactly —
/// asking for `full` does not match `notfull=1`. If the key occurs
/// more than once the first occurrence is returned; a parameter
/// written without `=` is not considered a match.
pub(crate) fn query_param<'a>(url: &'a str, key: &str) -> Option<&'a str> {
    let query = url.split_once('?').map_or(url, |(_, q)| q);
    query.split('&').find_map(|pair| {
        let (name, value) = pair.split_once('=')?;
        (name == key).then_some(value)
    })
}
1524
/// Current timestamp as a string, delegated unchanged to
/// `pylon_kernel::util::now_iso`.
pub(crate) fn chrono_now_iso() -> String {
    pylon_kernel::util::now_iso()
}
1528
1529// ---------------------------------------------------------------------------
1530// Integration tests — auth-bypass regressions
1531// ---------------------------------------------------------------------------
1532//
1533// These tests lock in the auth-gate fixes from the security review so future
1534// changes can't silently re-introduce them. Each test builds a minimal
1535// RouterContext with stub implementations of the service traits and exercises
1536// a previously-vulnerable route.
1537
1538#[cfg(test)]
1539mod auth_gate_tests {
1540    use super::*;
1541    use pylon_auth::{AuthContext, CookieConfig, MagicCodeStore, OAuthStateStore, SessionStore};
1542    use pylon_kernel::{AppManifest, MANIFEST_VERSION};
1543    use pylon_policy::PolicyEngine;
1544    use pylon_sync::ChangeLog;
1545
1546    // -----------------------------------------------------------------------
1547    // Minimal stubs for every service trait so we can build a RouterContext
1548    // without wiring up a real runtime.
1549    // -----------------------------------------------------------------------
1550
1551    struct StubDataStore {
1552        manifest: AppManifest,
1553    }
1554    impl pylon_http::DataStore for StubDataStore {
1555        fn manifest(&self) -> &AppManifest {
1556            &self.manifest
1557        }
1558        fn insert(
1559            &self,
1560            _entity: &str,
1561            _data: &serde_json::Value,
1562        ) -> Result<String, pylon_http::DataError> {
1563            Ok("stub-id".to_string())
1564        }
1565        fn get_by_id(
1566            &self,
1567            _entity: &str,
1568            _id: &str,
1569        ) -> Result<Option<serde_json::Value>, pylon_http::DataError> {
1570            Ok(None)
1571        }
1572        fn list(&self, _entity: &str) -> Result<Vec<serde_json::Value>, pylon_http::DataError> {
1573            Ok(Vec::new())
1574        }
1575        fn list_after(
1576            &self,
1577            _entity: &str,
1578            _after: Option<&str>,
1579            _limit: usize,
1580        ) -> Result<Vec<serde_json::Value>, pylon_http::DataError> {
1581            Ok(Vec::new())
1582        }
1583        fn update(
1584            &self,
1585            _entity: &str,
1586            _id: &str,
1587            _data: &serde_json::Value,
1588        ) -> Result<bool, pylon_http::DataError> {
1589            Ok(true)
1590        }
1591        fn delete(&self, _entity: &str, _id: &str) -> Result<bool, pylon_http::DataError> {
1592            Ok(true)
1593        }
1594        fn lookup(
1595            &self,
1596            _entity: &str,
1597            _field: &str,
1598            _value: &str,
1599        ) -> Result<Option<serde_json::Value>, pylon_http::DataError> {
1600            Ok(None)
1601        }
1602        fn link(
1603            &self,
1604            _entity: &str,
1605            _id: &str,
1606            _relation: &str,
1607            _target_id: &str,
1608        ) -> Result<bool, pylon_http::DataError> {
1609            Ok(true)
1610        }
1611        fn unlink(
1612            &self,
1613            _entity: &str,
1614            _id: &str,
1615            _relation: &str,
1616        ) -> Result<bool, pylon_http::DataError> {
1617            Ok(true)
1618        }
1619        fn query_filtered(
1620            &self,
1621            _entity: &str,
1622            _filter: &serde_json::Value,
1623        ) -> Result<Vec<serde_json::Value>, pylon_http::DataError> {
1624            Ok(Vec::new())
1625        }
1626        fn query_graph(
1627            &self,
1628            _query: &serde_json::Value,
1629        ) -> Result<serde_json::Value, pylon_http::DataError> {
1630            Ok(serde_json::json!({}))
1631        }
1632        fn transact(
1633            &self,
1634            _ops: &[serde_json::Value],
1635        ) -> Result<(bool, Vec<serde_json::Value>), pylon_http::DataError> {
1636            Ok((true, Vec::new()))
1637        }
1638    }
1639
1640    macro_rules! stub_ops {
1641        ($name:ident, $trait:path) => {
1642            struct $name;
1643        };
1644    }
1645
1646    stub_ops!(StubRooms, RoomOps);
1647    stub_ops!(StubCache, CacheOps);
1648    stub_ops!(StubPubSub, PubSubOps);
1649    stub_ops!(StubJobs, JobOps);
1650    stub_ops!(StubScheduler, SchedulerOps);
1651    stub_ops!(StubWorkflows, WorkflowOps);
1652    stub_ops!(StubFiles, FileOps);
1653    stub_ops!(StubOpenApi, OpenApiGenerator);
1654    stub_ops!(StubEmail, EmailSender);
1655
1656    impl RoomOps for StubRooms {
1657        fn join(
1658            &self,
1659            _room: &str,
1660            _user_id: &str,
1661            _data: Option<serde_json::Value>,
1662        ) -> Result<(serde_json::Value, serde_json::Value), pylon_http::DataError> {
1663            Ok((serde_json::json!({}), serde_json::json!({})))
1664        }
1665        fn leave(&self, _room: &str, _user_id: &str) -> Option<serde_json::Value> {
1666            None
1667        }
1668        fn set_presence(
1669            &self,
1670            _room: &str,
1671            _user_id: &str,
1672            _data: serde_json::Value,
1673        ) -> Option<serde_json::Value> {
1674            None
1675        }
1676        fn broadcast(
1677            &self,
1678            _room: &str,
1679            _sender: Option<&str>,
1680            _topic: &str,
1681            _data: serde_json::Value,
1682        ) -> Option<serde_json::Value> {
1683            None
1684        }
1685        fn list_rooms(&self) -> Vec<String> {
1686            vec![]
1687        }
1688        fn room_size(&self, _name: &str) -> usize {
1689            0
1690        }
1691        fn members(&self, _name: &str) -> Vec<serde_json::Value> {
1692            vec![]
1693        }
1694    }
1695    impl CacheOps for StubCache {
1696        fn handle_command(&self, _body: &str) -> (u16, String) {
1697            (200, "{}".into())
1698        }
1699        fn handle_get(&self, _key: &str) -> (u16, String) {
1700            (404, "{}".into())
1701        }
1702        fn handle_delete(&self, _key: &str) -> (u16, String) {
1703            (200, "{}".into())
1704        }
1705    }
1706    impl PubSubOps for StubPubSub {
1707        fn handle_publish(&self, _body: &str) -> (u16, String) {
1708            (200, "{}".into())
1709        }
1710        fn handle_channels(&self) -> (u16, String) {
1711            (200, "[]".into())
1712        }
1713        fn handle_history(&self, _channel: &str, _url: &str) -> (u16, String) {
1714            (200, "[]".into())
1715        }
1716    }
1717    impl JobOps for StubJobs {
1718        fn enqueue(
1719            &self,
1720            _name: &str,
1721            _payload: serde_json::Value,
1722            _priority: &str,
1723            _delay_secs: u64,
1724            _max_retries: u32,
1725            _queue: &str,
1726        ) -> String {
1727            "job-id".into()
1728        }
1729        fn stats(&self) -> serde_json::Value {
1730            serde_json::json!({})
1731        }
1732        fn dead_letters(&self) -> serde_json::Value {
1733            serde_json::json!([])
1734        }
1735        fn retry_dead(&self, _id: &str) -> bool {
1736            false
1737        }
1738        fn list_jobs(
1739            &self,
1740            _status: Option<&str>,
1741            _queue: Option<&str>,
1742            _limit: usize,
1743        ) -> serde_json::Value {
1744            serde_json::json!([])
1745        }
1746        fn get_job(&self, _id: &str) -> Option<serde_json::Value> {
1747            None
1748        }
1749    }
1750    impl SchedulerOps for StubScheduler {
1751        fn list_tasks(&self) -> serde_json::Value {
1752            serde_json::json!([])
1753        }
1754        fn trigger(&self, _name: &str) -> bool {
1755            false
1756        }
1757    }
1758    impl WorkflowOps for StubWorkflows {
1759        fn definitions(&self) -> serde_json::Value {
1760            serde_json::json!([])
1761        }
1762        fn start(&self, _name: &str, _input: serde_json::Value) -> Result<String, String> {
1763            Ok("wf-id".into())
1764        }
1765        fn list(&self, _status: Option<&str>) -> serde_json::Value {
1766            serde_json::json!([])
1767        }
1768        fn get(&self, _id: &str) -> Option<serde_json::Value> {
1769            None
1770        }
1771        fn advance(&self, _id: &str) -> Result<String, String> {
1772            Ok("running".into())
1773        }
1774        fn send_event(
1775            &self,
1776            _id: &str,
1777            _event: &str,
1778            _data: serde_json::Value,
1779        ) -> Result<(), String> {
1780            Ok(())
1781        }
1782        fn cancel(&self, _id: &str) -> Result<(), String> {
1783            Ok(())
1784        }
1785    }
1786    impl FileOps for StubFiles {
1787        fn upload(&self, _body: &str) -> (u16, String) {
1788            (501, "{}".into())
1789        }
1790        fn get_file(&self, _id: &str) -> (u16, String) {
1791            (404, "{}".into())
1792        }
1793    }
1794    impl OpenApiGenerator for StubOpenApi {
1795        fn generate(&self, _base: &str) -> String {
1796            "{}".into()
1797        }
1798    }
1799    impl EmailSender for StubEmail {
1800        fn send(&self, _to: &str, _subject: &str, _body: &str) -> Result<(), String> {
1801            Ok(())
1802        }
1803    }
1804
1805    fn empty_manifest() -> AppManifest {
1806        AppManifest {
1807            manifest_version: MANIFEST_VERSION,
1808            name: "test".into(),
1809            version: "0.1.0".into(),
1810            entities: vec![],
1811            routes: vec![],
1812            queries: vec![],
1813            actions: vec![],
1814            policies: vec![],
1815            auth: Default::default(),
1816        }
1817    }
1818
1819    /// Scaffold a RouterContext for tests. Caller chooses is_dev + auth.
1820    fn with_ctx<F>(is_dev: bool, auth: &AuthContext, f: F)
1821    where
1822        F: FnOnce(&RouterContext),
1823    {
1824        with_ctx_hooks(is_dev, auth, &NoopPluginHooks, f);
1825    }
1826
1827    fn with_ctx_hooks<F>(is_dev: bool, auth: &AuthContext, hooks: &dyn PluginHookOps, f: F)
1828    where
1829        F: FnOnce(&RouterContext),
1830    {
1831        let manifest = empty_manifest();
1832        let store = StubDataStore {
1833            manifest: manifest.clone(),
1834        };
1835        let session_store = SessionStore::new();
1836        let magic_codes = MagicCodeStore::new();
1837        let oauth_state = OAuthStateStore::new();
1838        let account_store = pylon_auth::AccountStore::new();
1839        let api_keys = pylon_auth::api_key::ApiKeyStore::new();
1840        let orgs = pylon_auth::org::OrgStore::new();
1841        let siwe = pylon_auth::siwe::NonceStore::new();
1842        let phone_codes = pylon_auth::phone::PhoneCodeStore::new();
1843        let passkeys = pylon_auth::webauthn::PasskeyStore::new();
1844        let verification = pylon_auth::verification::VerificationStore::new();
1845        let audit = pylon_auth::audit::AuditStore::new();
1846        let policy_engine = PolicyEngine::from_manifest(&manifest);
1847        let change_log = ChangeLog::new();
1848        let notifier = NoopNotifier;
1849        let rooms = StubRooms;
1850        let cache = StubCache;
1851        let pubsub = StubPubSub;
1852        let jobs = StubJobs;
1853        let scheduler = StubScheduler;
1854        let workflows = StubWorkflows;
1855        let files = StubFiles;
1856        let openapi = StubOpenApi;
1857        let email = StubEmail;
1858        let cookie_config = CookieConfig::from_env(&CookieConfig::default_name_for("test"));
1859
1860        let ctx = RouterContext {
1861            store: &store,
1862            session_store: &session_store,
1863            magic_codes: &magic_codes,
1864            oauth_state: &oauth_state,
1865            account_store: &account_store,
1866            api_keys: &api_keys,
1867            orgs: &orgs,
1868            siwe: &siwe,
1869            phone_codes: &phone_codes,
1870            passkeys: &passkeys,
1871            verification: &verification,
1872            audit: &audit,
1873            trusted_origins: &[],
1874            policy_engine: &policy_engine,
1875            change_log: &change_log,
1876            notifier: &notifier,
1877            rooms: &rooms,
1878            cache: &cache,
1879            pubsub: &pubsub,
1880            jobs: &jobs,
1881            scheduler: &scheduler,
1882            workflows: &workflows,
1883            files: &files,
1884            openapi: &openapi,
1885            functions: None,
1886            email: &email,
1887            shards: None,
1888            plugin_hooks: hooks,
1889            auth_ctx: auth,
1890            is_dev,
1891            request_headers: &[],
1892            peer_ip: "127.0.0.1",
1893            cookie_config: &cookie_config,
1894            response_headers: RefCell::new(Vec::new()),
1895        };
1896        f(&ctx);
1897    }
1898
1899    // -----------------------------------------------------------------------
1900    // Regression tests for the previously-vulnerable routes
1901    // -----------------------------------------------------------------------
1902
1903    /// Prior vuln: POST /api/auth/session accepted any user_id from anonymous
1904    /// callers and minted a valid session. Now: prod requires admin.
1905    #[test]
1906    fn auth_session_refuses_non_admin_in_prod() {
1907        let anon = AuthContext::anonymous();
1908        with_ctx(false, &anon, |ctx| {
1909            let (status, body, _ct) = route(
1910                ctx,
1911                HttpMethod::Post,
1912                "/api/auth/session",
1913                r#"{"user_id":"victim"}"#,
1914                None,
1915            );
1916            assert_eq!(status, 403);
1917            assert!(body.contains("FORBIDDEN"));
1918        });
1919    }
1920
1921    #[test]
1922    fn auth_session_allowed_for_admin_in_prod() {
1923        let admin = AuthContext::admin();
1924        with_ctx(false, &admin, |ctx| {
1925            let (status, _body, _ct) = route(
1926                ctx,
1927                HttpMethod::Post,
1928                "/api/auth/session",
1929                r#"{"user_id":"alice"}"#,
1930                None,
1931            );
1932            assert_eq!(status, 201);
1933        });
1934    }
1935
1936    /// Prior vuln: OAuth callback accepted `{email, state}` without a real
1937    /// authorization code, letting anyone mint a session for any email.
1938    /// Now: prod requires an authorization code.
1939    #[test]
1940    fn oauth_callback_refuses_missing_code_in_prod() {
1941        let anon = AuthContext::anonymous();
1942        with_ctx(false, &anon, |ctx| {
1943            // Mint a state token so the state check passes — the `code`
1944            // requirement is what must still stop us.
1945            let state = ctx
1946                .oauth_state
1947                .create("google", "https://app/cb", "https://app/cb");
1948            let body = format!(r#"{{"state":"{state}","email":"victim@example.com"}}"#);
1949            let (status, resp, _ct) = route(
1950                ctx,
1951                HttpMethod::Post,
1952                "/api/auth/callback/google",
1953                &body,
1954                None,
1955            );
1956            assert_eq!(status, 400);
1957            assert!(resp.contains("authorization") || resp.contains("code"));
1958        });
1959    }
1960
1961    /// Prior vuln: /api/sync/push had no auth check.
1962    #[test]
1963    fn sync_push_requires_admin() {
1964        let anon = AuthContext::anonymous();
1965        with_ctx(false, &anon, |ctx| {
1966            let (status, body, _ct) = route(
1967                ctx,
1968                HttpMethod::Post,
1969                "/api/sync/push",
1970                r#"{"changes":[]}"#,
1971                None,
1972            );
1973            assert_eq!(status, 403);
1974            assert!(body.contains("FORBIDDEN"));
1975        });
1976    }
1977
1978    /// Prior vuln: /api/transact had no auth check.
1979    #[test]
1980    fn transact_requires_admin() {
1981        let anon = AuthContext::anonymous();
1982        with_ctx(false, &anon, |ctx| {
1983            let (status, _body, _ct) = route(
1984                ctx,
1985                HttpMethod::Post,
1986                "/api/transact",
1987                r#"{"ops":[]}"#,
1988                None,
1989            );
1990            assert_eq!(status, 403);
1991        });
1992    }
1993
1994    /// Prior vuln: /api/workflows/start had no auth check.
1995    #[test]
1996    fn workflow_start_requires_admin() {
1997        let anon = AuthContext::anonymous();
1998        with_ctx(false, &anon, |ctx| {
1999            let (status, _body, _ct) = route(
2000                ctx,
2001                HttpMethod::Post,
2002                "/api/workflows/start",
2003                r#"{"name":"x"}"#,
2004                None,
2005            );
2006            assert_eq!(status, 403);
2007        });
2008    }
2009
2010    /// Prior vuln: /api/jobs enqueue was open.
2011    #[test]
2012    fn jobs_enqueue_requires_admin() {
2013        let anon = AuthContext::anonymous();
2014        with_ctx(false, &anon, |ctx| {
2015            let (status, _body, _ct) = route(
2016                ctx,
2017                HttpMethod::Post,
2018                "/api/jobs",
2019                r#"{"name":"x","payload":{}}"#,
2020                None,
2021            );
2022            assert_eq!(status, 403);
2023        });
2024    }
2025
2026    // -----------------------------------------------------------------------
2027    // Robustness / fuzz-style property tests
2028    //
2029    // The router is the one public entry point that takes arbitrary bytes
2030    // from the network. Any input must yield a response, not a panic.
2031    // These tests hammer the handlers with malformed bodies, weird paths,
2032    // and deeply-nested JSON. If any of them panic or loop, we catch it
2033    // here instead of in production.
2034    // -----------------------------------------------------------------------
2035
2036    fn assert_route_doesnt_panic(ctx: &RouterContext, method: HttpMethod, url: &str, body: &str) {
2037        // `route` is synchronous. A panic would abort the test thread; the
2038        // test harness would fail the whole test. So just call it — success
2039        // means no panic.
2040        let (_status, _body, _ct) = route(ctx, method, url, body, None);
2041    }
2042
2043    #[test]
2044    fn fuzz_malformed_json_bodies_never_panic() {
2045        let admin = AuthContext::admin();
2046        with_ctx(true, &admin, |ctx| {
2047            let samples = [
2048                "",
2049                "not json",
2050                "{",
2051                "}",
2052                "{\"",
2053                "{\"key\":",
2054                "[]",
2055                "null",
2056                "true",
2057                "\"string\"",
2058                "{\"changes\":\"not an array\"}",
2059                &format!("{{\"deeply\":{}}}", "{".repeat(1000)),
2060                "{\"unicode\":\"\\u0000\"}",
2061                "{\"numbers\":1e308}",
2062                "{\"negative\":-999999999999999}",
2063            ];
2064            for body in &samples {
2065                for url in &[
2066                    "/api/sync/push",
2067                    "/api/transact",
2068                    "/api/import",
2069                    "/api/batch",
2070                    "/api/jobs",
2071                    "/api/auth/session",
2072                    "/api/auth/magic/send",
2073                ] {
2074                    assert_route_doesnt_panic(ctx, HttpMethod::Post, url, body);
2075                }
2076            }
2077        });
2078    }
2079
2080    #[test]
2081    fn fuzz_weird_urls_never_panic() {
2082        let admin = AuthContext::admin();
2083        with_ctx(true, &admin, |ctx| {
2084            let samples = [
2085                "/",
2086                "/api",
2087                "/api/",
2088                "/api/entities/",
2089                "/api/entities//",
2090                "/api/entities/%00",
2091                "/api/entities/../escape",
2092                "/api/entities/User?garbage=\x01",
2093                "/api/entities/User?$limit=abc&$order=garbage",
2094                &format!("/api/entities/{}", "a".repeat(10_000)),
2095                "/api/fn/",
2096                "/api/fn/traces",
2097                "/api/shards/id/connect",
2098                "/api/workflows/definitions",
2099                "/api/workflows/nonexistent/advance",
2100                "/api/rooms/",
2101                "/api/rooms/%20",
2102            ];
2103            for url in &samples {
2104                assert_route_doesnt_panic(ctx, HttpMethod::Get, url, "");
2105                assert_route_doesnt_panic(ctx, HttpMethod::Post, url, "{}");
2106                assert_route_doesnt_panic(ctx, HttpMethod::Delete, url, "");
2107            }
2108        });
2109    }
2110
2111    #[test]
2112    fn fuzz_deeply_nested_json_dont_stack_overflow() {
2113        // serde_json has an internal recursion limit (default 128); confirm
2114        // depths beyond that return 400 rather than overflow the stack.
2115        let admin = AuthContext::admin();
2116        with_ctx(true, &admin, |ctx| {
2117            let depth = 300;
2118            let body = format!("{}{}", "[".repeat(depth), "]".repeat(depth),);
2119            let (status, _body, _ct) = route(ctx, HttpMethod::Post, "/api/sync/push", &body, None);
2120            // Serde may reject with 400, or the handler may accept and
2121            // treat as empty — either is fine. The key property: no panic.
2122            assert!(status >= 200 && status < 600);
2123        });
2124    }
2125
2126    #[test]
2127    fn fuzz_unusual_http_methods_gracefully() {
2128        let admin = AuthContext::admin();
2129        with_ctx(true, &admin, |ctx| {
2130            for method in [
2131                HttpMethod::Get,
2132                HttpMethod::Post,
2133                HttpMethod::Put,
2134                HttpMethod::Patch,
2135                HttpMethod::Delete,
2136                HttpMethod::Options,
2137                HttpMethod::Head,
2138            ] {
2139                let (_status, _body, _ct) = route(ctx, method, "/api/entities/User", "{}", None);
2140            }
2141        });
2142    }
2143
2144    // -----------------------------------------------------------------------
2145    // /api/auth/email/* — verify the auth gate, the rate limiter, and a
2146    // happy-path send→verify cycle. Uses a User-aware stub store because
2147    // the default StubDataStore returns None for every get_by_id.
2148    // -----------------------------------------------------------------------
2149
2150    /// Stub store that pretends User "u-1" exists with email
2151    /// "alice@example.com" and tracks update calls so we can assert the
2152    /// emailVerified field gets set.
2153    struct UserStubStore {
2154        manifest: AppManifest,
2155        last_update: std::sync::Mutex<Option<(String, String, serde_json::Value)>>,
2156    }
2157    impl pylon_http::DataStore for UserStubStore {
2158        fn manifest(&self) -> &AppManifest {
2159            &self.manifest
2160        }
2161        fn insert(
2162            &self,
2163            _e: &str,
2164            _d: &serde_json::Value,
2165        ) -> Result<String, pylon_http::DataError> {
2166            Ok("u-1".into())
2167        }
2168        fn get_by_id(
2169            &self,
2170            entity: &str,
2171            id: &str,
2172        ) -> Result<Option<serde_json::Value>, pylon_http::DataError> {
2173            if entity == "User" && id == "u-1" {
2174                return Ok(Some(serde_json::json!({
2175                    "id": "u-1",
2176                    "email": "alice@example.com",
2177                    "displayName": "Alice",
2178                })));
2179            }
2180            Ok(None)
2181        }
2182        fn list(&self, _e: &str) -> Result<Vec<serde_json::Value>, pylon_http::DataError> {
2183            Ok(vec![])
2184        }
2185        fn list_after(
2186            &self,
2187            _e: &str,
2188            _a: Option<&str>,
2189            _l: usize,
2190        ) -> Result<Vec<serde_json::Value>, pylon_http::DataError> {
2191            Ok(vec![])
2192        }
2193        fn update(
2194            &self,
2195            entity: &str,
2196            id: &str,
2197            data: &serde_json::Value,
2198        ) -> Result<bool, pylon_http::DataError> {
2199            *self.last_update.lock().unwrap() = Some((entity.into(), id.into(), data.clone()));
2200            Ok(true)
2201        }
2202        fn delete(&self, _e: &str, _i: &str) -> Result<bool, pylon_http::DataError> {
2203            Ok(true)
2204        }
2205        fn lookup(
2206            &self,
2207            _e: &str,
2208            _f: &str,
2209            _v: &str,
2210        ) -> Result<Option<serde_json::Value>, pylon_http::DataError> {
2211            Ok(None)
2212        }
2213        fn link(
2214            &self,
2215            _e: &str,
2216            _i: &str,
2217            _r: &str,
2218            _t: &str,
2219        ) -> Result<bool, pylon_http::DataError> {
2220            Ok(true)
2221        }
2222        fn unlink(&self, _e: &str, _i: &str, _r: &str) -> Result<bool, pylon_http::DataError> {
2223            Ok(true)
2224        }
2225        fn query_filtered(
2226            &self,
2227            _e: &str,
2228            _f: &serde_json::Value,
2229        ) -> Result<Vec<serde_json::Value>, pylon_http::DataError> {
2230            Ok(vec![])
2231        }
2232        fn query_graph(
2233            &self,
2234            _q: &serde_json::Value,
2235        ) -> Result<serde_json::Value, pylon_http::DataError> {
2236            Ok(serde_json::json!({}))
2237        }
2238        fn aggregate(
2239            &self,
2240            _e: &str,
2241            _s: &serde_json::Value,
2242        ) -> Result<serde_json::Value, pylon_http::DataError> {
2243            Ok(serde_json::json!({}))
2244        }
2245        fn transact(
2246            &self,
2247            _o: &[serde_json::Value],
2248        ) -> Result<(bool, Vec<serde_json::Value>), pylon_http::DataError> {
2249            Ok((true, vec![]))
2250        }
2251        fn search(
2252            &self,
2253            _e: &str,
2254            _q: &serde_json::Value,
2255        ) -> Result<serde_json::Value, pylon_http::DataError> {
2256            Ok(serde_json::json!({}))
2257        }
2258    }
2259
2260    /// Capture-the-email stub so we can assert the body the user would
2261    /// have received. Production wiring does this through an Resend /
2262    /// SES adapter; tests just want to read what got "sent".
2263    struct CaptureEmail {
2264        sent: std::sync::Mutex<Vec<(String, String, String)>>,
2265    }
2266    impl EmailSender for CaptureEmail {
2267        fn send(&self, to: &str, subject: &str, body: &str) -> Result<(), String> {
2268            self.sent
2269                .lock()
2270                .unwrap()
2271                .push((to.into(), subject.into(), body.into()));
2272            Ok(())
2273        }
2274    }
2275
2276    fn with_user_ctx<F>(is_dev: bool, auth: &AuthContext, f: F)
2277    where
2278        F: FnOnce(&RouterContext, &UserStubStore, &CaptureEmail, &MagicCodeStore),
2279    {
2280        let manifest = empty_manifest();
2281        let store = UserStubStore {
2282            manifest: manifest.clone(),
2283            last_update: std::sync::Mutex::new(None),
2284        };
2285        let session_store = SessionStore::new();
2286        let magic_codes = MagicCodeStore::new();
2287        let oauth_state = OAuthStateStore::new();
2288        let account_store = pylon_auth::AccountStore::new();
2289        let api_keys = pylon_auth::api_key::ApiKeyStore::new();
2290        let orgs = pylon_auth::org::OrgStore::new();
2291        let siwe = pylon_auth::siwe::NonceStore::new();
2292        let phone_codes = pylon_auth::phone::PhoneCodeStore::new();
2293        let passkeys = pylon_auth::webauthn::PasskeyStore::new();
2294        let verification = pylon_auth::verification::VerificationStore::new();
2295        let audit = pylon_auth::audit::AuditStore::new();
2296        let policy_engine = PolicyEngine::from_manifest(&manifest);
2297        let change_log = ChangeLog::new();
2298        let notifier = NoopNotifier;
2299        let rooms = StubRooms;
2300        let cache = StubCache;
2301        let pubsub = StubPubSub;
2302        let jobs = StubJobs;
2303        let scheduler = StubScheduler;
2304        let workflows = StubWorkflows;
2305        let files = StubFiles;
2306        let openapi = StubOpenApi;
2307        let email = CaptureEmail {
2308            sent: std::sync::Mutex::new(vec![]),
2309        };
2310        let hooks = NoopPluginHooks;
2311        let cookie_config = CookieConfig::from_env(&CookieConfig::default_name_for("test"));
2312
2313        let ctx = RouterContext {
2314            store: &store,
2315            session_store: &session_store,
2316            magic_codes: &magic_codes,
2317            oauth_state: &oauth_state,
2318            account_store: &account_store,
2319            api_keys: &api_keys,
2320            orgs: &orgs,
2321            siwe: &siwe,
2322            phone_codes: &phone_codes,
2323            passkeys: &passkeys,
2324            verification: &verification,
2325            audit: &audit,
2326            trusted_origins: &[],
2327            policy_engine: &policy_engine,
2328            change_log: &change_log,
2329            notifier: &notifier,
2330            rooms: &rooms,
2331            cache: &cache,
2332            pubsub: &pubsub,
2333            jobs: &jobs,
2334            scheduler: &scheduler,
2335            workflows: &workflows,
2336            files: &files,
2337            openapi: &openapi,
2338            functions: None,
2339            email: &email,
2340            shards: None,
2341            plugin_hooks: &hooks,
2342            auth_ctx: auth,
2343            is_dev,
2344            request_headers: &[],
2345            peer_ip: "127.0.0.1",
2346            cookie_config: &cookie_config,
2347            response_headers: RefCell::new(Vec::new()),
2348        };
2349        f(&ctx, &store, &email, &magic_codes);
2350    }
2351
2352    #[test]
2353    fn email_send_verification_requires_auth() {
2354        let anon = AuthContext::anonymous();
2355        with_user_ctx(true, &anon, |ctx, _, _, _| {
2356            let (status, body, _) = route(
2357                ctx,
2358                HttpMethod::Post,
2359                "/api/auth/email/send-verification",
2360                "{}",
2361                None,
2362            );
2363            assert_eq!(status, 401);
2364            assert!(body.contains("UNAUTHORIZED"));
2365        });
2366    }
2367
2368    #[test]
2369    fn email_verify_requires_auth() {
2370        let anon = AuthContext::anonymous();
2371        with_user_ctx(true, &anon, |ctx, _, _, _| {
2372            let (status, body, _) = route(
2373                ctx,
2374                HttpMethod::Post,
2375                "/api/auth/email/verify",
2376                r#"{"code":"123456"}"#,
2377                None,
2378            );
2379            assert_eq!(status, 401);
2380            assert!(body.contains("UNAUTHORIZED"));
2381        });
2382    }
2383
2384    #[test]
2385    fn email_send_verification_uses_session_email_not_body() {
2386        // Caller is "u-1" (alice@example.com). Even if they put a
2387        // different email in the body, the code should be issued for
2388        // the SESSION's email — otherwise an authed caller could spam
2389        // codes to arbitrary addresses.
2390        let alice = AuthContext::authenticated("u-1".into());
2391        with_user_ctx(true, &alice, |ctx, _, email, _| {
2392            let (status, body, _) = route(
2393                ctx,
2394                HttpMethod::Post,
2395                "/api/auth/email/send-verification",
2396                r#"{"email":"victim@example.com"}"#,
2397                None,
2398            );
2399            assert_eq!(status, 200);
2400            // Dev mode echoes the code; verify the recipient is alice,
2401            // not the body's victim.
2402            let sent = email.sent.lock().unwrap();
2403            assert_eq!(sent.len(), 1);
2404            assert_eq!(sent[0].0, "alice@example.com");
2405            assert!(body.contains("alice@example.com"));
2406            assert!(!body.contains("victim@example.com"));
2407        });
2408    }
2409
2410    #[test]
2411    fn email_verify_happy_path_stamps_email_verified() {
2412        let alice = AuthContext::authenticated("u-1".into());
2413        with_user_ctx(true, &alice, |ctx, store, _, magic_codes| {
2414            // Pre-issue a code (skipping the send endpoint) so we test
2415            // verify in isolation.
2416            let code = magic_codes.try_create("alice@example.com").unwrap();
2417            let body = format!(r#"{{"code":"{code}"}}"#);
2418            let (status, resp, _) =
2419                route(ctx, HttpMethod::Post, "/api/auth/email/verify", &body, None);
2420            assert_eq!(status, 200);
2421            assert!(resp.contains("\"verified\":true"));
2422            // Update was attempted on User u-1 with emailVerified set.
2423            let last = store.last_update.lock().unwrap();
2424            let (entity, id, data) = last.as_ref().expect("update should have fired");
2425            assert_eq!(entity, "User");
2426            assert_eq!(id, "u-1");
2427            assert!(data.get("emailVerified").is_some());
2428        });
2429    }
2430
2431    #[test]
2432    fn email_verify_rejects_wrong_code() {
2433        let alice = AuthContext::authenticated("u-1".into());
2434        with_user_ctx(true, &alice, |ctx, store, _, magic_codes| {
2435            let _ = magic_codes.try_create("alice@example.com").unwrap();
2436            let (status, body, _) = route(
2437                ctx,
2438                HttpMethod::Post,
2439                "/api/auth/email/verify",
2440                r#"{"code":"999999"}"#,
2441                None,
2442            );
2443            assert_eq!(status, 401);
2444            assert!(body.contains("INVALID_CODE"));
2445            // No update should have happened.
2446            assert!(store.last_update.lock().unwrap().is_none());
2447        });
2448    }
2449
2450    /// Dev mode keeps the old permissive behaviour for local tooling.
2451    #[test]
2452    fn auth_session_allowed_in_dev_mode() {
2453        let anon = AuthContext::anonymous();
2454        with_ctx(true, &anon, |ctx| {
2455            let (status, _body, _ct) = route(
2456                ctx,
2457                HttpMethod::Post,
2458                "/api/auth/session",
2459                r#"{"user_id":"alice"}"#,
2460                None,
2461            );
2462            assert_eq!(status, 201);
2463        });
2464    }
2465
2466    // -----------------------------------------------------------------------
2467    // Plugin CRUD hook wiring — prior vuln: POST/PATCH/DELETE on
2468    // /api/entities/* bypassed the registered plugin before_/after_ hooks,
2469    // so validation/audit_log/webhooks/slugify/timestamps never saw data-
2470    // plane writes. These tests pin that the router now runs them.
2471    // -----------------------------------------------------------------------
2472
2473    use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
2474
2475    struct CountingHooks {
2476        before_insert_calls: AtomicU32,
2477        after_insert_calls: AtomicU32,
2478        before_delete_calls: AtomicU32,
2479        reject_on_entity: Option<&'static str>,
2480    }
2481
2482    impl CountingHooks {
2483        fn new() -> Self {
2484            Self {
2485                before_insert_calls: AtomicU32::new(0),
2486                after_insert_calls: AtomicU32::new(0),
2487                before_delete_calls: AtomicU32::new(0),
2488                reject_on_entity: None,
2489            }
2490        }
2491    }
2492
2493    impl PluginHookOps for CountingHooks {
2494        fn before_insert(
2495            &self,
2496            entity: &str,
2497            _data: &mut serde_json::Value,
2498            _auth: &AuthContext,
2499        ) -> Result<(), (u16, String, String)> {
2500            self.before_insert_calls.fetch_add(1, Ordering::SeqCst);
2501            if self.reject_on_entity == Some(entity) {
2502                return Err((422, "VALIDATION".into(), "rejected by plugin".into()));
2503            }
2504            Ok(())
2505        }
2506        fn after_insert(
2507            &self,
2508            _entity: &str,
2509            _id: &str,
2510            _data: &serde_json::Value,
2511            _auth: &AuthContext,
2512        ) {
2513            self.after_insert_calls.fetch_add(1, Ordering::SeqCst);
2514        }
2515        fn before_update(
2516            &self,
2517            _entity: &str,
2518            _id: &str,
2519            _data: &mut serde_json::Value,
2520            _auth: &AuthContext,
2521        ) -> Result<(), (u16, String, String)> {
2522            Ok(())
2523        }
2524        fn after_update(
2525            &self,
2526            _entity: &str,
2527            _id: &str,
2528            _data: &serde_json::Value,
2529            _auth: &AuthContext,
2530        ) {
2531        }
2532        fn before_delete(
2533            &self,
2534            _entity: &str,
2535            _id: &str,
2536            _auth: &AuthContext,
2537        ) -> Result<(), (u16, String, String)> {
2538            self.before_delete_calls.fetch_add(1, Ordering::SeqCst);
2539            Ok(())
2540        }
2541        fn after_delete(&self, _entity: &str, _id: &str, _auth: &AuthContext) {}
2542    }
2543
2544    #[test]
2545    fn plugin_hooks_fire_on_entity_post() {
2546        // StubDataStore::insert always succeeds with id="stub-id", so we
2547        // just assert the before_/after_ counters tick.
2548        let admin = AuthContext::admin();
2549        let hooks = CountingHooks::new();
2550        with_ctx_hooks(true, &admin, &hooks, |ctx| {
2551            let (status, _body, _ct) = route(
2552                ctx,
2553                HttpMethod::Post,
2554                "/api/entities/User",
2555                r#"{"email":"a@b"}"#,
2556                None,
2557            );
2558            assert_eq!(status, 201);
2559        });
2560        assert_eq!(hooks.before_insert_calls.load(Ordering::SeqCst), 1);
2561        assert_eq!(hooks.after_insert_calls.load(Ordering::SeqCst), 1);
2562    }
2563
2564    #[test]
2565    fn plugin_before_insert_rejection_short_circuits_write() {
2566        // If the plugin rejects, the store's insert must NOT be called and
2567        // the status must propagate.
2568        let admin = AuthContext::admin();
2569        let rejector = CountingHooks {
2570            reject_on_entity: Some("User"),
2571            ..CountingHooks::new()
2572        };
2573        with_ctx_hooks(true, &admin, &rejector, |ctx| {
2574            let (status, body, _ct) =
2575                route(ctx, HttpMethod::Post, "/api/entities/User", r#"{}"#, None);
2576            assert_eq!(status, 422);
2577            assert!(body.contains("VALIDATION"));
2578        });
2579        assert_eq!(rejector.before_insert_calls.load(Ordering::SeqCst), 1);
2580        // after_insert must NOT have been called when before_insert rejected.
2581        assert_eq!(rejector.after_insert_calls.load(Ordering::SeqCst), 0);
2582    }
2583
2584    // -----------------------------------------------------------------------
2585    // GDPR export + purge — pinned behavior so accidental breakage shows up
2586    // as a test failure instead of a compliance surprise.
2587    // -----------------------------------------------------------------------
2588
2589    #[test]
2590    fn gdpr_export_requires_admin() {
2591        let anon = AuthContext::anonymous();
2592        with_ctx(false, &anon, |ctx| {
2593            let (status, body, _ct) = route(
2594                ctx,
2595                HttpMethod::Post,
2596                "/api/admin/users/alice/export",
2597                "",
2598                None,
2599            );
2600            assert_eq!(status, 403);
2601            assert!(body.contains("FORBIDDEN"));
2602        });
2603    }
2604
2605    #[test]
2606    fn gdpr_purge_requires_admin() {
2607        let anon = AuthContext::anonymous();
2608        with_ctx(false, &anon, |ctx| {
2609            let (status, _body, _ct) = route(
2610                ctx,
2611                HttpMethod::Delete,
2612                "/api/admin/users/alice/purge",
2613                "",
2614                None,
2615            );
2616            assert_eq!(status, 403);
2617        });
2618    }
2619
2620    #[test]
2621    fn gdpr_export_returns_envelope_for_admin() {
2622        let admin = AuthContext::admin();
2623        with_ctx(true, &admin, |ctx| {
2624            let (status, body, _ct) = route(
2625                ctx,
2626                HttpMethod::Post,
2627                "/api/admin/users/alice/export",
2628                "",
2629                None,
2630            );
2631            assert_eq!(status, 200);
2632            let v: serde_json::Value = serde_json::from_str(&body).unwrap();
2633            assert_eq!(v["user_id"], "alice");
2634            assert!(v["entities"].is_object());
2635            assert!(v["exported_at"].is_string());
2636        });
2637    }
2638
2639    #[test]
2640    fn plugin_hooks_fire_on_entity_delete() {
2641        let admin = AuthContext::admin();
2642        let hooks = CountingHooks::new();
2643        with_ctx_hooks(true, &admin, &hooks, |ctx| {
2644            let (status, _body, _ct) = route(
2645                ctx,
2646                HttpMethod::Delete,
2647                "/api/entities/User/stub-id",
2648                "",
2649                None,
2650            );
2651            assert_eq!(status, 200);
2652        });
2653        assert_eq!(hooks.before_delete_calls.load(Ordering::SeqCst), 1);
2654    }
2655
2656    // -----------------------------------------------------------------------
2657    // Webhook endpoint — /api/webhooks/:action
2658    //
2659    // No functions are registered in the stub RouterContext, so we can only
2660    // assert the error paths (FUNCTIONS_NOT_AVAILABLE, FN_NOT_FOUND). The
2661    // happy-path with request-context propagation is covered by the TS
2662    // runtime tests + a manual integration check against a live server.
2663    // -----------------------------------------------------------------------
2664
2665    #[test]
2666    fn webhook_returns_503_when_functions_unavailable() {
2667        let anon = AuthContext::anonymous();
2668        with_ctx(true, &anon, |ctx| {
2669            let (status, body, _ct) = route(
2670                ctx,
2671                HttpMethod::Post,
2672                "/api/webhooks/stripe_handler",
2673                "{}",
2674                None,
2675            );
2676            assert_eq!(status, 503);
2677            assert!(body.contains("FUNCTIONS_NOT_AVAILABLE"));
2678        });
2679    }
2680
2681    #[test]
2682    fn webhook_accepts_any_http_method() {
2683        // Regression: providers send GET challenge requests (e.g. Slack URL
2684        // verification). The endpoint must route regardless of method.
2685        let anon = AuthContext::anonymous();
2686        with_ctx(true, &anon, |ctx| {
2687            for method in [
2688                HttpMethod::Get,
2689                HttpMethod::Post,
2690                HttpMethod::Put,
2691                HttpMethod::Patch,
2692                HttpMethod::Delete,
2693            ] {
2694                let (status, _body, _ct) = route(ctx, method, "/api/webhooks/any_name", "", None);
2695                // Without function runtime it's 503 regardless; the point
2696                // is that we didn't 405 Method Not Allowed.
2697                assert_ne!(status, 405);
2698            }
2699        });
2700    }
2701
2702    // -----------------------------------------------------------------------
2703    // Auth matrix regression scaffold
2704    //
2705    // Goal: catch reviewer-class bugs at PR time. Every route in this
2706    // table is hit as anonymous, guest, and authed-non-admin. The
2707    // EXPECTED column is what the route should return for that
2708    // identity. Adding a new route? Add a row. Discovering a bypass?
2709    // Add the regression here so it stays caught.
2710    //
    // Status semantics (expressed through the `Expect` enum below):
    //   401 = AUTH_REQUIRED            (require_auth)
    //   403 = FORBIDDEN | POLICY_DENIED (require_admin / policy)
    //   Rejected       = either 401 or 403 is acceptable.
    //   ReachedHandler = any status (200..=599) except 405 — for routes
    //         that legitimately reach the handler with this identity
    //         and may 200/400/etc. depending on body.
2717    // -----------------------------------------------------------------------
2718
2719    #[derive(Clone, Copy)]
2720    enum Expect {
2721        /// Status must equal this value.
2722        Eq(u16),
2723        /// Status must be 401 or 403 (rejected before handler logic).
2724        Rejected,
2725        /// Anything except 405 — handler reached, body validation may
2726        /// have triggered other errors.
2727        ReachedHandler,
2728    }
2729
2730    fn assert_expect(actual: u16, want: Expect, label: &str) {
2731        match want {
2732            Expect::Eq(s) => assert_eq!(actual, s, "{label}: expected status {s}, got {actual}"),
2733            Expect::Rejected => assert!(
2734                actual == 401 || actual == 403,
2735                "{label}: expected 401 or 403, got {actual}"
2736            ),
2737            Expect::ReachedHandler => assert_ne!(
2738                actual, 405,
2739                "{label}: route should accept this method, got 405"
2740            ),
2741        }
2742    }
2743
2744    /// Hit a route as anon / guest / authed-non-admin and assert each
2745    /// identity gets the documented response. Catches reviewer-class
2746    /// bugs (e.g. a P1 finding that an endpoint is_admin-gated drifts
2747    /// to public during a refactor).
2748    fn matrix_check(
2749        method: HttpMethod,
2750        url: &str,
2751        body: &str,
2752        expect_anon: Expect,
2753        expect_guest: Expect,
2754        expect_user: Expect,
2755    ) {
2756        let anon = AuthContext::anonymous();
2757        let guest = AuthContext::guest("guest-1".into());
2758        let user = AuthContext::authenticated("u-1".into());
2759
2760        for (auth, want, who) in [
2761            (&anon, expect_anon, "anon"),
2762            (&guest, expect_guest, "guest"),
2763            (&user, expect_user, "user"),
2764        ] {
2765            with_ctx(false, auth, |ctx| {
2766                let (status, _body, _ct) = route(ctx, method, url, body, None);
2767                assert_expect(status, want, &format!("{who} {method:?} {url}"));
2768            });
2769        }
2770    }
2771
2772    /// Like matrix_check, but the test scaffold loads a manifest with
2773    /// a deny-by-default policy on the named entity. Use this for
2774    /// policy-gated routes (cursor, filtered query, CRDT push) where
2775    /// the gate's job is "call check_entity_*" — without a denying
2776    /// policy in scope the call would silently pass.
2777    fn matrix_check_with_deny_policy(
2778        deny_entity: &str,
2779        method: HttpMethod,
2780        url: &str,
2781        body: &str,
2782        expect_anon: Expect,
2783        expect_guest: Expect,
2784        expect_user: Expect,
2785    ) {
2786        use pylon_kernel::{AppManifest, ManifestPolicy, MANIFEST_VERSION};
2787        let anon = AuthContext::anonymous();
2788        let guest = AuthContext::guest("guest-1".into());
2789        let user = AuthContext::authenticated("u-1".into());
2790
2791        let manifest = AppManifest {
2792            manifest_version: MANIFEST_VERSION,
2793            name: "test".into(),
2794            version: "0.1.0".into(),
2795            entities: vec![],
2796            routes: vec![],
2797            queries: vec![],
2798            actions: vec![],
2799            policies: vec![ManifestPolicy {
2800                name: "denyAll".into(),
2801                entity: Some(deny_entity.into()),
2802                allow_read: Some("false".into()),
2803                allow_update: Some("false".into()),
2804                ..Default::default()
2805            }],
2806            auth: Default::default(),
2807        };
2808        let store = StubDataStore {
2809            manifest: manifest.clone(),
2810        };
2811        let session_store = SessionStore::new();
2812        let magic_codes = MagicCodeStore::new();
2813        let oauth_state = OAuthStateStore::new();
2814        let account_store = pylon_auth::AccountStore::new();
2815        let api_keys = pylon_auth::api_key::ApiKeyStore::new();
2816        let orgs = pylon_auth::org::OrgStore::new();
2817        let siwe = pylon_auth::siwe::NonceStore::new();
2818        let phone_codes = pylon_auth::phone::PhoneCodeStore::new();
2819        let passkeys = pylon_auth::webauthn::PasskeyStore::new();
2820        let verification = pylon_auth::verification::VerificationStore::new();
2821        let audit = pylon_auth::audit::AuditStore::new();
2822        let policy_engine = PolicyEngine::from_manifest(&manifest);
2823        let change_log = ChangeLog::new();
2824        let notifier = NoopNotifier;
2825        let rooms = StubRooms;
2826        let cache = StubCache;
2827        let pubsub = StubPubSub;
2828        let jobs = StubJobs;
2829        let scheduler = StubScheduler;
2830        let workflows = StubWorkflows;
2831        let files = StubFiles;
2832        let openapi = StubOpenApi;
2833        let email = StubEmail;
2834        let cookie_config = CookieConfig::from_env(&CookieConfig::default_name_for("test"));
2835
2836        for (auth, want, who) in [
2837            (&anon, expect_anon, "anon"),
2838            (&guest, expect_guest, "guest"),
2839            (&user, expect_user, "user"),
2840        ] {
2841            let ctx = RouterContext {
2842                store: &store,
2843                session_store: &session_store,
2844                magic_codes: &magic_codes,
2845                oauth_state: &oauth_state,
2846                account_store: &account_store,
2847                api_keys: &api_keys,
2848                orgs: &orgs,
2849                siwe: &siwe,
2850                phone_codes: &phone_codes,
2851                passkeys: &passkeys,
2852                verification: &verification,
2853                audit: &audit,
2854                trusted_origins: &[],
2855                policy_engine: &policy_engine,
2856                change_log: &change_log,
2857                notifier: &notifier,
2858                rooms: &rooms,
2859                cache: &cache,
2860                pubsub: &pubsub,
2861                jobs: &jobs,
2862                scheduler: &scheduler,
2863                workflows: &workflows,
2864                files: &files,
2865                openapi: &openapi,
2866                functions: None,
2867                email: &email,
2868                shards: None,
2869                plugin_hooks: &NoopPluginHooks,
2870                auth_ctx: auth,
2871                is_dev: false,
2872                request_headers: &[],
2873                peer_ip: "127.0.0.1",
2874                cookie_config: &cookie_config,
2875                response_headers: RefCell::new(Vec::new()),
2876            };
2877            let (status, _body, _ct) = route(&ctx, method, url, body, None);
2878            assert_expect(status, want, &format!("{who} {method:?} {url}"));
2879        }
2880    }
2881
2882    #[test]
2883    fn matrix_cache_admin_only() {
2884        matrix_check(
2885            HttpMethod::Get,
2886            "/api/cache/anykey",
2887            "",
2888            Expect::Rejected,
2889            Expect::Rejected,
2890            Expect::Rejected,
2891        );
2892        matrix_check(
2893            HttpMethod::Post,
2894            "/api/cache",
2895            r#"{"op":"get","key":"x"}"#,
2896            Expect::Rejected,
2897            Expect::Rejected,
2898            Expect::Rejected,
2899        );
2900        matrix_check(
2901            HttpMethod::Delete,
2902            "/api/cache/anykey",
2903            "",
2904            Expect::Rejected,
2905            Expect::Rejected,
2906            Expect::Rejected,
2907        );
2908    }
2909
2910    #[test]
2911    fn matrix_pubsub_admin_only() {
2912        matrix_check(
2913            HttpMethod::Post,
2914            "/api/pubsub/publish",
2915            r#"{"channel":"x","message":"y"}"#,
2916            Expect::Rejected,
2917            Expect::Rejected,
2918            Expect::Rejected,
2919        );
2920        matrix_check(
2921            HttpMethod::Get,
2922            "/api/pubsub/channels",
2923            "",
2924            Expect::Rejected,
2925            Expect::Rejected,
2926            Expect::Rejected,
2927        );
2928        matrix_check(
2929            HttpMethod::Get,
2930            "/api/pubsub/history/some-channel",
2931            "",
2932            Expect::Rejected,
2933            Expect::Rejected,
2934            Expect::Rejected,
2935        );
2936    }
2937
2938    #[test]
2939    fn matrix_jobs_read_admin_only() {
2940        matrix_check(
2941            HttpMethod::Get,
2942            "/api/jobs/stats",
2943            "",
2944            Expect::Rejected,
2945            Expect::Rejected,
2946            Expect::Rejected,
2947        );
2948        matrix_check(
2949            HttpMethod::Get,
2950            "/api/jobs/dead",
2951            "",
2952            Expect::Rejected,
2953            Expect::Rejected,
2954            Expect::Rejected,
2955        );
2956        matrix_check(
2957            HttpMethod::Get,
2958            "/api/jobs",
2959            "",
2960            Expect::Rejected,
2961            Expect::Rejected,
2962            Expect::Rejected,
2963        );
2964        matrix_check(
2965            HttpMethod::Get,
2966            "/api/jobs/some-job-id",
2967            "",
2968            Expect::Rejected,
2969            Expect::Rejected,
2970            Expect::Rejected,
2971        );
2972    }
2973
2974    #[test]
2975    fn matrix_workflows_read_admin_only() {
2976        matrix_check(
2977            HttpMethod::Get,
2978            "/api/workflows/definitions",
2979            "",
2980            Expect::Rejected,
2981            Expect::Rejected,
2982            Expect::Rejected,
2983        );
2984        matrix_check(
2985            HttpMethod::Get,
2986            "/api/workflows",
2987            "",
2988            Expect::Rejected,
2989            Expect::Rejected,
2990            Expect::Rejected,
2991        );
2992        matrix_check(
2993            HttpMethod::Get,
2994            "/api/workflows/some-id",
2995            "",
2996            Expect::Rejected,
2997            Expect::Rejected,
2998            Expect::Rejected,
2999        );
3000    }
3001
3002    #[test]
3003    fn matrix_files_download_requires_auth() {
3004        // Anon must not enumerate uploads via predictable file IDs.
3005        // Guest + user can — files use require_auth, not require_admin.
3006        matrix_check(
3007            HttpMethod::Get,
3008            "/api/files/some-file-id",
3009            "",
3010            Expect::Eq(401),
3011            Expect::ReachedHandler,
3012            Expect::ReachedHandler,
3013        );
3014    }
3015
3016    #[test]
3017    fn matrix_crdt_push_respects_update_policy() {
3018        // Pre-fix: any session (incl. guest) could push a CRDT update
3019        // to any addressable row. Now: when the entity has an update
3020        // policy that denies, even authed non-admins are blocked. Anon
3021        // bounces at the require_auth gate before policy is consulted.
3022        matrix_check_with_deny_policy(
3023            "Doc",
3024            HttpMethod::Post,
3025            "/api/crdt/Doc/some-row",
3026            r#"{"update":"00"}"#,
3027            Expect::Eq(401),
3028            Expect::Rejected,
3029            Expect::Rejected,
3030        );
3031    }
3032
3033    #[test]
3034    fn matrix_filtered_query_respects_read_policy() {
3035        matrix_check_with_deny_policy(
3036            "Secret",
3037            HttpMethod::Post,
3038            "/api/query/Secret",
3039            r#"{"where":{}}"#,
3040            Expect::Rejected,
3041            Expect::Rejected,
3042            Expect::Rejected,
3043        );
3044    }
3045
3046    #[test]
3047    fn matrix_cursor_pagination_respects_read_policy() {
3048        matrix_check_with_deny_policy(
3049            "Secret",
3050            HttpMethod::Get,
3051            "/api/entities/Secret/cursor?limit=10",
3052            "",
3053            Expect::Rejected,
3054            Expect::Rejected,
3055            Expect::Rejected,
3056        );
3057    }
3058
3059    /// One-shot audit of every admin-required GET route. Every entry
3060    /// here is a route where an anonymous, guest, or authed-non-admin
3061    /// caller MUST receive 401/403. Adding a new admin GET? Add a row.
3062    /// Removing a route's admin gate? You'll see this test fail in the
3063    /// PR diff and can confirm the change is intentional.
3064    ///
3065    /// This is the forcing function the 2nd security review asked for:
3066    /// "every `/api/*` GET that doesn't return admin/non-sensitive
3067    /// data has an auth gate before the handler". Compile-time
3068    /// enumeration would be nicer but the route list lives in
3069    /// `route_inner` as a chain of if-blocks; until that's reified
3070    /// into data, this table is the gate.
3071    #[test]
3072    fn matrix_admin_get_routes_audit() {
3073        let admin_get_routes: &[(&str, &str)] = &[
3074            ("/api/scheduler", "list scheduled tasks"),
3075            ("/api/fn", "enumerate registered functions"),
3076            ("/api/fn/traces", "function execution traces"),
3077            ("/api/shards", "shard topology + subscriber counts"),
3078            ("/api/cache/anykey", "raw cache read"),
3079            ("/api/pubsub/channels", "list pub/sub channels"),
3080            ("/api/pubsub/history/anychannel", "channel retained history"),
3081            ("/api/jobs/stats", "job queue stats"),
3082            ("/api/jobs/dead", "dead-letter queue"),
3083            ("/api/jobs", "job list with payloads"),
3084            ("/api/jobs/some-id", "single job detail"),
3085            ("/api/workflows/definitions", "workflow definitions"),
3086            ("/api/workflows", "workflow instance list"),
3087            ("/api/workflows/some-id", "workflow instance detail"),
3088        ];
3089        for (url, label) in admin_get_routes {
3090            matrix_check(
3091                HttpMethod::Get,
3092                url,
3093                "",
3094                Expect::Rejected,
3095                Expect::Rejected,
3096                Expect::Rejected,
3097            );
3098            // re-assert with explicit fail message so the audit log
3099            // pinpoints which row regressed.
3100            let _ = label;
3101        }
3102    }
3103
3104    // Keep the warning silencer until this is used.
3105    #[allow(dead_code)]
3106    const _TOUCH_ATOMIC_BOOL: AtomicBool = AtomicBool::new(false);
3107
3108    // -----------------------------------------------------------------------
3109    // Round-3 hardening tests
3110    // -----------------------------------------------------------------------
3111
3112    #[test]
3113    fn redact_email_keeps_two_chars_and_domain() {
3114        assert_eq!(super::redact_email("alice@acme.com"), "al***@acme.com");
3115        assert_eq!(super::redact_email("a@b.io"), "a***@b.io");
3116        assert_eq!(super::redact_email("ab@x.io"), "ab***@x.io");
3117        // Pathological inputs don't crash; just return a marker.
3118        assert_eq!(super::redact_email("not-an-email"), "***");
3119        assert_eq!(super::redact_email(""), "***");
3120        // Multi-byte chars in local-part don't slice mid-codepoint.
3121        assert_eq!(super::redact_email("éric@x.io"), "ér***@x.io");
3122    }
3123
3124    #[test]
3125    fn public_manifest_strips_policy_expressions() {
3126        use pylon_kernel::{AppManifest, ManifestPolicy, MANIFEST_VERSION};
3127        let m = AppManifest {
3128            manifest_version: MANIFEST_VERSION,
3129            name: "t".into(),
3130            version: "0.0.0".into(),
3131            entities: vec![],
3132            routes: vec![],
3133            queries: vec![],
3134            actions: vec![],
3135            policies: vec![ManifestPolicy {
3136                name: "ownerOnly".into(),
3137                entity: Some("Todo".into()),
3138                allow_read: Some("auth.userId == data.ownerId".into()),
3139                allow_update: Some("auth.userId == data.ownerId".into()),
3140                ..Default::default()
3141            }],
3142            auth: Default::default(),
3143        };
3144        let pub_m = super::public_manifest(&m);
3145        let p = &pub_m.policies[0];
3146        // Name + entity preserved so client tooling can map "denied
3147        // by ownerOnly" errors to the human label.
3148        assert_eq!(p.name, "ownerOnly");
3149        assert_eq!(p.entity.as_deref(), Some("Todo"));
3150        // Expressions stripped.
3151        assert_eq!(p.allow, "");
3152        assert!(p.allow_read.is_none());
3153        assert!(p.allow_update.is_none());
3154        // The full manifest still has them — sanity check the test
3155        // didn't accidentally mutate the input.
3156        assert_eq!(
3157            m.policies[0].allow_read.as_deref(),
3158            Some("auth.userId == data.ownerId")
3159        );
3160    }
3161}