Skip to main content

pylon_router/
lib.rs

1//! Platform-agnostic HTTP router for pylon.
2//!
3//! This crate contains the pure routing logic that maps HTTP requests to
4//! data store operations. It has no I/O dependencies — no `tiny_http`,
5//! no `tungstenite`, no `rusqlite`. It works with any [`DataStore`]
6//! implementation (SQLite Runtime, Cloudflare D1, etc.).
7
8use pylon_auth::{AuthContext, CookieConfig, MagicCodeStore, OAuthStateStore, SessionStore};
9use pylon_http::{DataError, DataStore, HttpMethod};
10use pylon_policy::PolicyEngine;
11use pylon_sync::{ChangeKind, ChangeLog};
12use std::cell::RefCell;
13
14mod routes;
15
16// ---------------------------------------------------------------------------
17// ChangeNotifier — abstraction over WS/SSE broadcast
18// ---------------------------------------------------------------------------
19
20/// Receives change notifications for real-time sync.
21///
22/// On the self-hosted server this broadcasts to WebSocket + SSE hubs.
23/// On Workers this can be a no-op or post to a Durable Object.
24pub trait ChangeNotifier: Send + Sync {
25    fn notify(&self, event: &pylon_sync::ChangeEvent);
26    fn notify_presence(&self, json: &str);
27
28    /// Ship a binary CRDT update for one row to subscribed clients.
29    /// Called after every successful write to a CRDT-mode entity. The
30    /// payload is a Loro snapshot (or eventually an incremental delta);
31    /// the implementation owns wire-format framing — see
32    /// `encode_crdt_frame` for the canonical Pylon shape.
33    ///
34    /// Default impl is a no-op so backends without WebSocket support
35    /// (Workers, no-op notifier) compile without ceremony.
36    fn notify_crdt(&self, _entity: &str, _row_id: &str, _snapshot: &[u8]) {}
37}
38
39/// No-op notifier for platforms without real-time push.
40pub struct NoopNotifier;
41
42impl ChangeNotifier for NoopNotifier {
43    fn notify(&self, _event: &pylon_sync::ChangeEvent) {}
44    fn notify_presence(&self, _json: &str) {}
45}
46
47// ---------------------------------------------------------------------------
48// CRDT wire format
49//
50// Every CRDT broadcast frame is a single binary WebSocket message shaped:
51//
52//   [type: u8] [entity_len: u16 BE] [entity utf8] [row_id_len: u16 BE] [row_id utf8] [payload bytes]
53//
54// Type bytes (matching the Remboard pattern that proved out in production):
55//
56//   0x10 = full Loro snapshot (sent on subscribe / first writes)
57//   0x11 = incremental Loro update (sent on subsequent writes)
58//
59// For the first slice we always send 0x10 — Loro's snapshots are bounded
60// by internal compaction so the bandwidth is fine; switching to deltas
61// is a non-breaking optimization (just flip the type byte and the
62// payload encoding) once we have per-client version-vector tracking.
63// ---------------------------------------------------------------------------
64
65/// Frame type for a full CRDT snapshot.
66pub const CRDT_FRAME_SNAPSHOT: u8 = 0x10;
67/// Frame type for an incremental CRDT update (reserved — not yet emitted).
68pub const CRDT_FRAME_UPDATE: u8 = 0x11;
69
70/// Errors from [`encode_crdt_frame`]. Surfaced loud rather than silently
71/// truncating so a pathological entity / row_id name (>64 KiB) becomes
72/// an observable failure instead of a malformed frame the client can't
73/// decode.
74#[derive(Debug, Clone, PartialEq, Eq)]
75pub enum FrameEncodeError {
76    /// Entity name exceeds the 16-bit length header. In practice every
77    /// Pylon entity name is well under 100 bytes — hitting this means
78    /// the caller is using the encoder for something it wasn't designed
79    /// for. The bound is `u16::MAX = 65535` bytes (UTF-8 length).
80    EntityTooLong { len: usize },
81    /// Row ID exceeds the 16-bit length header. Pylon-generated IDs are
82    /// 40 hex chars; user-supplied IDs aren't validated up to this layer
83    /// but are practically bounded by URL / SQL constraints elsewhere.
84    RowIdTooLong { len: usize },
85}
86
87impl std::fmt::Display for FrameEncodeError {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        match self {
90            Self::EntityTooLong { len } => write!(
91                f,
92                "CRDT frame: entity name {len} bytes exceeds u16 length limit ({})",
93                u16::MAX
94            ),
95            Self::RowIdTooLong { len } => write!(
96                f,
97                "CRDT frame: row_id {len} bytes exceeds u16 length limit ({})",
98                u16::MAX
99            ),
100        }
101    }
102}
103
104impl std::error::Error for FrameEncodeError {}
105
106/// Encode a CRDT broadcast frame. Layout documented at the top of this
107/// module. Returns the encoded bytes on success; errors when entity /
108/// row_id can't fit the 16-bit length header (~65 KiB, never hit in
109/// practice).
110///
111/// Client decoders mirror this in `@pylonsync/loro/src/wire.ts`.
112pub fn encode_crdt_frame(
113    frame_type: u8,
114    entity: &str,
115    row_id: &str,
116    payload: &[u8],
117) -> Result<Vec<u8>, FrameEncodeError> {
118    let entity_bytes = entity.as_bytes();
119    let row_id_bytes = row_id.as_bytes();
120    if entity_bytes.len() > u16::MAX as usize {
121        return Err(FrameEncodeError::EntityTooLong {
122            len: entity_bytes.len(),
123        });
124    }
125    if row_id_bytes.len() > u16::MAX as usize {
126        return Err(FrameEncodeError::RowIdTooLong {
127            len: row_id_bytes.len(),
128        });
129    }
130    let entity_len = entity_bytes.len() as u16;
131    let row_id_len = row_id_bytes.len() as u16;
132    let mut out =
133        Vec::with_capacity(1 + 2 + entity_bytes.len() + 2 + row_id_bytes.len() + payload.len());
134    out.push(frame_type);
135    out.extend_from_slice(&entity_len.to_be_bytes());
136    out.extend_from_slice(entity_bytes);
137    out.extend_from_slice(&row_id_len.to_be_bytes());
138    out.extend_from_slice(row_id_bytes);
139    out.extend_from_slice(payload);
140    Ok(out)
141}
142
143#[cfg(test)]
144mod crdt_frame_tests {
145    use super::*;
146
147    #[test]
148    fn roundtrip_header_layout() {
149        let frame = encode_crdt_frame(
150            CRDT_FRAME_SNAPSHOT,
151            "Message",
152            "msg_123",
153            &[0xab, 0xcd, 0xef],
154        )
155        .unwrap();
156        assert_eq!(frame[0], 0x10);
157        // entity_len = 7 ("Message") in BE
158        assert_eq!(&frame[1..3], &[0, 7]);
159        assert_eq!(&frame[3..10], b"Message");
160        // row_id_len = 7 ("msg_123") in BE
161        assert_eq!(&frame[10..12], &[0, 7]);
162        assert_eq!(&frame[12..19], b"msg_123");
163        // payload trails after both headers
164        assert_eq!(&frame[19..], &[0xab, 0xcd, 0xef]);
165    }
166
167    #[test]
168    fn empty_payload_still_carries_headers() {
169        let frame = encode_crdt_frame(CRDT_FRAME_UPDATE, "X", "y", &[]).unwrap();
170        assert_eq!(frame[0], 0x11);
171        assert_eq!(&frame[1..3], &[0, 1]);
172        assert_eq!(&frame[3..4], b"X");
173        assert_eq!(&frame[4..6], &[0, 1]);
174        assert_eq!(&frame[6..7], b"y");
175        assert_eq!(frame.len(), 7);
176    }
177
178    #[test]
179    fn hex_roundtrip() {
180        assert_eq!(super::decode_hex(""), Some(vec![]));
181        assert_eq!(super::decode_hex("00"), Some(vec![0x00]));
182        assert_eq!(super::decode_hex("ab"), Some(vec![0xab]));
183        assert_eq!(super::decode_hex("AB"), Some(vec![0xab]));
184        assert_eq!(
185            super::decode_hex("DEADBEEF"),
186            Some(vec![0xde, 0xad, 0xbe, 0xef])
187        );
188    }
189
190    #[test]
191    fn hex_rejects_malformed() {
192        assert_eq!(super::decode_hex("a"), None); // odd length
193        assert_eq!(super::decode_hex("xy"), None); // non-hex
194        assert_eq!(super::decode_hex("ab cd"), None); // space inside
195    }
196
197    #[test]
198    fn entity_too_long_errors() {
199        let huge_entity = "x".repeat(u16::MAX as usize + 1);
200        let err = encode_crdt_frame(CRDT_FRAME_SNAPSHOT, &huge_entity, "y", &[])
201            .expect_err("entity > u16::MAX must reject");
202        assert!(matches!(err, FrameEncodeError::EntityTooLong { .. }));
203    }
204
205    #[test]
206    fn row_id_too_long_errors() {
207        let huge_row_id = "x".repeat(u16::MAX as usize + 1);
208        let err = encode_crdt_frame(CRDT_FRAME_SNAPSHOT, "X", &huge_row_id, &[])
209            .expect_err("row_id > u16::MAX must reject");
210        assert!(matches!(err, FrameEncodeError::RowIdTooLong { .. }));
211    }
212}
213
214// ---------------------------------------------------------------------------
215// CacheOps / PubSubOps / JobOps / SchedulerOps / WorkflowOps
216// — thin traits so the router doesn't depend on concrete impls
217// ---------------------------------------------------------------------------
218
219/// Cache operations used by the router.
220pub trait CacheOps: Send + Sync {
221    fn handle_command(&self, body: &str) -> (u16, String);
222    fn handle_get(&self, key: &str) -> (u16, String);
223    fn handle_delete(&self, key: &str) -> (u16, String);
224}
225
226/// Pub/Sub operations used by the router.
227pub trait PubSubOps: Send + Sync {
228    fn handle_publish(&self, body: &str) -> (u16, String);
229    fn handle_channels(&self) -> (u16, String);
230    fn handle_history(&self, channel: &str, url: &str) -> (u16, String);
231}
232
233/// Room operations used by the router.
234pub trait RoomOps: Send + Sync {
235    fn join(
236        &self,
237        room: &str,
238        user_id: &str,
239        data: Option<serde_json::Value>,
240    ) -> Result<(serde_json::Value, serde_json::Value), DataError>;
241    fn leave(&self, room: &str, user_id: &str) -> Option<serde_json::Value>;
242    fn set_presence(
243        &self,
244        room: &str,
245        user_id: &str,
246        data: serde_json::Value,
247    ) -> Option<serde_json::Value>;
248    fn broadcast(
249        &self,
250        room: &str,
251        sender: Option<&str>,
252        topic: &str,
253        data: serde_json::Value,
254    ) -> Option<serde_json::Value>;
255    fn list_rooms(&self) -> Vec<String>;
256    fn room_size(&self, name: &str) -> usize;
257    fn members(&self, name: &str) -> Vec<serde_json::Value>;
258}
259
260/// Job queue operations used by the router.
261pub trait JobOps: Send + Sync {
262    fn enqueue(
263        &self,
264        name: &str,
265        payload: serde_json::Value,
266        priority: &str,
267        delay_secs: u64,
268        max_retries: u32,
269        queue: &str,
270    ) -> String;
271    fn stats(&self) -> serde_json::Value;
272    fn dead_letters(&self) -> serde_json::Value;
273    fn retry_dead(&self, id: &str) -> bool;
274    fn list_jobs(
275        &self,
276        status: Option<&str>,
277        queue: Option<&str>,
278        limit: usize,
279    ) -> serde_json::Value;
280    fn get_job(&self, id: &str) -> Option<serde_json::Value>;
281}
282
283/// Scheduler operations used by the router.
284pub trait SchedulerOps: Send + Sync {
285    fn list_tasks(&self) -> serde_json::Value;
286    fn trigger(&self, name: &str) -> bool;
287}
288
289/// Workflow engine operations used by the router.
290pub trait WorkflowOps: Send + Sync {
291    fn definitions(&self) -> serde_json::Value;
292    fn start(&self, name: &str, input: serde_json::Value) -> Result<String, String>;
293    fn list(&self, status_filter: Option<&str>) -> serde_json::Value;
294    fn get(&self, id: &str) -> Option<serde_json::Value>;
295    fn advance(&self, id: &str) -> Result<String, String>;
296    fn send_event(&self, id: &str, event: &str, data: serde_json::Value) -> Result<(), String>;
297    fn cancel(&self, id: &str) -> Result<(), String>;
298}
299
300/// File storage operations used by the router.
301pub trait FileOps: Send + Sync {
302    fn upload(&self, body: &str) -> (u16, String);
303    fn get_file(&self, id: &str) -> (u16, String);
304}
305
306/// Sends emails (magic codes, invitations, etc.).
307pub trait EmailSender: Send + Sync {
308    fn send(&self, to: &str, subject: &str, body: &str) -> Result<(), String>;
309}
310
311/// Access to sharded real-time simulations (game matches, MMO zones, etc.).
312pub trait ShardOps: Send + Sync {
313    /// Look up an existing shard by ID.
314    fn get_shard(&self, id: &str) -> Option<std::sync::Arc<dyn pylon_realtime::DynShard>>;
315
316    /// List IDs of active shards.
317    fn list_shards(&self) -> Vec<String>;
318
319    /// Number of active shards.
320    fn shard_count(&self) -> usize;
321}
322
323/// Generates the OpenAPI spec JSON string for the manifest.
324pub trait OpenApiGenerator: Send + Sync {
325    fn generate(&self, base_url: &str) -> String;
326}
327
328/// Plugin CRUD lifecycle hooks used by the router.
329///
330/// Wired up so that `POST/PATCH/DELETE /api/entities/...` triggers registered
331/// plugin before_/after_ hooks (audit log, search indexing, webhooks,
332/// versioning, validation, timestamps, slugify). Previously the router only
333/// ran `on_request` + custom routes, silently bypassing CRUD hooks — which
334/// meant security-relevant plugins (validation, audit_log) didn't apply to
335/// the primary write path.
336///
337/// `before_insert/update` receive a mutable `data` so plugins can inject
338/// fields (timestamps, slugs). A returned `Err` rejects the write with the
339/// given status + message and no data is touched.
340pub trait PluginHookOps: Send + Sync {
341    fn before_insert(
342        &self,
343        entity: &str,
344        data: &mut serde_json::Value,
345        auth: &AuthContext,
346    ) -> Result<(), (u16, String, String)>;
347
348    fn after_insert(&self, entity: &str, id: &str, data: &serde_json::Value, auth: &AuthContext);
349
350    fn before_update(
351        &self,
352        entity: &str,
353        id: &str,
354        data: &mut serde_json::Value,
355        auth: &AuthContext,
356    ) -> Result<(), (u16, String, String)>;
357
358    fn after_update(&self, entity: &str, id: &str, data: &serde_json::Value, auth: &AuthContext);
359
360    fn before_delete(
361        &self,
362        entity: &str,
363        id: &str,
364        auth: &AuthContext,
365    ) -> Result<(), (u16, String, String)>;
366
367    fn after_delete(&self, entity: &str, id: &str, auth: &AuthContext);
368}
369
370/// No-op plugin hooks for platforms or tests without a registry.
371pub struct NoopPluginHooks;
372
373impl PluginHookOps for NoopPluginHooks {
374    fn before_insert(
375        &self,
376        _entity: &str,
377        _data: &mut serde_json::Value,
378        _auth: &AuthContext,
379    ) -> Result<(), (u16, String, String)> {
380        Ok(())
381    }
382    fn after_insert(
383        &self,
384        _entity: &str,
385        _id: &str,
386        _data: &serde_json::Value,
387        _auth: &AuthContext,
388    ) {
389    }
390    fn before_update(
391        &self,
392        _entity: &str,
393        _id: &str,
394        _data: &mut serde_json::Value,
395        _auth: &AuthContext,
396    ) -> Result<(), (u16, String, String)> {
397        Ok(())
398    }
399    fn after_update(
400        &self,
401        _entity: &str,
402        _id: &str,
403        _data: &serde_json::Value,
404        _auth: &AuthContext,
405    ) {
406    }
407    fn before_delete(
408        &self,
409        _entity: &str,
410        _id: &str,
411        _auth: &AuthContext,
412    ) -> Result<(), (u16, String, String)> {
413        Ok(())
414    }
415    fn after_delete(&self, _entity: &str, _id: &str, _auth: &AuthContext) {}
416}
417
418/// TypeScript function operations used by the router.
419///
420/// Implementations manage transaction semantics: mutations run under the
421/// write lock with BEGIN/COMMIT/ROLLBACK, queries use the read pool,
422/// actions run without transactions.
423pub trait FnOps: Send + Sync {
424    /// Look up a registered function.
425    fn get_fn(&self, name: &str) -> Option<pylon_functions::registry::FnDef>;
426
427    /// List all registered functions.
428    fn list_fns(&self) -> Vec<pylon_functions::registry::FnDef>;
429
430    /// Execute a function. For streaming responses, `on_stream` is called for
431    /// each chunk as it arrives from the function handler.
432    ///
433    /// `request` is populated when the function is invoked via a custom HTTP
434    /// route (`defineRoute` binding). It carries the raw request metadata
435    /// (method, path, headers, body bytes) so actions can verify webhook
436    /// signatures. Pass `None` for programmatic invocations (runAction,
437    /// scheduled jobs, admin dashboard).
438    fn call(
439        &self,
440        fn_name: &str,
441        args: serde_json::Value,
442        auth: pylon_functions::protocol::AuthInfo,
443        on_stream: Option<Box<dyn FnMut(&str) + Send>>,
444        request: Option<pylon_functions::protocol::RequestInfo>,
445    ) -> Result<
446        (serde_json::Value, pylon_functions::trace::FnTrace),
447        pylon_functions::runner::FnCallError,
448    >;
449
450    /// Recent traces for observability (newest first).
451    fn recent_traces(&self, limit: usize) -> Vec<pylon_functions::trace::FnTrace>;
452
453    /// Check whether the caller is allowed to invoke this function right now.
454    ///
455    /// Returns `Ok(())` if allowed, or `Err(retry_after_secs)` if the caller
456    /// is over the per-function quota. Default impl is permissive — backends
457    /// that don't enforce per-function limits don't need to implement it.
458    /// `identity` is a stable string for the caller (user id, session id, or
459    /// IP) used as the rate-limit key.
460    fn check_rate_limit(&self, _fn_name: &str, _identity: &str) -> Result<(), u64> {
461        Ok(())
462    }
463}
464
465// ---------------------------------------------------------------------------
466// RouterContext — bundles all dependencies for a single request
467// ---------------------------------------------------------------------------
468
469pub struct RouterContext<'a> {
470    pub store: &'a dyn DataStore,
471    pub session_store: &'a SessionStore,
472    pub magic_codes: &'a MagicCodeStore,
473    pub oauth_state: &'a OAuthStateStore,
474    /// Persistent OAuth account links — better-auth's `account` table
475    /// equivalent. Used by the OAuth callback to look up + upsert the
476    /// `(provider, provider_account_id) → user_id` mapping plus the
477    /// access/refresh token bundle.
478    pub account_store: &'a pylon_auth::AccountStore,
479    /// Long-lived API keys — `pk.key_<id>.<secret>` bearer tokens that
480    /// resolve to a user_id with optional scopes/expiry. Created via
481    /// `POST /api/auth/api-keys`, listed/revoked from the same path.
482    pub api_keys: &'a pylon_auth::api_key::ApiKeyStore,
483    /// Organizations + memberships + invites — multi-tenant team
484    /// management. Endpoints under `/api/auth/orgs/...`.
485    pub orgs: &'a pylon_auth::org::OrgStore,
486    /// Per-address pending SIWE nonces. Issued at
487    /// `/api/auth/siwe/nonce`, consumed at `/api/auth/siwe/verify`.
488    pub siwe: &'a pylon_auth::siwe::NonceStore,
489    /// Phone-number magic codes. Endpoints under `/api/auth/phone/...`.
490    pub phone_codes: &'a pylon_auth::phone::PhoneCodeStore,
491    /// WebAuthn / passkey credentials + per-user challenge stash.
492    /// Endpoints under `/api/auth/passkey/...`.
493    pub passkeys: &'a pylon_auth::webauthn::PasskeyStore,
494    pub policy_engine: &'a PolicyEngine,
495    pub change_log: &'a ChangeLog,
496    pub notifier: &'a dyn ChangeNotifier,
497    pub rooms: &'a dyn RoomOps,
498    pub cache: &'a dyn CacheOps,
499    pub pubsub: &'a dyn PubSubOps,
500    pub jobs: &'a dyn JobOps,
501    pub scheduler: &'a dyn SchedulerOps,
502    pub workflows: &'a dyn WorkflowOps,
503    pub files: &'a dyn FileOps,
504    pub openapi: &'a dyn OpenApiGenerator,
505    pub functions: Option<&'a dyn FnOps>,
506    pub email: &'a dyn EmailSender,
507    pub shards: Option<&'a dyn ShardOps>,
508    pub plugin_hooks: &'a dyn PluginHookOps,
509    pub auth_ctx: &'a AuthContext,
510    /// Allowlist of origins (`scheme://host[:port]`) that the OAuth
511    /// start endpoint will accept as `?callback=` / `?error_callback=`
512    /// targets. Sourced from `PYLON_TRUSTED_ORIGINS` (comma-separated)
513    /// at server boot. Borrowed from better-auth's `trustedOrigins`
514    /// model — explicit allowlist, no implicit "same-origin trust" or
515    /// env-var magic. Open redirects via OAuth are an easy bug to
516    /// ship by accident; this list is the only thing standing between
517    /// a misconfigured frontend and an attacker-controlled redirect.
518    pub trusted_origins: &'a [String],
519    pub is_dev: bool,
520    /// Raw HTTP request headers (lowercased names). Used by the webhook
521    /// action endpoint to pass the exact signing-relevant headers through
522    /// to TypeScript actions. Empty slice on platforms that don't forward
523    /// headers (e.g. internal calls).
524    pub request_headers: &'a [(String, String)],
525    /// Client IP as the runtime resolved it from the socket. Used as
526    /// the rate-limit bucket key for unauthenticated callers — the
527    /// alternative ("anon" string) puts every unauth request worldwide
528    /// into one shared bucket, which lets one attacker starve every
529    /// other anonymous caller. Empty string on platforms that don't
530    /// expose a peer address.
531    pub peer_ip: &'a str,
532    /// Session cookie shape (name, domain, attrs). Handlers use this to
533    /// emit Set-Cookie headers via [`RouterContext::add_response_header`]
534    /// when they want a browser-bound session.
535    pub cookie_config: &'a CookieConfig,
536    /// Extra response headers handlers want to attach (e.g. Set-Cookie,
537    /// Location). The runtime drains this after `route()` returns and
538    /// merges them into the outgoing response. Interior mutability so
539    /// handlers don't need a `&mut` ctx.
540    pub response_headers: RefCell<Vec<(String, String)>>,
541}
542
543impl<'a> RouterContext<'a> {
544    /// Queue a header to be added to the response built from this request.
545    pub fn add_response_header(&self, name: impl Into<String>, value: impl Into<String>) {
546        self.response_headers
547            .borrow_mut()
548            .push((name.into(), value.into()));
549    }
550
551    /// Drain the queued response headers. Runtime calls this once after
552    /// `route()` returns, before constructing the wire response.
553    pub fn take_response_headers(&self) -> Vec<(String, String)> {
554        std::mem::take(&mut *self.response_headers.borrow_mut())
555    }
556
557    /// Read the request's `Origin` header, if any. Browsers always send
558    /// Origin on cross-origin XHR/fetch and on POSTs; non-browser
559    /// callers (CLI, server-to-server) typically don't.
560    pub fn request_origin(&self) -> Option<&str> {
561        self.request_headers
562            .iter()
563            .find(|(k, _)| k.eq_ignore_ascii_case("origin"))
564            .map(|(_, v)| v.as_str())
565    }
566
567    /// Emit a session cookie when the request looks like it came from a
568    /// browser (i.e. carries Origin). Non-browser callers still receive
569    /// the JSON token in the body and ignore the missing cookie.
570    /// Origin allowlisting is enforced at the runtime CSRF layer for
571    /// state-changing methods, so handlers don't need to re-check here.
572    pub fn maybe_set_session_cookie(&self, token: &str) {
573        if self.request_origin().is_some() {
574            self.add_response_header("Set-Cookie", self.cookie_config.set_value(token));
575        }
576    }
577}
578
579// ---------------------------------------------------------------------------
580// OAuth callback shared logic (POST returns JSON, GET 302s with cookie)
581// ---------------------------------------------------------------------------
582
583pub(crate) struct OAuthError {
584    pub(crate) status: u16,
585    pub(crate) code: &'static str,
586    pub(crate) message: String,
587}
588
589/// Shared OAuth code-for-session exchange. Returns the user_id + minted
590/// Truncate (and elide) error strings before they end up in
591/// `oauth_error_message` redirect URLs. Provider error bodies can be
592/// huge or contain echoed-back request fields — keep the redirect
593/// short and safe to log.
594///
595/// MAX is the budget for the *output*, including the ellipsis (3
596/// bytes), so the slice itself caps at MAX-3.
597fn truncate_for_redirect(s: &str) -> String {
598    const MAX: usize = 240;
599    if s.len() <= MAX {
600        return s.to_string();
601    }
602    let budget = MAX - "…".len();
603    let mut end = budget;
604    while end > 0 && !s.is_char_boundary(end) {
605        end -= 1;
606    }
607    format!("{}…", &s[..end])
608}
609
610/// session, or a structured error suitable for both JSON (POST) and
611/// 302-redirect-with-error-param (GET) responses.
612/// Complete an OAuth login. Caller must have ALREADY validated the
613/// state token via `ctx.oauth_state.validate(...)` and is passing the
614/// resulting `OAuthState` record in via... well, by virtue of having
615/// called this function. State validation lives at the call site (not
616/// here) because the GET /api/auth/callback/:provider handler needs
617/// the validated record's callback URLs to know where to redirect on
618/// both success and failure — and validate is single-use, so it can
619/// only be called once per token.
620pub(crate) fn complete_oauth_login_pkce(
621    ctx: &RouterContext,
622    provider: &str,
623    code: Option<&str>,
624    pkce_verifier: Option<&str>,
625    dev_email: Option<&str>,
626    dev_name: Option<&str>,
627) -> Result<(String, pylon_auth::Session), OAuthError> {
628    let (userinfo, tokens) = if let Some(code) = code {
629        let registry = pylon_auth::OAuthRegistry::shared();
630        let config = registry.get(provider).cloned().ok_or_else(|| OAuthError {
631            status: 404,
632            code: "PROVIDER_NOT_FOUND",
633            message: format!("OAuth provider \"{provider}\" not configured"),
634        })?;
635        let tokens = config
636            .exchange_code_full_pkce(code, pkce_verifier)
637            .map_err(|err| OAuthError {
638                status: 502,
639                code: "OAUTH_TOKEN_EXCHANGE_FAILED",
640                // Sanitize: providers like to echo back the request body
641                // on auth failures. The auth layer already redacts known
642                // sensitive fields, but cap the length to keep stray
643                // tokens out of redirect URLs.
644                message: truncate_for_redirect(&format!("token exchange failed: {err}")),
645            })?;
646        // Apple identity lives in id_token, not a userinfo endpoint.
647        // Pass both — the auth layer routes by spec.
648        let info = config
649            .fetch_userinfo_with_id_token(&tokens.access_token, tokens.id_token.as_deref())
650            .map_err(|err| OAuthError {
651                status: 502,
652                code: "OAUTH_TOKEN_EXCHANGE_FAILED",
653                message: truncate_for_redirect(&format!("userinfo fetch failed: {err}")),
654            })?;
655        (info, tokens)
656    } else if ctx.is_dev {
657        let email = dev_email.ok_or_else(|| OAuthError {
658            status: 400,
659            code: "MISSING_FIELD",
660            message: "OAuth callback requires `code` (or `email` in dev mode)".into(),
661        })?;
662        // Dev path needs a stable provider_account_id so repeat
663        // sign-ins land on the same Account row. Use the email itself
664        // — predictable for tests, and a real provider would never
665        // reuse an email as a sub.
666        let info = pylon_auth::UserInfo {
667            provider: provider.to_string(),
668            provider_account_id: format!("dev:{email}"),
669            email: email.to_string(),
670            name: dev_name.map(String::from),
671        };
672        let tokens = pylon_auth::TokenSet {
673            access_token: "dev_access_token".into(),
674            refresh_token: None,
675            id_token: None,
676            expires_at: None,
677            scope: None,
678        };
679        (info, tokens)
680    } else {
681        return Err(OAuthError {
682            status: 400,
683            code: "MISSING_FIELD",
684            message: "OAuth callback requires an authorization `code` from the provider".into(),
685        });
686    };
687
688    // Real-world bug this replaces: the previous formatter produced
689    // strings like "1761811234Z" (epoch-seconds with a stray Z) that
690    // SQLite happily stored as TEXT but PostgreSQL rejected as
691    // invalid TIMESTAMPTZ — every Google sign-up against pylon-cloud
692    // failed with USER_CREATE_FAILED. Use the kernel's ISO 8601
693    // formatter for a value both backends parse cleanly.
694    let now = chrono_now_iso();
695
696    // Resolve user_id in priority order:
697    //   1. Existing account link by (provider, provider_account_id) — the
698    //      stable identity. Survives email changes on the provider side.
699    //   2. Existing User row by email — account-linking-by-email. The
700    //      classic "you signed up with email/password and now you're
701    //      adding Google" flow.
702    //   3. Create a new User.
703    //
704    // Crucially: every step that can fail (store.insert, store.update)
705    // returns its error rather than silently using the email as user_id.
706    // That swallow caused the "session for nonexistent user" bug — the
707    // OAuth flow looked successful but the User row was never created
708    // and /api/auth/me would resolve to a phantom identity.
709    let user_id = if let Some(existing) = ctx
710        .account_store
711        .find_by_provider(provider, &userinfo.provider_account_id)
712    {
713        // Returning user via the same provider — refresh the token
714        // bundle and reuse the linked user_id.
715        let mut refreshed = pylon_auth::Account::new(existing.user_id.clone(), &userinfo, &tokens);
716        refreshed.created_at = existing.created_at;
717        ctx.account_store.upsert(&refreshed);
718        existing.user_id
719    } else if let Ok(Some(row)) = ctx.store.lookup(
720        &ctx.store.manifest().auth.user.entity,
721        "email",
722        &userinfo.email,
723    ) {
724        // First-time link of this provider to an existing user (matched
725        // by email). Stamp emailVerified opportunistically since the
726        // provider just vouched for the address.
727        let id = row["id"].as_str().unwrap_or("").to_string();
728        if id.is_empty() {
729            return Err(OAuthError {
730                status: 500,
731                code: "USER_LOOKUP_INVALID",
732                message: "User row matched by email but had no id field".into(),
733            });
734        }
735        if row.get("emailVerified").map_or(true, |v| v.is_null()) {
736            // Best-effort — schemas without the field silently drop the
737            // update. We do NOT bail on this error since the user
738            // already existed and OAuth still succeeded.
739            let _ = ctx.store.update(
740                &ctx.store.manifest().auth.user.entity,
741                &id,
742                &serde_json::json!({ "emailVerified": now }),
743            );
744        }
745        ctx.account_store
746            .upsert(&pylon_auth::Account::new(id.clone(), &userinfo, &tokens));
747        id
748    } else {
749        // Brand-new user. Create the User row + the Account link. Both
750        // fail loudly — a silent failure here is what produced the
751        // "session for nonexistent user" bug.
752        let display_name = userinfo.name.as_deref().unwrap_or(&userinfo.email);
753        let user_entity = ctx.store.manifest().auth.user.entity.clone();
754        let id = ctx
755            .store
756            .insert(
757                &user_entity,
758                &serde_json::json!({
759                    "email": userinfo.email,
760                    "displayName": display_name,
761                    "emailVerified": now,
762                    "createdAt": now,
763                }),
764            )
765            .map_err(|e| OAuthError {
766                status: 500,
767                code: "USER_CREATE_FAILED",
768                // Preserve the full upstream code/message — a failed insert
769                // is almost always "the User entity in your manifest has
770                // a field this OAuth handler doesn't set" (NOT NULL
771                // violation), and the operator needs to see exactly which
772                // column.
773                message: format!(
774                    "failed to create User row for OAuth signup ({}): {}",
775                    e.code, e.message
776                ),
777            })?;
778        ctx.account_store
779            .upsert(&pylon_auth::Account::new(id.clone(), &userinfo, &tokens));
780        id
781    };
782    let session = ctx.session_store.create(user_id.clone());
783    Ok((user_id, session))
784}
785
786/// Parse a `key=value&key=value` query string into a map. Uses
787/// `query_decode` (NOT form_decode) — RFC 3986 says `+` is a literal
788/// in URI query strings; only `application/x-www-form-urlencoded`
789/// bodies decode `+` as space. OAuth state tokens that happen to
790/// contain `+` (e.g. base64-with-padding) round-trip cleanly here.
791pub(crate) fn parse_query(q: &str) -> std::collections::HashMap<String, String> {
792    let mut out = std::collections::HashMap::new();
793    for pair in q.split('&') {
794        if pair.is_empty() {
795            continue;
796        }
797        let (k, v) = pair.split_once('=').unwrap_or((pair, ""));
798        out.insert(query_decode(k), query_decode(v));
799    }
800    out
801}
802
803/// Percent-decode a URI query-string segment. Treats `+` as a literal
804/// `+` character (per RFC 3986 §3.4) — the `+` → space convention
805/// only applies to `application/x-www-form-urlencoded` *bodies*, not
806/// to URI query strings. Inlined `percent_decode` because the form-
807/// body variant isn't used here.
808fn query_decode(s: &str) -> String {
809    percent_decode(s, false)
810}
811
812#[allow(dead_code)]
813fn percent_decode(s: &str, plus_is_space: bool) -> String {
814    let bytes = s.as_bytes();
815    let mut out = Vec::with_capacity(bytes.len());
816    let mut i = 0;
817    while i < bytes.len() {
818        match bytes[i] {
819            b'+' if plus_is_space => {
820                out.push(b' ');
821                i += 1;
822            }
823            b'%' if i + 2 < bytes.len() => {
824                let hex = std::str::from_utf8(&bytes[i + 1..i + 3]).unwrap_or("");
825                match u8::from_str_radix(hex, 16) {
826                    Ok(b) => {
827                        out.push(b);
828                        i += 3;
829                    }
830                    Err(_) => {
831                        out.push(bytes[i]);
832                        i += 1;
833                    }
834                }
835            }
836            b => {
837                out.push(b);
838                i += 1;
839            }
840        }
841    }
842    String::from_utf8_lossy(&out).into_owned()
843}
844
845/// Redact an email for logging — keeps the first two characters of
846/// the local-part + the domain, masks the rest. `alice@acme.com`
847/// becomes `al***@acme.com`. Compliance-friendly (no full PII in
848/// operator log aggregators) without losing all debuggability.
849pub(crate) fn redact_email(email: &str) -> String {
850    match email.find('@') {
851        Some(at) => {
852            let (user, domain) = email.split_at(at);
853            let prefix_len = user.len().min(2);
854            let prefix: String = user.chars().take(prefix_len).collect();
855            format!("{prefix}***{domain}")
856        }
857        None => "***".to_string(),
858    }
859}
860
861/// Build a redacted view of the manifest safe to serve to anonymous
862/// callers. Drops the body of every policy expression — `allow_read`,
863/// `allow_insert`, etc. — but keeps policy name + entity + action so
864/// client tooling can map a "policy denied: ownerReadTodos" error to
865/// the human label without seeing the raw rule.
866fn public_manifest(m: &pylon_kernel::AppManifest) -> pylon_kernel::AppManifest {
867    let mut out = m.clone();
868    for p in out.policies.iter_mut() {
869        p.allow = String::new();
870        p.allow_read = None;
871        p.allow_insert = None;
872        p.allow_update = None;
873        p.allow_delete = None;
874    }
875    out
876}
877
878pub(crate) fn url_encode(s: &str) -> String {
879    let mut out = String::with_capacity(s.len());
880    for b in s.bytes() {
881        match b {
882            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
883                out.push(b as char)
884            }
885            _ => out.push_str(&format!("%{b:02X}")),
886        }
887    }
888    out
889}
890
891// ---------------------------------------------------------------------------
892// route() — the platform-agnostic request router
893// ---------------------------------------------------------------------------
894
895/// Route an HTTP request to the appropriate handler.
896///
897/// Returns `(status_code, response_body, content_type)`.
898pub fn route(
899    ctx: &RouterContext,
900    method: HttpMethod,
901    url: &str,
902    body: &str,
903    auth_token: Option<&str>,
904) -> (u16, String, &'static str) {
905    let (status, body) = route_inner(ctx, method, url, body, auth_token);
906    (status, body, "application/json")
907}
908
909fn route_inner(
910    ctx: &RouterContext,
911    method: HttpMethod,
912    url: &str,
913    body: &str,
914    auth_token: Option<&str>,
915) -> (u16, String) {
916    // CORS preflight
917    if method == HttpMethod::Options {
918        return (204, String::new());
919    }
920
921    // GET /api/manifest
922    // Public manifest. Clients need entity/field/route shapes to call
923    // the API, but they do NOT need raw policy expressions — those are
924    // server-enforcement details, and exposing them ("auth.userId ==
925    // data.ownerId") tells an attacker exactly which condition to
926    // satisfy. Strip allow_* expressions; keep policy NAMES so client
927    // tooling can still surface "denied by ownerReadTodos" errors.
928    // Admins get the full thing for tooling via ?full=1.
929    if url.starts_with("/api/manifest") && method == HttpMethod::Get {
930        let path = url.split('?').next().unwrap_or(url);
931        if path == "/api/manifest" {
932            let want_full = query_param(url, "full")
933                .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
934                .unwrap_or(false);
935            let manifest = ctx.store.manifest();
936            let body = if want_full && ctx.auth_ctx.is_admin {
937                serde_json::to_string(manifest).unwrap_or_else(|_| "{}".into())
938            } else {
939                serde_json::to_string(&public_manifest(manifest)).unwrap_or_else(|_| "{}".into())
940            };
941            return (200, body);
942        }
943    }
944
945    // GET /api/openapi.json
946    if url == "/api/openapi.json" && method == HttpMethod::Get {
947        return (200, ctx.openapi.generate(""));
948    }
949
950    // -----------------------------------------------------------------------
951    // Auth routes — handled by crates/router/src/routes/auth.rs.
952    // /api/auth/* (sessions, OAuth, magic-link, password, email verify,
953    // /me, /providers, /sessions, refresh).
954    // -----------------------------------------------------------------------
955    if let Some(r) = routes::auth::handle(ctx, method, url, body, auth_token) {
956        return r;
957    }
958
959    // -----------------------------------------------------------------------
960    // Sync + GDPR — handled by crates/router/src/routes/sync.rs.
961    // /api/sync/{pull,push}, /api/admin/users/:id/{export,purge}.
962    // -----------------------------------------------------------------------
963    if let Some(r) = routes::sync::handle(ctx, method, url, body, auth_token) {
964        return r;
965    }
966
967    // -----------------------------------------------------------------------
968    // Rooms — handled by crates/router/src/routes/rooms.rs.
969    // /api/rooms/{join,leave,presence,broadcast}, /api/rooms[/<room>].
970    // -----------------------------------------------------------------------
971    if let Some(r) = routes::rooms::handle(ctx, method, url, body, auth_token) {
972        return r;
973    }
974
975    // -----------------------------------------------------------------------
976    // Link / Files / CRDT — handled by routes/{links,files,crdt}.rs.
977    // /api/{link,unlink}, /api/files/{upload,<id>}, /api/crdt/<entity>/<row>.
978    // -----------------------------------------------------------------------
979    if let Some(r) = routes::links::handle(ctx, method, url, body, auth_token) {
980        return r;
981    }
982    if let Some(r) = routes::files::handle(ctx, method, url, body, auth_token) {
983        return r;
984    }
985    if let Some(r) = routes::crdt::handle(ctx, method, url, body, auth_token) {
986        return r;
987    }
988
989    // -----------------------------------------------------------------------
990    // Queries / Actions / Admin data / Search — handled by:
991    //   routes/queries.rs   (transact, query/:e, lookup, aggregate, query)
992    //   routes/actions.rs   (/api/actions/<name>)
993    //   routes/admin_data.rs (export, import)
994    //   routes/search.rs    (/api/search/<entity>)
995    // -----------------------------------------------------------------------
996    if let Some(r) = routes::queries::handle(ctx, method, url, body, auth_token) {
997        return r;
998    }
999    if let Some(r) = routes::actions::handle(ctx, method, url, body, auth_token) {
1000        return r;
1001    }
1002    if let Some(r) = routes::admin_data::handle(ctx, method, url, body, auth_token) {
1003        return r;
1004    }
1005    if let Some(r) = routes::auth_admin::handle(ctx, method, url, body, auth_token) {
1006        return r;
1007    }
1008    if let Some(r) = routes::ops_admin::handle(ctx, method, url, body, auth_token) {
1009        return r;
1010    }
1011    if let Some(r) = routes::search::handle(ctx, method, url, body, auth_token) {
1012        return r;
1013    }
1014
1015    // -----------------------------------------------------------------------
1016    // Entity CRUD + cursor + batch — handled by routes/entities.rs.
1017    // /api/entities/<entity>[/<id>], /api/entities/<entity>/cursor,
1018    // /api/batch.
1019    // -----------------------------------------------------------------------
1020    if let Some(r) = routes::entities::handle(ctx, method, url, body, auth_token) {
1021        return r;
1022    }
1023
1024    // -----------------------------------------------------------------------
1025    // Infra / Functions / Shards / Workflows / AI — handled by:
1026    //   routes/infra.rs       (cache, pubsub, jobs, scheduler)
1027    //   routes/functions.rs   (/api/fn, /api/fn/traces, /api/webhooks/<>)
1028    //   routes/shards.rs      (/api/shards*)
1029    //   routes/workflows.rs   (/api/workflows*)
1030    //   routes/ai.rs          (/api/ai/complete shim — runtime owns stream)
1031    // -----------------------------------------------------------------------
1032    if let Some(r) = routes::infra::handle(ctx, method, url, body, auth_token) {
1033        return r;
1034    }
1035    if let Some(r) = routes::functions::handle(ctx, method, url, body, auth_token) {
1036        return r;
1037    }
1038    if let Some(r) = routes::shards::handle(ctx, method, url, body, auth_token) {
1039        return r;
1040    }
1041    if let Some(r) = routes::workflows::handle(ctx, method, url, body, auth_token) {
1042        return r;
1043    }
1044    if let Some(r) = routes::ai::handle(ctx, method, url, body, auth_token) {
1045        return r;
1046    }
1047
1048    // -----------------------------------------------------------------------
1049    // Fallback
1050    // -----------------------------------------------------------------------
1051
1052    (
1053        404,
1054        json_error_with_hint(
1055            "NOT_FOUND",
1056            &format!("No API route matches {url}"),
1057            "Available endpoints: /api/entities/<entity>, /api/actions/<name>, /api/query, /api/auth/*, /api/sync/*, /api/files/*, /api/cache, /api/pubsub/*, /api/jobs, /api/scheduler, /api/workflows, /api/ai/*, /studio",
1058        ),
1059    )
1060}
1061
1062// ---------------------------------------------------------------------------
1063// Entity CRUD helpers
1064// ---------------------------------------------------------------------------
1065
1066pub(crate) fn handle_list(store: &dyn DataStore, entity: &str, url: &str) -> (u16, String) {
1067    let limit: Option<usize> = url
1068        .split("limit=")
1069        .nth(1)
1070        .and_then(|s| s.split('&').next())
1071        .and_then(|s| s.parse().ok());
1072    let offset: usize = url
1073        .split("offset=")
1074        .nth(1)
1075        .and_then(|s| s.split('&').next())
1076        .and_then(|s| s.parse().ok())
1077        .unwrap_or(0);
1078
1079    // Push limit/offset down to SQL via query_filtered's $limit/$offset.
1080    // Skips full-table scans for large entities.
1081    let mut filter = serde_json::Map::new();
1082    if let Some(l) = limit {
1083        filter.insert("$limit".into(), serde_json::json!(l));
1084    }
1085    if offset > 0 {
1086        filter.insert("$offset".into(), serde_json::json!(offset));
1087    }
1088    let filter = serde_json::Value::Object(filter);
1089
1090    match store.query_filtered(entity, &filter) {
1091        Ok(rows) => {
1092            // For backwards compatibility we keep the response shape but
1093            // `total` here means "rows returned" not "total in table".
1094            // The cursor pagination endpoint at /api/entities/:e/cursor is
1095            // the right path for total counts at scale.
1096            let count = rows.len();
1097            (
1098                200,
1099                serde_json::json!({
1100                    "data": rows,
1101                    "count": count,
1102                    "offset": offset,
1103                    "limit": limit,
1104                })
1105                .to_string(),
1106            )
1107        }
1108        Err(e) => (400, json_error(&e.code, &e.message)),
1109    }
1110}
1111
1112pub(crate) fn handle_get(store: &dyn DataStore, entity: &str, id: &str) -> (u16, String) {
1113    match store.get_by_id(entity, id) {
1114        Ok(Some(row)) => (
1115            200,
1116            serde_json::to_string(&row).unwrap_or_else(|_| "{}".into()),
1117        ),
1118        Ok(None) => (
1119            404,
1120            json_error("NOT_FOUND", &format!("{entity} with id \"{id}\" not found")),
1121        ),
1122        Err(e) => (400, json_error(&e.code, &e.message)),
1123    }
1124}
1125
1126pub(crate) fn handle_insert(ctx: &RouterContext, entity: &str, body: &str) -> (u16, String) {
1127    let mut data: serde_json::Value = match serde_json::from_str(body) {
1128        Ok(v) => v,
1129        Err(e) => {
1130            return (
1131                400,
1132                json_error_safe(
1133                    "INVALID_JSON",
1134                    "Invalid request body",
1135                    &format!("Invalid JSON: {e}"),
1136                ),
1137            )
1138        }
1139    };
1140    // Run plugin `before_insert` hooks. Registered plugins (validation,
1141    // timestamps, slugify) mutate `data` here; a rejected hook aborts
1142    // the write with their status + error payload.
1143    if let Err((status, code, msg)) =
1144        ctx.plugin_hooks
1145            .before_insert(entity, &mut data, ctx.auth_ctx)
1146    {
1147        return (status, json_error(&code, &msg));
1148    }
1149    match ctx.store.insert(entity, &data) {
1150        Ok(id) => {
1151            let seq = ctx
1152                .change_log
1153                .append(entity, &id, ChangeKind::Insert, Some(data.clone()));
1154            broadcast_change_with_crdt(
1155                ctx.notifier,
1156                ctx.store,
1157                seq,
1158                entity,
1159                &id,
1160                ChangeKind::Insert,
1161                Some(&data),
1162            );
1163            ctx.plugin_hooks
1164                .after_insert(entity, &id, &data, ctx.auth_ctx);
1165            (201, serde_json::json!({"id": id}).to_string())
1166        }
1167        Err(e) => (400, json_error(&e.code, &e.message)),
1168    }
1169}
1170
1171pub(crate) fn handle_update(
1172    ctx: &RouterContext,
1173    entity: &str,
1174    id: &str,
1175    body: &str,
1176) -> (u16, String) {
1177    let mut data: serde_json::Value = match serde_json::from_str(body) {
1178        Ok(v) => v,
1179        Err(e) => {
1180            return (
1181                400,
1182                json_error_safe(
1183                    "INVALID_JSON",
1184                    "Invalid request body",
1185                    &format!("Invalid JSON: {e}"),
1186                ),
1187            )
1188        }
1189    };
1190    if let Err((status, code, msg)) =
1191        ctx.plugin_hooks
1192            .before_update(entity, id, &mut data, ctx.auth_ctx)
1193    {
1194        return (status, json_error(&code, &msg));
1195    }
1196    match ctx.store.update(entity, id, &data) {
1197        Ok(true) => {
1198            let seq = ctx
1199                .change_log
1200                .append(entity, id, ChangeKind::Update, Some(data.clone()));
1201            broadcast_change_with_crdt(
1202                ctx.notifier,
1203                ctx.store,
1204                seq,
1205                entity,
1206                id,
1207                ChangeKind::Update,
1208                Some(&data),
1209            );
1210            ctx.plugin_hooks
1211                .after_update(entity, id, &data, ctx.auth_ctx);
1212            (200, serde_json::json!({"updated": true}).to_string())
1213        }
1214        Ok(false) => (
1215            404,
1216            json_error("NOT_FOUND", &format!("{entity}/{id} not found")),
1217        ),
1218        Err(e) => (400, json_error(&e.code, &e.message)),
1219    }
1220}
1221
1222pub(crate) fn handle_delete(ctx: &RouterContext, entity: &str, id: &str) -> (u16, String) {
1223    if let Err((status, code, msg)) = ctx.plugin_hooks.before_delete(entity, id, ctx.auth_ctx) {
1224        return (status, json_error(&code, &msg));
1225    }
1226    match ctx.store.delete(entity, id) {
1227        Ok(true) => {
1228            let seq = ctx.change_log.append(entity, id, ChangeKind::Delete, None);
1229            broadcast_change(ctx.notifier, seq, entity, id, ChangeKind::Delete, None);
1230            ctx.plugin_hooks.after_delete(entity, id, ctx.auth_ctx);
1231            (200, serde_json::json!({"deleted": true}).to_string())
1232        }
1233        Ok(false) => (
1234            404,
1235            json_error("NOT_FOUND", &format!("{entity}/{id} not found")),
1236        ),
1237        Err(e) => (400, json_error(&e.code, &e.message)),
1238    }
1239}
1240
1241// ---------------------------------------------------------------------------
1242// Helpers
1243// ---------------------------------------------------------------------------
1244
1245pub(crate) fn broadcast_change(
1246    notifier: &dyn ChangeNotifier,
1247    seq: u64,
1248    entity: &str,
1249    row_id: &str,
1250    kind: ChangeKind,
1251    data: Option<&serde_json::Value>,
1252) {
1253    let event = pylon_sync::ChangeEvent {
1254        seq,
1255        entity: entity.to_string(),
1256        row_id: row_id.to_string(),
1257        kind,
1258        data: data.cloned(),
1259        timestamp: String::new(),
1260    };
1261    notifier.notify(&event);
1262}
1263
1264/// Convenience: emit BOTH the JSON change event AND the binary CRDT
1265/// snapshot frame after a successful insert/update on a CRDT-mode
1266/// entity. The CRDT snapshot is fetched via [`DataStore::crdt_snapshot`];
1267/// for `crdt: false` entities (the LWW opt-out) it returns `Ok(None)`
1268/// and we skip the binary broadcast cleanly.
1269///
1270/// Delete operations don't ship a CRDT frame — Loro doesn't have a
1271/// "row gone" concept; the JSON change event with `kind: "delete"` is
1272/// the canonical signal. Clients drop the LoroDoc on receipt.
1273pub fn broadcast_change_with_crdt(
1274    notifier: &dyn ChangeNotifier,
1275    store: &dyn DataStore,
1276    seq: u64,
1277    entity: &str,
1278    row_id: &str,
1279    kind: ChangeKind,
1280    data: Option<&serde_json::Value>,
1281) {
1282    broadcast_change(notifier, seq, entity, row_id, kind.clone(), data);
1283    if matches!(kind, ChangeKind::Delete) {
1284        return;
1285    }
1286    if let Ok(Some(snapshot)) = store.crdt_snapshot(entity, row_id) {
1287        notifier.notify_crdt(entity, row_id, &snapshot);
1288    }
1289}
1290
1291/// Tiny lowercase-hex decoder. Returns `None` on any malformed input
1292/// (odd length, non-hex character). Used by the CRDT push endpoint to
1293/// turn the JSON `{update: "<hex>"}` payload back into binary Loro
1294/// bytes without pulling in a base64 dep just for one route.
1295pub(crate) fn decode_hex(s: &str) -> Option<Vec<u8>> {
1296    if s.len() % 2 != 0 {
1297        return None;
1298    }
1299    let mut out = Vec::with_capacity(s.len() / 2);
1300    let bytes = s.as_bytes();
1301    let mut i = 0;
1302    while i < bytes.len() {
1303        let hi = hex_nibble(bytes[i])?;
1304        let lo = hex_nibble(bytes[i + 1])?;
1305        out.push((hi << 4) | lo);
1306        i += 2;
1307    }
1308    Some(out)
1309}
1310
1311fn hex_nibble(b: u8) -> Option<u8> {
1312    match b {
1313        b'0'..=b'9' => Some(b - b'0'),
1314        b'a'..=b'f' => Some(b - b'a' + 10),
1315        b'A'..=b'F' => Some(b - b'A' + 10),
1316        _ => None,
1317    }
1318}
1319
1320pub fn json_error(code: &str, message: &str) -> String {
1321    serde_json::json!({"error": {"code": code, "message": message}}).to_string()
1322}
1323
1324/// Conventional field names that link a row to a user. Used by the GDPR
1325/// export/purge endpoints to discover referencing rows without requiring
1326/// app authors to annotate their schema. Apps that store user references
1327/// under non-conventional names must supply their own purge hook — the
1328/// default won't find them, and GDPR-compliant deletes require a full
1329/// sweep. We log a warning on export when entities appear to reference
1330/// users through custom names not in this list.
1331const USER_REF_FIELDS: &[&str] = &[
1332    "userId",
1333    "user_id",
1334    "authorId",
1335    "author_id",
1336    "ownerId",
1337    "owner_id",
1338    "createdBy",
1339    "created_by",
1340];
1341
1342/// Build a data-subject export for `user_id`. Returns a JSON envelope with
1343/// every row referencing the user across every manifest entity, plus the
1344/// User row itself when present. Format is stable — clients can diff
1345/// exports across time to see what's changed.
1346pub(crate) fn gdpr_export(ctx: &RouterContext, user_id: &str) -> (u16, String) {
1347    let manifest = ctx.store.manifest();
1348    let mut entities = serde_json::Map::new();
1349
1350    // The User row itself (if the schema has a User entity).
1351    if let Ok(Some(user_row)) = ctx.store.get_by_id("User", user_id) {
1352        entities.insert("User".to_string(), serde_json::json!([user_row]));
1353    }
1354
1355    // Every entity that has a user-ref field: list all rows matching.
1356    for ent in &manifest.entities {
1357        if ent.name == "User" {
1358            continue; // Already captured above.
1359        }
1360        let user_field = ent
1361            .fields
1362            .iter()
1363            .find(|f| USER_REF_FIELDS.contains(&f.name.as_str()));
1364        let Some(field) = user_field else { continue };
1365        let filter = serde_json::json!({ &field.name: user_id });
1366        match ctx.store.query_filtered(&ent.name, &filter) {
1367            Ok(rows) if !rows.is_empty() => {
1368                entities.insert(ent.name.clone(), serde_json::Value::Array(rows));
1369            }
1370            Ok(_) => {}
1371            Err(e) => {
1372                tracing::warn!("[gdpr] export: query {} failed: {}", ent.name, e.message);
1373            }
1374        }
1375    }
1376
1377    let envelope = serde_json::json!({
1378        "user_id": user_id,
1379        "exported_at": pylon_kernel::util::now_iso(),
1380        "entities": entities,
1381    });
1382    (200, envelope.to_string())
1383}
1384
1385/// Hard-delete every row tied to `user_id` and revoke all active sessions.
1386/// Best-effort: a per-entity error is logged and counted but does not abort
1387/// the sweep — partial success is better than leaving half the footprint.
1388pub(crate) fn gdpr_purge(ctx: &RouterContext, user_id: &str) -> (u16, String) {
1389    let manifest = ctx.store.manifest();
1390    let mut deleted: u64 = 0;
1391    let mut errors: Vec<String> = Vec::new();
1392
1393    // Delete the User row itself first.
1394    if let Ok(true) = ctx.store.delete("User", user_id) {
1395        deleted += 1;
1396        // Synthetic change event so sync clients notice.
1397        let seq = ctx
1398            .change_log
1399            .append("User", user_id, ChangeKind::Delete, None);
1400        broadcast_change(ctx.notifier, seq, "User", user_id, ChangeKind::Delete, None);
1401    }
1402
1403    // Referencing rows.
1404    for ent in &manifest.entities {
1405        if ent.name == "User" {
1406            continue;
1407        }
1408        let Some(field) = ent
1409            .fields
1410            .iter()
1411            .find(|f| USER_REF_FIELDS.contains(&f.name.as_str()))
1412        else {
1413            continue;
1414        };
1415        let filter = serde_json::json!({ &field.name: user_id });
1416        let rows = match ctx.store.query_filtered(&ent.name, &filter) {
1417            Ok(r) => r,
1418            Err(e) => {
1419                errors.push(format!("query {}: {}", ent.name, e.message));
1420                continue;
1421            }
1422        };
1423        for row in rows {
1424            let Some(id) = row.get("id").and_then(|v| v.as_str()) else {
1425                continue;
1426            };
1427            match ctx.store.delete(&ent.name, id) {
1428                Ok(true) => {
1429                    deleted += 1;
1430                    let seq = ctx
1431                        .change_log
1432                        .append(&ent.name, id, ChangeKind::Delete, None);
1433                    broadcast_change(ctx.notifier, seq, &ent.name, id, ChangeKind::Delete, None);
1434                }
1435                Ok(false) => {}
1436                Err(e) => errors.push(format!("delete {}/{}: {}", ent.name, id, e.message)),
1437            }
1438        }
1439    }
1440
1441    // Invalidate every active session for the user. Even after the row is
1442    // gone, an in-flight session token would let a purged user keep acting.
1443    let revoked = ctx.session_store.revoke_all_for_user(user_id);
1444
1445    let resp = serde_json::json!({
1446        "user_id": user_id,
1447        "rows_deleted": deleted,
1448        "sessions_revoked": revoked,
1449        "errors": errors,
1450        "purged_at": pylon_kernel::util::now_iso(),
1451    });
1452    (200, resp.to_string())
1453}
1454
1455/// Gate a route behind admin auth. Returns `Some(error_response)` if the
1456/// caller is NOT admin, `None` if they are. Use at the top of any control-
1457/// plane handler (jobs, workflows, sync push, etc) that shouldn't be open
1458/// to arbitrary clients.
1459pub(crate) fn require_admin(ctx: &RouterContext) -> Option<(u16, String)> {
1460    if ctx.auth_ctx.is_admin {
1461        None
1462    } else {
1463        Some((
1464            403,
1465            json_error(
1466                "FORBIDDEN",
1467                "this endpoint requires admin auth (PYLON_ADMIN_TOKEN)",
1468            ),
1469        ))
1470    }
1471}
1472
1473/// Gate a route behind "any authenticated identity". Returns `Some(err)` when
1474/// the caller has neither a user session nor an admin token. Used for the
1475/// rooms API, which previously let unauthenticated clients enumerate rooms
1476/// and read membership rosters — a silent presence-data leak.
1477pub(crate) fn require_auth(ctx: &RouterContext) -> Option<(u16, String)> {
1478    if ctx.auth_ctx.is_admin || ctx.auth_ctx.user_id.is_some() {
1479        None
1480    } else {
1481        Some((
1482            401,
1483            json_error("AUTH_REQUIRED", "authenticated session required"),
1484        ))
1485    }
1486}
1487
1488pub fn json_error_with_hint(code: &str, message: &str, hint: &str) -> String {
1489    serde_json::json!({"error": {"code": code, "message": message, "hint": hint}}).to_string()
1490}
1491
1492pub fn json_error_safe(code: &str, user_message: &str, internal: &str) -> String {
1493    tracing::warn!("[error] {code}: {internal}");
1494    json_error(code, user_message)
1495}
1496
1497/// Parse a JSON request body, returning a 400 error tuple on failure.
1498pub(crate) fn parse_json(body: &str) -> Result<serde_json::Value, (u16, String)> {
1499    serde_json::from_str(body).map_err(|e| {
1500        (
1501            400,
1502            json_error_safe(
1503                "INVALID_JSON",
1504                "Invalid request body",
1505                &format!("Invalid JSON: {e}"),
1506            ),
1507        )
1508    })
1509}
1510
1511/// Extract a query parameter value from a URL string.
1512pub(crate) fn query_param<'a>(url: &'a str, key: &str) -> Option<&'a str> {
1513    let search = format!("{key}=");
1514    url.split(&search).nth(1).and_then(|s| s.split('&').next())
1515}
1516
1517pub(crate) fn chrono_now_iso() -> String {
1518    pylon_kernel::util::now_iso()
1519}
1520
1521// ---------------------------------------------------------------------------
1522// Integration tests — auth-bypass regressions
1523// ---------------------------------------------------------------------------
1524//
1525// These tests lock in the auth-gate fixes from the security review so future
1526// changes can't silently re-introduce them. Each test builds a minimal
1527// RouterContext with stub implementations of the service traits and exercises
1528// a previously-vulnerable route.
1529
1530#[cfg(test)]
1531mod auth_gate_tests {
1532    use super::*;
1533    use pylon_auth::{AuthContext, CookieConfig, MagicCodeStore, OAuthStateStore, SessionStore};
1534    use pylon_kernel::{AppManifest, MANIFEST_VERSION};
1535    use pylon_policy::PolicyEngine;
1536    use pylon_sync::ChangeLog;
1537
1538    // -----------------------------------------------------------------------
1539    // Minimal stubs for every service trait so we can build a RouterContext
1540    // without wiring up a real runtime.
1541    // -----------------------------------------------------------------------
1542
1543    struct StubDataStore {
1544        manifest: AppManifest,
1545    }
1546    impl pylon_http::DataStore for StubDataStore {
1547        fn manifest(&self) -> &AppManifest {
1548            &self.manifest
1549        }
1550        fn insert(
1551            &self,
1552            _entity: &str,
1553            _data: &serde_json::Value,
1554        ) -> Result<String, pylon_http::DataError> {
1555            Ok("stub-id".to_string())
1556        }
1557        fn get_by_id(
1558            &self,
1559            _entity: &str,
1560            _id: &str,
1561        ) -> Result<Option<serde_json::Value>, pylon_http::DataError> {
1562            Ok(None)
1563        }
1564        fn list(&self, _entity: &str) -> Result<Vec<serde_json::Value>, pylon_http::DataError> {
1565            Ok(Vec::new())
1566        }
1567        fn list_after(
1568            &self,
1569            _entity: &str,
1570            _after: Option<&str>,
1571            _limit: usize,
1572        ) -> Result<Vec<serde_json::Value>, pylon_http::DataError> {
1573            Ok(Vec::new())
1574        }
1575        fn update(
1576            &self,
1577            _entity: &str,
1578            _id: &str,
1579            _data: &serde_json::Value,
1580        ) -> Result<bool, pylon_http::DataError> {
1581            Ok(true)
1582        }
1583        fn delete(&self, _entity: &str, _id: &str) -> Result<bool, pylon_http::DataError> {
1584            Ok(true)
1585        }
1586        fn lookup(
1587            &self,
1588            _entity: &str,
1589            _field: &str,
1590            _value: &str,
1591        ) -> Result<Option<serde_json::Value>, pylon_http::DataError> {
1592            Ok(None)
1593        }
1594        fn link(
1595            &self,
1596            _entity: &str,
1597            _id: &str,
1598            _relation: &str,
1599            _target_id: &str,
1600        ) -> Result<bool, pylon_http::DataError> {
1601            Ok(true)
1602        }
1603        fn unlink(
1604            &self,
1605            _entity: &str,
1606            _id: &str,
1607            _relation: &str,
1608        ) -> Result<bool, pylon_http::DataError> {
1609            Ok(true)
1610        }
1611        fn query_filtered(
1612            &self,
1613            _entity: &str,
1614            _filter: &serde_json::Value,
1615        ) -> Result<Vec<serde_json::Value>, pylon_http::DataError> {
1616            Ok(Vec::new())
1617        }
1618        fn query_graph(
1619            &self,
1620            _query: &serde_json::Value,
1621        ) -> Result<serde_json::Value, pylon_http::DataError> {
1622            Ok(serde_json::json!({}))
1623        }
1624        fn transact(
1625            &self,
1626            _ops: &[serde_json::Value],
1627        ) -> Result<(bool, Vec<serde_json::Value>), pylon_http::DataError> {
1628            Ok((true, Vec::new()))
1629        }
1630    }
1631
1632    macro_rules! stub_ops {
1633        ($name:ident, $trait:path) => {
1634            struct $name;
1635        };
1636    }
1637
1638    stub_ops!(StubRooms, RoomOps);
1639    stub_ops!(StubCache, CacheOps);
1640    stub_ops!(StubPubSub, PubSubOps);
1641    stub_ops!(StubJobs, JobOps);
1642    stub_ops!(StubScheduler, SchedulerOps);
1643    stub_ops!(StubWorkflows, WorkflowOps);
1644    stub_ops!(StubFiles, FileOps);
1645    stub_ops!(StubOpenApi, OpenApiGenerator);
1646    stub_ops!(StubEmail, EmailSender);
1647
1648    impl RoomOps for StubRooms {
1649        fn join(
1650            &self,
1651            _room: &str,
1652            _user_id: &str,
1653            _data: Option<serde_json::Value>,
1654        ) -> Result<(serde_json::Value, serde_json::Value), pylon_http::DataError> {
1655            Ok((serde_json::json!({}), serde_json::json!({})))
1656        }
1657        fn leave(&self, _room: &str, _user_id: &str) -> Option<serde_json::Value> {
1658            None
1659        }
1660        fn set_presence(
1661            &self,
1662            _room: &str,
1663            _user_id: &str,
1664            _data: serde_json::Value,
1665        ) -> Option<serde_json::Value> {
1666            None
1667        }
1668        fn broadcast(
1669            &self,
1670            _room: &str,
1671            _sender: Option<&str>,
1672            _topic: &str,
1673            _data: serde_json::Value,
1674        ) -> Option<serde_json::Value> {
1675            None
1676        }
1677        fn list_rooms(&self) -> Vec<String> {
1678            vec![]
1679        }
1680        fn room_size(&self, _name: &str) -> usize {
1681            0
1682        }
1683        fn members(&self, _name: &str) -> Vec<serde_json::Value> {
1684            vec![]
1685        }
1686    }
1687    impl CacheOps for StubCache {
1688        fn handle_command(&self, _body: &str) -> (u16, String) {
1689            (200, "{}".into())
1690        }
1691        fn handle_get(&self, _key: &str) -> (u16, String) {
1692            (404, "{}".into())
1693        }
1694        fn handle_delete(&self, _key: &str) -> (u16, String) {
1695            (200, "{}".into())
1696        }
1697    }
1698    impl PubSubOps for StubPubSub {
1699        fn handle_publish(&self, _body: &str) -> (u16, String) {
1700            (200, "{}".into())
1701        }
1702        fn handle_channels(&self) -> (u16, String) {
1703            (200, "[]".into())
1704        }
1705        fn handle_history(&self, _channel: &str, _url: &str) -> (u16, String) {
1706            (200, "[]".into())
1707        }
1708    }
1709    impl JobOps for StubJobs {
1710        fn enqueue(
1711            &self,
1712            _name: &str,
1713            _payload: serde_json::Value,
1714            _priority: &str,
1715            _delay_secs: u64,
1716            _max_retries: u32,
1717            _queue: &str,
1718        ) -> String {
1719            "job-id".into()
1720        }
1721        fn stats(&self) -> serde_json::Value {
1722            serde_json::json!({})
1723        }
1724        fn dead_letters(&self) -> serde_json::Value {
1725            serde_json::json!([])
1726        }
1727        fn retry_dead(&self, _id: &str) -> bool {
1728            false
1729        }
1730        fn list_jobs(
1731            &self,
1732            _status: Option<&str>,
1733            _queue: Option<&str>,
1734            _limit: usize,
1735        ) -> serde_json::Value {
1736            serde_json::json!([])
1737        }
1738        fn get_job(&self, _id: &str) -> Option<serde_json::Value> {
1739            None
1740        }
1741    }
1742    impl SchedulerOps for StubScheduler {
1743        fn list_tasks(&self) -> serde_json::Value {
1744            serde_json::json!([])
1745        }
1746        fn trigger(&self, _name: &str) -> bool {
1747            false
1748        }
1749    }
1750    impl WorkflowOps for StubWorkflows {
1751        fn definitions(&self) -> serde_json::Value {
1752            serde_json::json!([])
1753        }
1754        fn start(&self, _name: &str, _input: serde_json::Value) -> Result<String, String> {
1755            Ok("wf-id".into())
1756        }
1757        fn list(&self, _status: Option<&str>) -> serde_json::Value {
1758            serde_json::json!([])
1759        }
1760        fn get(&self, _id: &str) -> Option<serde_json::Value> {
1761            None
1762        }
1763        fn advance(&self, _id: &str) -> Result<String, String> {
1764            Ok("running".into())
1765        }
1766        fn send_event(
1767            &self,
1768            _id: &str,
1769            _event: &str,
1770            _data: serde_json::Value,
1771        ) -> Result<(), String> {
1772            Ok(())
1773        }
1774        fn cancel(&self, _id: &str) -> Result<(), String> {
1775            Ok(())
1776        }
1777    }
1778    impl FileOps for StubFiles {
1779        fn upload(&self, _body: &str) -> (u16, String) {
1780            (501, "{}".into())
1781        }
1782        fn get_file(&self, _id: &str) -> (u16, String) {
1783            (404, "{}".into())
1784        }
1785    }
1786    impl OpenApiGenerator for StubOpenApi {
1787        fn generate(&self, _base: &str) -> String {
1788            "{}".into()
1789        }
1790    }
1791    impl EmailSender for StubEmail {
1792        fn send(&self, _to: &str, _subject: &str, _body: &str) -> Result<(), String> {
1793            Ok(())
1794        }
1795    }
1796
1797    fn empty_manifest() -> AppManifest {
1798        AppManifest {
1799            manifest_version: MANIFEST_VERSION,
1800            name: "test".into(),
1801            version: "0.1.0".into(),
1802            entities: vec![],
1803            routes: vec![],
1804            queries: vec![],
1805            actions: vec![],
1806            policies: vec![],
1807            auth: Default::default(),
1808        }
1809    }
1810
1811    /// Scaffold a RouterContext for tests. Caller chooses is_dev + auth.
1812    fn with_ctx<F>(is_dev: bool, auth: &AuthContext, f: F)
1813    where
1814        F: FnOnce(&RouterContext),
1815    {
1816        with_ctx_hooks(is_dev, auth, &NoopPluginHooks, f);
1817    }
1818
1819    fn with_ctx_hooks<F>(is_dev: bool, auth: &AuthContext, hooks: &dyn PluginHookOps, f: F)
1820    where
1821        F: FnOnce(&RouterContext),
1822    {
1823        let manifest = empty_manifest();
1824        let store = StubDataStore {
1825            manifest: manifest.clone(),
1826        };
1827        let session_store = SessionStore::new();
1828        let magic_codes = MagicCodeStore::new();
1829        let oauth_state = OAuthStateStore::new();
1830        let account_store = pylon_auth::AccountStore::new();
1831        let api_keys = pylon_auth::api_key::ApiKeyStore::new();
1832        let orgs = pylon_auth::org::OrgStore::new();
1833        let siwe = pylon_auth::siwe::NonceStore::new();
1834        let phone_codes = pylon_auth::phone::PhoneCodeStore::new();
1835        let passkeys = pylon_auth::webauthn::PasskeyStore::new();
1836        let policy_engine = PolicyEngine::from_manifest(&manifest);
1837        let change_log = ChangeLog::new();
1838        let notifier = NoopNotifier;
1839        let rooms = StubRooms;
1840        let cache = StubCache;
1841        let pubsub = StubPubSub;
1842        let jobs = StubJobs;
1843        let scheduler = StubScheduler;
1844        let workflows = StubWorkflows;
1845        let files = StubFiles;
1846        let openapi = StubOpenApi;
1847        let email = StubEmail;
1848        let cookie_config = CookieConfig::from_env(&CookieConfig::default_name_for("test"));
1849
1850        let ctx = RouterContext {
1851            store: &store,
1852            session_store: &session_store,
1853            magic_codes: &magic_codes,
1854            oauth_state: &oauth_state,
1855            account_store: &account_store,
1856            api_keys: &api_keys,
1857            orgs: &orgs,
1858            siwe: &siwe,
1859            phone_codes: &phone_codes,
1860            passkeys: &passkeys,
1861            trusted_origins: &[],
1862            policy_engine: &policy_engine,
1863            change_log: &change_log,
1864            notifier: &notifier,
1865            rooms: &rooms,
1866            cache: &cache,
1867            pubsub: &pubsub,
1868            jobs: &jobs,
1869            scheduler: &scheduler,
1870            workflows: &workflows,
1871            files: &files,
1872            openapi: &openapi,
1873            functions: None,
1874            email: &email,
1875            shards: None,
1876            plugin_hooks: hooks,
1877            auth_ctx: auth,
1878            is_dev,
1879            request_headers: &[],
1880            peer_ip: "127.0.0.1",
1881            cookie_config: &cookie_config,
1882            response_headers: RefCell::new(Vec::new()),
1883        };
1884        f(&ctx);
1885    }
1886
1887    // -----------------------------------------------------------------------
1888    // Regression tests for the previously-vulnerable routes
1889    // -----------------------------------------------------------------------
1890
1891    /// Prior vuln: POST /api/auth/session accepted any user_id from anonymous
1892    /// callers and minted a valid session. Now: prod requires admin.
1893    #[test]
1894    fn auth_session_refuses_non_admin_in_prod() {
1895        let anon = AuthContext::anonymous();
1896        with_ctx(false, &anon, |ctx| {
1897            let (status, body, _ct) = route(
1898                ctx,
1899                HttpMethod::Post,
1900                "/api/auth/session",
1901                r#"{"user_id":"victim"}"#,
1902                None,
1903            );
1904            assert_eq!(status, 403);
1905            assert!(body.contains("FORBIDDEN"));
1906        });
1907    }
1908
1909    #[test]
1910    fn auth_session_allowed_for_admin_in_prod() {
1911        let admin = AuthContext::admin();
1912        with_ctx(false, &admin, |ctx| {
1913            let (status, _body, _ct) = route(
1914                ctx,
1915                HttpMethod::Post,
1916                "/api/auth/session",
1917                r#"{"user_id":"alice"}"#,
1918                None,
1919            );
1920            assert_eq!(status, 201);
1921        });
1922    }
1923
1924    /// Prior vuln: OAuth callback accepted `{email, state}` without a real
1925    /// authorization code, letting anyone mint a session for any email.
1926    /// Now: prod requires an authorization code.
1927    #[test]
1928    fn oauth_callback_refuses_missing_code_in_prod() {
1929        let anon = AuthContext::anonymous();
1930        with_ctx(false, &anon, |ctx| {
1931            // Mint a state token so the state check passes — the `code`
1932            // requirement is what must still stop us.
1933            let state = ctx
1934                .oauth_state
1935                .create("google", "https://app/cb", "https://app/cb");
1936            let body = format!(r#"{{"state":"{state}","email":"victim@example.com"}}"#);
1937            let (status, resp, _ct) = route(
1938                ctx,
1939                HttpMethod::Post,
1940                "/api/auth/callback/google",
1941                &body,
1942                None,
1943            );
1944            assert_eq!(status, 400);
1945            assert!(resp.contains("authorization") || resp.contains("code"));
1946        });
1947    }
1948
1949    /// Prior vuln: /api/sync/push had no auth check.
1950    #[test]
1951    fn sync_push_requires_admin() {
1952        let anon = AuthContext::anonymous();
1953        with_ctx(false, &anon, |ctx| {
1954            let (status, body, _ct) = route(
1955                ctx,
1956                HttpMethod::Post,
1957                "/api/sync/push",
1958                r#"{"changes":[]}"#,
1959                None,
1960            );
1961            assert_eq!(status, 403);
1962            assert!(body.contains("FORBIDDEN"));
1963        });
1964    }
1965
1966    /// Prior vuln: /api/transact had no auth check.
1967    #[test]
1968    fn transact_requires_admin() {
1969        let anon = AuthContext::anonymous();
1970        with_ctx(false, &anon, |ctx| {
1971            let (status, _body, _ct) = route(
1972                ctx,
1973                HttpMethod::Post,
1974                "/api/transact",
1975                r#"{"ops":[]}"#,
1976                None,
1977            );
1978            assert_eq!(status, 403);
1979        });
1980    }
1981
1982    /// Prior vuln: /api/workflows/start had no auth check.
1983    #[test]
1984    fn workflow_start_requires_admin() {
1985        let anon = AuthContext::anonymous();
1986        with_ctx(false, &anon, |ctx| {
1987            let (status, _body, _ct) = route(
1988                ctx,
1989                HttpMethod::Post,
1990                "/api/workflows/start",
1991                r#"{"name":"x"}"#,
1992                None,
1993            );
1994            assert_eq!(status, 403);
1995        });
1996    }
1997
1998    /// Prior vuln: /api/jobs enqueue was open.
1999    #[test]
2000    fn jobs_enqueue_requires_admin() {
2001        let anon = AuthContext::anonymous();
2002        with_ctx(false, &anon, |ctx| {
2003            let (status, _body, _ct) = route(
2004                ctx,
2005                HttpMethod::Post,
2006                "/api/jobs",
2007                r#"{"name":"x","payload":{}}"#,
2008                None,
2009            );
2010            assert_eq!(status, 403);
2011        });
2012    }
2013
2014    // -----------------------------------------------------------------------
2015    // Robustness / fuzz-style property tests
2016    //
2017    // The router is the one public entry point that takes arbitrary bytes
2018    // from the network. Any input must yield a response, not a panic.
2019    // These tests hammer the handlers with malformed bodies, weird paths,
2020    // and deeply-nested JSON. If any of them panic or loop, we catch it
2021    // here instead of in production.
2022    // -----------------------------------------------------------------------
2023
2024    fn assert_route_doesnt_panic(ctx: &RouterContext, method: HttpMethod, url: &str, body: &str) {
2025        // `route` is synchronous. A panic would abort the test thread; the
2026        // test harness would fail the whole test. So just call it — success
2027        // means no panic.
2028        let (_status, _body, _ct) = route(ctx, method, url, body, None);
2029    }
2030
2031    #[test]
2032    fn fuzz_malformed_json_bodies_never_panic() {
2033        let admin = AuthContext::admin();
2034        with_ctx(true, &admin, |ctx| {
2035            let samples = [
2036                "",
2037                "not json",
2038                "{",
2039                "}",
2040                "{\"",
2041                "{\"key\":",
2042                "[]",
2043                "null",
2044                "true",
2045                "\"string\"",
2046                "{\"changes\":\"not an array\"}",
2047                &format!("{{\"deeply\":{}}}", "{".repeat(1000)),
2048                "{\"unicode\":\"\\u0000\"}",
2049                "{\"numbers\":1e308}",
2050                "{\"negative\":-999999999999999}",
2051            ];
2052            for body in &samples {
2053                for url in &[
2054                    "/api/sync/push",
2055                    "/api/transact",
2056                    "/api/import",
2057                    "/api/batch",
2058                    "/api/jobs",
2059                    "/api/auth/session",
2060                    "/api/auth/magic/send",
2061                ] {
2062                    assert_route_doesnt_panic(ctx, HttpMethod::Post, url, body);
2063                }
2064            }
2065        });
2066    }
2067
2068    #[test]
2069    fn fuzz_weird_urls_never_panic() {
2070        let admin = AuthContext::admin();
2071        with_ctx(true, &admin, |ctx| {
2072            let samples = [
2073                "/",
2074                "/api",
2075                "/api/",
2076                "/api/entities/",
2077                "/api/entities//",
2078                "/api/entities/%00",
2079                "/api/entities/../escape",
2080                "/api/entities/User?garbage=\x01",
2081                "/api/entities/User?$limit=abc&$order=garbage",
2082                &format!("/api/entities/{}", "a".repeat(10_000)),
2083                "/api/fn/",
2084                "/api/fn/traces",
2085                "/api/shards/id/connect",
2086                "/api/workflows/definitions",
2087                "/api/workflows/nonexistent/advance",
2088                "/api/rooms/",
2089                "/api/rooms/%20",
2090            ];
2091            for url in &samples {
2092                assert_route_doesnt_panic(ctx, HttpMethod::Get, url, "");
2093                assert_route_doesnt_panic(ctx, HttpMethod::Post, url, "{}");
2094                assert_route_doesnt_panic(ctx, HttpMethod::Delete, url, "");
2095            }
2096        });
2097    }
2098
2099    #[test]
2100    fn fuzz_deeply_nested_json_dont_stack_overflow() {
2101        // serde_json has an internal recursion limit (default 128); confirm
2102        // depths beyond that return 400 rather than overflow the stack.
2103        let admin = AuthContext::admin();
2104        with_ctx(true, &admin, |ctx| {
2105            let depth = 300;
2106            let body = format!("{}{}", "[".repeat(depth), "]".repeat(depth),);
2107            let (status, _body, _ct) = route(ctx, HttpMethod::Post, "/api/sync/push", &body, None);
2108            // Serde may reject with 400, or the handler may accept and
2109            // treat as empty — either is fine. The key property: no panic.
2110            assert!(status >= 200 && status < 600);
2111        });
2112    }
2113
2114    #[test]
2115    fn fuzz_unusual_http_methods_gracefully() {
2116        let admin = AuthContext::admin();
2117        with_ctx(true, &admin, |ctx| {
2118            for method in [
2119                HttpMethod::Get,
2120                HttpMethod::Post,
2121                HttpMethod::Put,
2122                HttpMethod::Patch,
2123                HttpMethod::Delete,
2124                HttpMethod::Options,
2125                HttpMethod::Head,
2126            ] {
2127                let (_status, _body, _ct) = route(ctx, method, "/api/entities/User", "{}", None);
2128            }
2129        });
2130    }
2131
2132    // -----------------------------------------------------------------------
2133    // /api/auth/email/* — verify the auth gate, the rate limiter, and a
2134    // happy-path send→verify cycle. Uses a User-aware stub store because
2135    // the default StubDataStore returns None for every get_by_id.
2136    // -----------------------------------------------------------------------
2137
2138    /// Stub store that pretends User "u-1" exists with email
2139    /// "alice@example.com" and tracks update calls so we can assert the
2140    /// emailVerified field gets set.
2141    struct UserStubStore {
2142        manifest: AppManifest,
2143        last_update: std::sync::Mutex<Option<(String, String, serde_json::Value)>>,
2144    }
2145    impl pylon_http::DataStore for UserStubStore {
2146        fn manifest(&self) -> &AppManifest {
2147            &self.manifest
2148        }
2149        fn insert(
2150            &self,
2151            _e: &str,
2152            _d: &serde_json::Value,
2153        ) -> Result<String, pylon_http::DataError> {
2154            Ok("u-1".into())
2155        }
2156        fn get_by_id(
2157            &self,
2158            entity: &str,
2159            id: &str,
2160        ) -> Result<Option<serde_json::Value>, pylon_http::DataError> {
2161            if entity == "User" && id == "u-1" {
2162                return Ok(Some(serde_json::json!({
2163                    "id": "u-1",
2164                    "email": "alice@example.com",
2165                    "displayName": "Alice",
2166                })));
2167            }
2168            Ok(None)
2169        }
2170        fn list(&self, _e: &str) -> Result<Vec<serde_json::Value>, pylon_http::DataError> {
2171            Ok(vec![])
2172        }
2173        fn list_after(
2174            &self,
2175            _e: &str,
2176            _a: Option<&str>,
2177            _l: usize,
2178        ) -> Result<Vec<serde_json::Value>, pylon_http::DataError> {
2179            Ok(vec![])
2180        }
2181        fn update(
2182            &self,
2183            entity: &str,
2184            id: &str,
2185            data: &serde_json::Value,
2186        ) -> Result<bool, pylon_http::DataError> {
2187            *self.last_update.lock().unwrap() = Some((entity.into(), id.into(), data.clone()));
2188            Ok(true)
2189        }
2190        fn delete(&self, _e: &str, _i: &str) -> Result<bool, pylon_http::DataError> {
2191            Ok(true)
2192        }
2193        fn lookup(
2194            &self,
2195            _e: &str,
2196            _f: &str,
2197            _v: &str,
2198        ) -> Result<Option<serde_json::Value>, pylon_http::DataError> {
2199            Ok(None)
2200        }
2201        fn link(
2202            &self,
2203            _e: &str,
2204            _i: &str,
2205            _r: &str,
2206            _t: &str,
2207        ) -> Result<bool, pylon_http::DataError> {
2208            Ok(true)
2209        }
2210        fn unlink(&self, _e: &str, _i: &str, _r: &str) -> Result<bool, pylon_http::DataError> {
2211            Ok(true)
2212        }
2213        fn query_filtered(
2214            &self,
2215            _e: &str,
2216            _f: &serde_json::Value,
2217        ) -> Result<Vec<serde_json::Value>, pylon_http::DataError> {
2218            Ok(vec![])
2219        }
2220        fn query_graph(
2221            &self,
2222            _q: &serde_json::Value,
2223        ) -> Result<serde_json::Value, pylon_http::DataError> {
2224            Ok(serde_json::json!({}))
2225        }
2226        fn aggregate(
2227            &self,
2228            _e: &str,
2229            _s: &serde_json::Value,
2230        ) -> Result<serde_json::Value, pylon_http::DataError> {
2231            Ok(serde_json::json!({}))
2232        }
2233        fn transact(
2234            &self,
2235            _o: &[serde_json::Value],
2236        ) -> Result<(bool, Vec<serde_json::Value>), pylon_http::DataError> {
2237            Ok((true, vec![]))
2238        }
2239        fn search(
2240            &self,
2241            _e: &str,
2242            _q: &serde_json::Value,
2243        ) -> Result<serde_json::Value, pylon_http::DataError> {
2244            Ok(serde_json::json!({}))
2245        }
2246    }
2247
2248    /// Capture-the-email stub so we can assert the body the user would
2249    /// have received. Production wiring does this through an Resend /
2250    /// SES adapter; tests just want to read what got "sent".
2251    struct CaptureEmail {
2252        sent: std::sync::Mutex<Vec<(String, String, String)>>,
2253    }
2254    impl EmailSender for CaptureEmail {
2255        fn send(&self, to: &str, subject: &str, body: &str) -> Result<(), String> {
2256            self.sent
2257                .lock()
2258                .unwrap()
2259                .push((to.into(), subject.into(), body.into()));
2260            Ok(())
2261        }
2262    }
2263
2264    fn with_user_ctx<F>(is_dev: bool, auth: &AuthContext, f: F)
2265    where
2266        F: FnOnce(&RouterContext, &UserStubStore, &CaptureEmail, &MagicCodeStore),
2267    {
2268        let manifest = empty_manifest();
2269        let store = UserStubStore {
2270            manifest: manifest.clone(),
2271            last_update: std::sync::Mutex::new(None),
2272        };
2273        let session_store = SessionStore::new();
2274        let magic_codes = MagicCodeStore::new();
2275        let oauth_state = OAuthStateStore::new();
2276        let account_store = pylon_auth::AccountStore::new();
2277        let api_keys = pylon_auth::api_key::ApiKeyStore::new();
2278        let orgs = pylon_auth::org::OrgStore::new();
2279        let siwe = pylon_auth::siwe::NonceStore::new();
2280        let phone_codes = pylon_auth::phone::PhoneCodeStore::new();
2281        let passkeys = pylon_auth::webauthn::PasskeyStore::new();
2282        let policy_engine = PolicyEngine::from_manifest(&manifest);
2283        let change_log = ChangeLog::new();
2284        let notifier = NoopNotifier;
2285        let rooms = StubRooms;
2286        let cache = StubCache;
2287        let pubsub = StubPubSub;
2288        let jobs = StubJobs;
2289        let scheduler = StubScheduler;
2290        let workflows = StubWorkflows;
2291        let files = StubFiles;
2292        let openapi = StubOpenApi;
2293        let email = CaptureEmail {
2294            sent: std::sync::Mutex::new(vec![]),
2295        };
2296        let hooks = NoopPluginHooks;
2297        let cookie_config = CookieConfig::from_env(&CookieConfig::default_name_for("test"));
2298
2299        let ctx = RouterContext {
2300            store: &store,
2301            session_store: &session_store,
2302            magic_codes: &magic_codes,
2303            oauth_state: &oauth_state,
2304            account_store: &account_store,
2305            api_keys: &api_keys,
2306            orgs: &orgs,
2307            siwe: &siwe,
2308            phone_codes: &phone_codes,
2309            passkeys: &passkeys,
2310            trusted_origins: &[],
2311            policy_engine: &policy_engine,
2312            change_log: &change_log,
2313            notifier: &notifier,
2314            rooms: &rooms,
2315            cache: &cache,
2316            pubsub: &pubsub,
2317            jobs: &jobs,
2318            scheduler: &scheduler,
2319            workflows: &workflows,
2320            files: &files,
2321            openapi: &openapi,
2322            functions: None,
2323            email: &email,
2324            shards: None,
2325            plugin_hooks: &hooks,
2326            auth_ctx: auth,
2327            is_dev,
2328            request_headers: &[],
2329            peer_ip: "127.0.0.1",
2330            cookie_config: &cookie_config,
2331            response_headers: RefCell::new(Vec::new()),
2332        };
2333        f(&ctx, &store, &email, &magic_codes);
2334    }
2335
2336    #[test]
2337    fn email_send_verification_requires_auth() {
2338        let anon = AuthContext::anonymous();
2339        with_user_ctx(true, &anon, |ctx, _, _, _| {
2340            let (status, body, _) = route(
2341                ctx,
2342                HttpMethod::Post,
2343                "/api/auth/email/send-verification",
2344                "{}",
2345                None,
2346            );
2347            assert_eq!(status, 401);
2348            assert!(body.contains("UNAUTHORIZED"));
2349        });
2350    }
2351
2352    #[test]
2353    fn email_verify_requires_auth() {
2354        let anon = AuthContext::anonymous();
2355        with_user_ctx(true, &anon, |ctx, _, _, _| {
2356            let (status, body, _) = route(
2357                ctx,
2358                HttpMethod::Post,
2359                "/api/auth/email/verify",
2360                r#"{"code":"123456"}"#,
2361                None,
2362            );
2363            assert_eq!(status, 401);
2364            assert!(body.contains("UNAUTHORIZED"));
2365        });
2366    }
2367
2368    #[test]
2369    fn email_send_verification_uses_session_email_not_body() {
2370        // Caller is "u-1" (alice@example.com). Even if they put a
2371        // different email in the body, the code should be issued for
2372        // the SESSION's email — otherwise an authed caller could spam
2373        // codes to arbitrary addresses.
2374        let alice = AuthContext::authenticated("u-1".into());
2375        with_user_ctx(true, &alice, |ctx, _, email, _| {
2376            let (status, body, _) = route(
2377                ctx,
2378                HttpMethod::Post,
2379                "/api/auth/email/send-verification",
2380                r#"{"email":"victim@example.com"}"#,
2381                None,
2382            );
2383            assert_eq!(status, 200);
2384            // Dev mode echoes the code; verify the recipient is alice,
2385            // not the body's victim.
2386            let sent = email.sent.lock().unwrap();
2387            assert_eq!(sent.len(), 1);
2388            assert_eq!(sent[0].0, "alice@example.com");
2389            assert!(body.contains("alice@example.com"));
2390            assert!(!body.contains("victim@example.com"));
2391        });
2392    }
2393
2394    #[test]
2395    fn email_verify_happy_path_stamps_email_verified() {
2396        let alice = AuthContext::authenticated("u-1".into());
2397        with_user_ctx(true, &alice, |ctx, store, _, magic_codes| {
2398            // Pre-issue a code (skipping the send endpoint) so we test
2399            // verify in isolation.
2400            let code = magic_codes.try_create("alice@example.com").unwrap();
2401            let body = format!(r#"{{"code":"{code}"}}"#);
2402            let (status, resp, _) =
2403                route(ctx, HttpMethod::Post, "/api/auth/email/verify", &body, None);
2404            assert_eq!(status, 200);
2405            assert!(resp.contains("\"verified\":true"));
2406            // Update was attempted on User u-1 with emailVerified set.
2407            let last = store.last_update.lock().unwrap();
2408            let (entity, id, data) = last.as_ref().expect("update should have fired");
2409            assert_eq!(entity, "User");
2410            assert_eq!(id, "u-1");
2411            assert!(data.get("emailVerified").is_some());
2412        });
2413    }
2414
2415    #[test]
2416    fn email_verify_rejects_wrong_code() {
2417        let alice = AuthContext::authenticated("u-1".into());
2418        with_user_ctx(true, &alice, |ctx, store, _, magic_codes| {
2419            let _ = magic_codes.try_create("alice@example.com").unwrap();
2420            let (status, body, _) = route(
2421                ctx,
2422                HttpMethod::Post,
2423                "/api/auth/email/verify",
2424                r#"{"code":"999999"}"#,
2425                None,
2426            );
2427            assert_eq!(status, 401);
2428            assert!(body.contains("INVALID_CODE"));
2429            // No update should have happened.
2430            assert!(store.last_update.lock().unwrap().is_none());
2431        });
2432    }
2433
2434    /// Dev mode keeps the old permissive behaviour for local tooling.
2435    #[test]
2436    fn auth_session_allowed_in_dev_mode() {
2437        let anon = AuthContext::anonymous();
2438        with_ctx(true, &anon, |ctx| {
2439            let (status, _body, _ct) = route(
2440                ctx,
2441                HttpMethod::Post,
2442                "/api/auth/session",
2443                r#"{"user_id":"alice"}"#,
2444                None,
2445            );
2446            assert_eq!(status, 201);
2447        });
2448    }
2449
2450    // -----------------------------------------------------------------------
2451    // Plugin CRUD hook wiring — prior vuln: POST/PATCH/DELETE on
2452    // /api/entities/* bypassed the registered plugin before_/after_ hooks,
2453    // so validation/audit_log/webhooks/slugify/timestamps never saw data-
2454    // plane writes. These tests pin that the router now runs them.
2455    // -----------------------------------------------------------------------
2456
2457    use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
2458
2459    struct CountingHooks {
2460        before_insert_calls: AtomicU32,
2461        after_insert_calls: AtomicU32,
2462        before_delete_calls: AtomicU32,
2463        reject_on_entity: Option<&'static str>,
2464    }
2465
2466    impl CountingHooks {
2467        fn new() -> Self {
2468            Self {
2469                before_insert_calls: AtomicU32::new(0),
2470                after_insert_calls: AtomicU32::new(0),
2471                before_delete_calls: AtomicU32::new(0),
2472                reject_on_entity: None,
2473            }
2474        }
2475    }
2476
2477    impl PluginHookOps for CountingHooks {
2478        fn before_insert(
2479            &self,
2480            entity: &str,
2481            _data: &mut serde_json::Value,
2482            _auth: &AuthContext,
2483        ) -> Result<(), (u16, String, String)> {
2484            self.before_insert_calls.fetch_add(1, Ordering::SeqCst);
2485            if self.reject_on_entity == Some(entity) {
2486                return Err((422, "VALIDATION".into(), "rejected by plugin".into()));
2487            }
2488            Ok(())
2489        }
2490        fn after_insert(
2491            &self,
2492            _entity: &str,
2493            _id: &str,
2494            _data: &serde_json::Value,
2495            _auth: &AuthContext,
2496        ) {
2497            self.after_insert_calls.fetch_add(1, Ordering::SeqCst);
2498        }
2499        fn before_update(
2500            &self,
2501            _entity: &str,
2502            _id: &str,
2503            _data: &mut serde_json::Value,
2504            _auth: &AuthContext,
2505        ) -> Result<(), (u16, String, String)> {
2506            Ok(())
2507        }
2508        fn after_update(
2509            &self,
2510            _entity: &str,
2511            _id: &str,
2512            _data: &serde_json::Value,
2513            _auth: &AuthContext,
2514        ) {
2515        }
2516        fn before_delete(
2517            &self,
2518            _entity: &str,
2519            _id: &str,
2520            _auth: &AuthContext,
2521        ) -> Result<(), (u16, String, String)> {
2522            self.before_delete_calls.fetch_add(1, Ordering::SeqCst);
2523            Ok(())
2524        }
2525        fn after_delete(&self, _entity: &str, _id: &str, _auth: &AuthContext) {}
2526    }
2527
2528    #[test]
2529    fn plugin_hooks_fire_on_entity_post() {
2530        // StubDataStore::insert always succeeds with id="stub-id", so we
2531        // just assert the before_/after_ counters tick.
2532        let admin = AuthContext::admin();
2533        let hooks = CountingHooks::new();
2534        with_ctx_hooks(true, &admin, &hooks, |ctx| {
2535            let (status, _body, _ct) = route(
2536                ctx,
2537                HttpMethod::Post,
2538                "/api/entities/User",
2539                r#"{"email":"a@b"}"#,
2540                None,
2541            );
2542            assert_eq!(status, 201);
2543        });
2544        assert_eq!(hooks.before_insert_calls.load(Ordering::SeqCst), 1);
2545        assert_eq!(hooks.after_insert_calls.load(Ordering::SeqCst), 1);
2546    }
2547
2548    #[test]
2549    fn plugin_before_insert_rejection_short_circuits_write() {
2550        // If the plugin rejects, the store's insert must NOT be called and
2551        // the status must propagate.
2552        let admin = AuthContext::admin();
2553        let rejector = CountingHooks {
2554            reject_on_entity: Some("User"),
2555            ..CountingHooks::new()
2556        };
2557        with_ctx_hooks(true, &admin, &rejector, |ctx| {
2558            let (status, body, _ct) =
2559                route(ctx, HttpMethod::Post, "/api/entities/User", r#"{}"#, None);
2560            assert_eq!(status, 422);
2561            assert!(body.contains("VALIDATION"));
2562        });
2563        assert_eq!(rejector.before_insert_calls.load(Ordering::SeqCst), 1);
2564        // after_insert must NOT have been called when before_insert rejected.
2565        assert_eq!(rejector.after_insert_calls.load(Ordering::SeqCst), 0);
2566    }
2567
2568    // -----------------------------------------------------------------------
2569    // GDPR export + purge — pinned behavior so accidental breakage shows up
2570    // as a test failure instead of a compliance surprise.
2571    // -----------------------------------------------------------------------
2572
2573    #[test]
2574    fn gdpr_export_requires_admin() {
2575        let anon = AuthContext::anonymous();
2576        with_ctx(false, &anon, |ctx| {
2577            let (status, body, _ct) = route(
2578                ctx,
2579                HttpMethod::Post,
2580                "/api/admin/users/alice/export",
2581                "",
2582                None,
2583            );
2584            assert_eq!(status, 403);
2585            assert!(body.contains("FORBIDDEN"));
2586        });
2587    }
2588
2589    #[test]
2590    fn gdpr_purge_requires_admin() {
2591        let anon = AuthContext::anonymous();
2592        with_ctx(false, &anon, |ctx| {
2593            let (status, _body, _ct) = route(
2594                ctx,
2595                HttpMethod::Delete,
2596                "/api/admin/users/alice/purge",
2597                "",
2598                None,
2599            );
2600            assert_eq!(status, 403);
2601        });
2602    }
2603
2604    #[test]
2605    fn gdpr_export_returns_envelope_for_admin() {
2606        let admin = AuthContext::admin();
2607        with_ctx(true, &admin, |ctx| {
2608            let (status, body, _ct) = route(
2609                ctx,
2610                HttpMethod::Post,
2611                "/api/admin/users/alice/export",
2612                "",
2613                None,
2614            );
2615            assert_eq!(status, 200);
2616            let v: serde_json::Value = serde_json::from_str(&body).unwrap();
2617            assert_eq!(v["user_id"], "alice");
2618            assert!(v["entities"].is_object());
2619            assert!(v["exported_at"].is_string());
2620        });
2621    }
2622
2623    #[test]
2624    fn plugin_hooks_fire_on_entity_delete() {
2625        let admin = AuthContext::admin();
2626        let hooks = CountingHooks::new();
2627        with_ctx_hooks(true, &admin, &hooks, |ctx| {
2628            let (status, _body, _ct) = route(
2629                ctx,
2630                HttpMethod::Delete,
2631                "/api/entities/User/stub-id",
2632                "",
2633                None,
2634            );
2635            assert_eq!(status, 200);
2636        });
2637        assert_eq!(hooks.before_delete_calls.load(Ordering::SeqCst), 1);
2638    }
2639
2640    // -----------------------------------------------------------------------
2641    // Webhook endpoint — /api/webhooks/:action
2642    //
2643    // No functions are registered in the stub RouterContext, so we can only
2644    // assert the error paths (FUNCTIONS_NOT_AVAILABLE, FN_NOT_FOUND). The
2645    // happy-path with request-context propagation is covered by the TS
2646    // runtime tests + a manual integration check against a live server.
2647    // -----------------------------------------------------------------------
2648
2649    #[test]
2650    fn webhook_returns_503_when_functions_unavailable() {
2651        let anon = AuthContext::anonymous();
2652        with_ctx(true, &anon, |ctx| {
2653            let (status, body, _ct) = route(
2654                ctx,
2655                HttpMethod::Post,
2656                "/api/webhooks/stripe_handler",
2657                "{}",
2658                None,
2659            );
2660            assert_eq!(status, 503);
2661            assert!(body.contains("FUNCTIONS_NOT_AVAILABLE"));
2662        });
2663    }
2664
2665    #[test]
2666    fn webhook_accepts_any_http_method() {
2667        // Regression: providers send GET challenge requests (e.g. Slack URL
2668        // verification). The endpoint must route regardless of method.
2669        let anon = AuthContext::anonymous();
2670        with_ctx(true, &anon, |ctx| {
2671            for method in [
2672                HttpMethod::Get,
2673                HttpMethod::Post,
2674                HttpMethod::Put,
2675                HttpMethod::Patch,
2676                HttpMethod::Delete,
2677            ] {
2678                let (status, _body, _ct) = route(ctx, method, "/api/webhooks/any_name", "", None);
2679                // Without function runtime it's 503 regardless; the point
2680                // is that we didn't 405 Method Not Allowed.
2681                assert_ne!(status, 405);
2682            }
2683        });
2684    }
2685
2686    // -----------------------------------------------------------------------
2687    // Auth matrix regression scaffold
2688    //
2689    // Goal: catch reviewer-class bugs at PR time. Every route in this
2690    // table is hit as anonymous, guest, and authed-non-admin. The
2691    // EXPECTED column is what the route should return for that
2692    // identity. Adding a new route? Add a row. Discovering a bypass?
2693    // Add the regression here so it stays caught.
2694    //
2695    // Status semantics:
2696    //   401 = AUTH_REQUIRED            (require_auth)
2697    //   403 = FORBIDDEN | POLICY_DENIED (require_admin / policy)
2698    //   any = 200..=599 means "anything but 405" — for routes that
2699    //         legitimately reach the handler with this identity and
2700    //         may 200/400/etc. depending on body.
2701    // -----------------------------------------------------------------------
2702
2703    #[derive(Clone, Copy)]
2704    enum Expect {
2705        /// Status must equal this value.
2706        Eq(u16),
2707        /// Status must be 401 or 403 (rejected before handler logic).
2708        Rejected,
2709        /// Anything except 405 — handler reached, body validation may
2710        /// have triggered other errors.
2711        ReachedHandler,
2712    }
2713
2714    fn assert_expect(actual: u16, want: Expect, label: &str) {
2715        match want {
2716            Expect::Eq(s) => assert_eq!(actual, s, "{label}: expected status {s}, got {actual}"),
2717            Expect::Rejected => assert!(
2718                actual == 401 || actual == 403,
2719                "{label}: expected 401 or 403, got {actual}"
2720            ),
2721            Expect::ReachedHandler => assert_ne!(
2722                actual, 405,
2723                "{label}: route should accept this method, got 405"
2724            ),
2725        }
2726    }
2727
2728    /// Hit a route as anon / guest / authed-non-admin and assert each
2729    /// identity gets the documented response. Catches reviewer-class
2730    /// bugs (e.g. a P1 finding that an endpoint is_admin-gated drifts
2731    /// to public during a refactor).
2732    fn matrix_check(
2733        method: HttpMethod,
2734        url: &str,
2735        body: &str,
2736        expect_anon: Expect,
2737        expect_guest: Expect,
2738        expect_user: Expect,
2739    ) {
2740        let anon = AuthContext::anonymous();
2741        let guest = AuthContext::guest("guest-1".into());
2742        let user = AuthContext::authenticated("u-1".into());
2743
2744        for (auth, want, who) in [
2745            (&anon, expect_anon, "anon"),
2746            (&guest, expect_guest, "guest"),
2747            (&user, expect_user, "user"),
2748        ] {
2749            with_ctx(false, auth, |ctx| {
2750                let (status, _body, _ct) = route(ctx, method, url, body, None);
2751                assert_expect(status, want, &format!("{who} {method:?} {url}"));
2752            });
2753        }
2754    }
2755
2756    /// Like matrix_check, but the test scaffold loads a manifest with
2757    /// a deny-by-default policy on the named entity. Use this for
2758    /// policy-gated routes (cursor, filtered query, CRDT push) where
2759    /// the gate's job is "call check_entity_*" — without a denying
2760    /// policy in scope the call would silently pass.
2761    fn matrix_check_with_deny_policy(
2762        deny_entity: &str,
2763        method: HttpMethod,
2764        url: &str,
2765        body: &str,
2766        expect_anon: Expect,
2767        expect_guest: Expect,
2768        expect_user: Expect,
2769    ) {
2770        use pylon_kernel::{AppManifest, ManifestPolicy, MANIFEST_VERSION};
2771        let anon = AuthContext::anonymous();
2772        let guest = AuthContext::guest("guest-1".into());
2773        let user = AuthContext::authenticated("u-1".into());
2774
2775        let manifest = AppManifest {
2776            manifest_version: MANIFEST_VERSION,
2777            name: "test".into(),
2778            version: "0.1.0".into(),
2779            entities: vec![],
2780            routes: vec![],
2781            queries: vec![],
2782            actions: vec![],
2783            policies: vec![ManifestPolicy {
2784                name: "denyAll".into(),
2785                entity: Some(deny_entity.into()),
2786                allow_read: Some("false".into()),
2787                allow_update: Some("false".into()),
2788                ..Default::default()
2789            }],
2790            auth: Default::default(),
2791        };
2792        let store = StubDataStore {
2793            manifest: manifest.clone(),
2794        };
2795        let session_store = SessionStore::new();
2796        let magic_codes = MagicCodeStore::new();
2797        let oauth_state = OAuthStateStore::new();
2798        let account_store = pylon_auth::AccountStore::new();
2799        let api_keys = pylon_auth::api_key::ApiKeyStore::new();
2800        let orgs = pylon_auth::org::OrgStore::new();
2801        let siwe = pylon_auth::siwe::NonceStore::new();
2802        let phone_codes = pylon_auth::phone::PhoneCodeStore::new();
2803        let passkeys = pylon_auth::webauthn::PasskeyStore::new();
2804        let policy_engine = PolicyEngine::from_manifest(&manifest);
2805        let change_log = ChangeLog::new();
2806        let notifier = NoopNotifier;
2807        let rooms = StubRooms;
2808        let cache = StubCache;
2809        let pubsub = StubPubSub;
2810        let jobs = StubJobs;
2811        let scheduler = StubScheduler;
2812        let workflows = StubWorkflows;
2813        let files = StubFiles;
2814        let openapi = StubOpenApi;
2815        let email = StubEmail;
2816        let cookie_config = CookieConfig::from_env(&CookieConfig::default_name_for("test"));
2817
2818        for (auth, want, who) in [
2819            (&anon, expect_anon, "anon"),
2820            (&guest, expect_guest, "guest"),
2821            (&user, expect_user, "user"),
2822        ] {
2823            let ctx = RouterContext {
2824                store: &store,
2825                session_store: &session_store,
2826                magic_codes: &magic_codes,
2827                oauth_state: &oauth_state,
2828                account_store: &account_store,
2829            api_keys: &api_keys,
2830            orgs: &orgs,
2831            siwe: &siwe,
2832            phone_codes: &phone_codes,
2833            passkeys: &passkeys,
2834                trusted_origins: &[],
2835                policy_engine: &policy_engine,
2836                change_log: &change_log,
2837                notifier: &notifier,
2838                rooms: &rooms,
2839                cache: &cache,
2840                pubsub: &pubsub,
2841                jobs: &jobs,
2842                scheduler: &scheduler,
2843                workflows: &workflows,
2844                files: &files,
2845                openapi: &openapi,
2846                functions: None,
2847                email: &email,
2848                shards: None,
2849                plugin_hooks: &NoopPluginHooks,
2850                auth_ctx: auth,
2851                is_dev: false,
2852                request_headers: &[],
2853                peer_ip: "127.0.0.1",
2854                cookie_config: &cookie_config,
2855                response_headers: RefCell::new(Vec::new()),
2856            };
2857            let (status, _body, _ct) = route(&ctx, method, url, body, None);
2858            assert_expect(status, want, &format!("{who} {method:?} {url}"));
2859        }
2860    }
2861
2862    #[test]
2863    fn matrix_cache_admin_only() {
2864        matrix_check(
2865            HttpMethod::Get,
2866            "/api/cache/anykey",
2867            "",
2868            Expect::Rejected,
2869            Expect::Rejected,
2870            Expect::Rejected,
2871        );
2872        matrix_check(
2873            HttpMethod::Post,
2874            "/api/cache",
2875            r#"{"op":"get","key":"x"}"#,
2876            Expect::Rejected,
2877            Expect::Rejected,
2878            Expect::Rejected,
2879        );
2880        matrix_check(
2881            HttpMethod::Delete,
2882            "/api/cache/anykey",
2883            "",
2884            Expect::Rejected,
2885            Expect::Rejected,
2886            Expect::Rejected,
2887        );
2888    }
2889
2890    #[test]
2891    fn matrix_pubsub_admin_only() {
2892        matrix_check(
2893            HttpMethod::Post,
2894            "/api/pubsub/publish",
2895            r#"{"channel":"x","message":"y"}"#,
2896            Expect::Rejected,
2897            Expect::Rejected,
2898            Expect::Rejected,
2899        );
2900        matrix_check(
2901            HttpMethod::Get,
2902            "/api/pubsub/channels",
2903            "",
2904            Expect::Rejected,
2905            Expect::Rejected,
2906            Expect::Rejected,
2907        );
2908        matrix_check(
2909            HttpMethod::Get,
2910            "/api/pubsub/history/some-channel",
2911            "",
2912            Expect::Rejected,
2913            Expect::Rejected,
2914            Expect::Rejected,
2915        );
2916    }
2917
2918    #[test]
2919    fn matrix_jobs_read_admin_only() {
2920        matrix_check(
2921            HttpMethod::Get,
2922            "/api/jobs/stats",
2923            "",
2924            Expect::Rejected,
2925            Expect::Rejected,
2926            Expect::Rejected,
2927        );
2928        matrix_check(
2929            HttpMethod::Get,
2930            "/api/jobs/dead",
2931            "",
2932            Expect::Rejected,
2933            Expect::Rejected,
2934            Expect::Rejected,
2935        );
2936        matrix_check(
2937            HttpMethod::Get,
2938            "/api/jobs",
2939            "",
2940            Expect::Rejected,
2941            Expect::Rejected,
2942            Expect::Rejected,
2943        );
2944        matrix_check(
2945            HttpMethod::Get,
2946            "/api/jobs/some-job-id",
2947            "",
2948            Expect::Rejected,
2949            Expect::Rejected,
2950            Expect::Rejected,
2951        );
2952    }
2953
2954    #[test]
2955    fn matrix_workflows_read_admin_only() {
2956        matrix_check(
2957            HttpMethod::Get,
2958            "/api/workflows/definitions",
2959            "",
2960            Expect::Rejected,
2961            Expect::Rejected,
2962            Expect::Rejected,
2963        );
2964        matrix_check(
2965            HttpMethod::Get,
2966            "/api/workflows",
2967            "",
2968            Expect::Rejected,
2969            Expect::Rejected,
2970            Expect::Rejected,
2971        );
2972        matrix_check(
2973            HttpMethod::Get,
2974            "/api/workflows/some-id",
2975            "",
2976            Expect::Rejected,
2977            Expect::Rejected,
2978            Expect::Rejected,
2979        );
2980    }
2981
2982    #[test]
2983    fn matrix_files_download_requires_auth() {
2984        // Anon must not enumerate uploads via predictable file IDs.
2985        // Guest + user can — files use require_auth, not require_admin.
2986        matrix_check(
2987            HttpMethod::Get,
2988            "/api/files/some-file-id",
2989            "",
2990            Expect::Eq(401),
2991            Expect::ReachedHandler,
2992            Expect::ReachedHandler,
2993        );
2994    }
2995
2996    #[test]
2997    fn matrix_crdt_push_respects_update_policy() {
2998        // Pre-fix: any session (incl. guest) could push a CRDT update
2999        // to any addressable row. Now: when the entity has an update
3000        // policy that denies, even authed non-admins are blocked. Anon
3001        // bounces at the require_auth gate before policy is consulted.
3002        matrix_check_with_deny_policy(
3003            "Doc",
3004            HttpMethod::Post,
3005            "/api/crdt/Doc/some-row",
3006            r#"{"update":"00"}"#,
3007            Expect::Eq(401),
3008            Expect::Rejected,
3009            Expect::Rejected,
3010        );
3011    }
3012
3013    #[test]
3014    fn matrix_filtered_query_respects_read_policy() {
3015        matrix_check_with_deny_policy(
3016            "Secret",
3017            HttpMethod::Post,
3018            "/api/query/Secret",
3019            r#"{"where":{}}"#,
3020            Expect::Rejected,
3021            Expect::Rejected,
3022            Expect::Rejected,
3023        );
3024    }
3025
3026    #[test]
3027    fn matrix_cursor_pagination_respects_read_policy() {
3028        matrix_check_with_deny_policy(
3029            "Secret",
3030            HttpMethod::Get,
3031            "/api/entities/Secret/cursor?limit=10",
3032            "",
3033            Expect::Rejected,
3034            Expect::Rejected,
3035            Expect::Rejected,
3036        );
3037    }
3038
3039    /// One-shot audit of every admin-required GET route. Every entry
3040    /// here is a route where an anonymous, guest, or authed-non-admin
3041    /// caller MUST receive 401/403. Adding a new admin GET? Add a row.
3042    /// Removing a route's admin gate? You'll see this test fail in the
3043    /// PR diff and can confirm the change is intentional.
3044    ///
3045    /// This is the forcing function the 2nd security review asked for:
3046    /// "every `/api/*` GET that doesn't return admin/non-sensitive
3047    /// data has an auth gate before the handler". Compile-time
3048    /// enumeration would be nicer but the route list lives in
3049    /// `route_inner` as a chain of if-blocks; until that's reified
3050    /// into data, this table is the gate.
3051    #[test]
3052    fn matrix_admin_get_routes_audit() {
3053        let admin_get_routes: &[(&str, &str)] = &[
3054            ("/api/scheduler", "list scheduled tasks"),
3055            ("/api/fn", "enumerate registered functions"),
3056            ("/api/fn/traces", "function execution traces"),
3057            ("/api/shards", "shard topology + subscriber counts"),
3058            ("/api/cache/anykey", "raw cache read"),
3059            ("/api/pubsub/channels", "list pub/sub channels"),
3060            ("/api/pubsub/history/anychannel", "channel retained history"),
3061            ("/api/jobs/stats", "job queue stats"),
3062            ("/api/jobs/dead", "dead-letter queue"),
3063            ("/api/jobs", "job list with payloads"),
3064            ("/api/jobs/some-id", "single job detail"),
3065            ("/api/workflows/definitions", "workflow definitions"),
3066            ("/api/workflows", "workflow instance list"),
3067            ("/api/workflows/some-id", "workflow instance detail"),
3068        ];
3069        for (url, label) in admin_get_routes {
3070            matrix_check(
3071                HttpMethod::Get,
3072                url,
3073                "",
3074                Expect::Rejected,
3075                Expect::Rejected,
3076                Expect::Rejected,
3077            );
3078            // re-assert with explicit fail message so the audit log
3079            // pinpoints which row regressed.
3080            let _ = label;
3081        }
3082    }
3083
3084    // Keep the warning silencer until this is used.
3085    #[allow(dead_code)]
3086    const _TOUCH_ATOMIC_BOOL: AtomicBool = AtomicBool::new(false);
3087
3088    // -----------------------------------------------------------------------
3089    // Round-3 hardening tests
3090    // -----------------------------------------------------------------------
3091
3092    #[test]
3093    fn redact_email_keeps_two_chars_and_domain() {
3094        assert_eq!(super::redact_email("alice@acme.com"), "al***@acme.com");
3095        assert_eq!(super::redact_email("a@b.io"), "a***@b.io");
3096        assert_eq!(super::redact_email("ab@x.io"), "ab***@x.io");
3097        // Pathological inputs don't crash; just return a marker.
3098        assert_eq!(super::redact_email("not-an-email"), "***");
3099        assert_eq!(super::redact_email(""), "***");
3100        // Multi-byte chars in local-part don't slice mid-codepoint.
3101        assert_eq!(super::redact_email("éric@x.io"), "ér***@x.io");
3102    }
3103
3104    #[test]
3105    fn public_manifest_strips_policy_expressions() {
3106        use pylon_kernel::{AppManifest, ManifestPolicy, MANIFEST_VERSION};
3107        let m = AppManifest {
3108            manifest_version: MANIFEST_VERSION,
3109            name: "t".into(),
3110            version: "0.0.0".into(),
3111            entities: vec![],
3112            routes: vec![],
3113            queries: vec![],
3114            actions: vec![],
3115            policies: vec![ManifestPolicy {
3116                name: "ownerOnly".into(),
3117                entity: Some("Todo".into()),
3118                allow_read: Some("auth.userId == data.ownerId".into()),
3119                allow_update: Some("auth.userId == data.ownerId".into()),
3120                ..Default::default()
3121            }],
3122            auth: Default::default(),
3123        };
3124        let pub_m = super::public_manifest(&m);
3125        let p = &pub_m.policies[0];
3126        // Name + entity preserved so client tooling can map "denied
3127        // by ownerOnly" errors to the human label.
3128        assert_eq!(p.name, "ownerOnly");
3129        assert_eq!(p.entity.as_deref(), Some("Todo"));
3130        // Expressions stripped.
3131        assert_eq!(p.allow, "");
3132        assert!(p.allow_read.is_none());
3133        assert!(p.allow_update.is_none());
3134        // The full manifest still has them — sanity check the test
3135        // didn't accidentally mutate the input.
3136        assert_eq!(
3137            m.policies[0].allow_read.as_deref(),
3138            Some("auth.userId == data.ownerId")
3139        );
3140    }
3141}