Skip to main content

pylon_runtime/
server.rs

1#[allow(unused_imports)]
2use std::io::Read;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::time::Instant;
6
7use pylon_auth::SessionStore;
8use pylon_http::HttpMethod;
9use pylon_plugin::PluginRegistry;
10use pylon_policy::PolicyEngine;
11use pylon_sync::{ChangeKind, ChangeLog};
12use tiny_http::{Header, Method, Response, Server};
13
14use crate::datastore::{
15    CacheAdapter, EmailAdapter, LocalFileOps, PluginHooksAdapter, PubSubAdapter,
16    RuntimeOpenApiGenerator, ShardOpsAdapter, WsSseNotifier,
17};
18use crate::jobs::{JobQueue, JobResult, Worker};
19use crate::metrics::Metrics;
20use crate::pubsub::PubSubBroker;
21use crate::rate_limit::RateLimiter;
22use crate::rooms::RoomManager;
23use crate::scheduler::Scheduler;
24use crate::sse::SseHub;
25use crate::workflows::WorkflowEngine;
26use crate::ws::WsHub;
27use crate::Runtime;
28use pylon_plugin::builtin::ai_proxy::{AiMessage, AiProxyPlugin};
29use pylon_plugin::builtin::cache::CachePlugin;
30
31// ---------------------------------------------------------------------------
32// Streaming body — bridges mpsc::Receiver to std::io::Read for SSE responses
33// ---------------------------------------------------------------------------
34
/// Response body whose bytes are pushed through an MPSC channel.
///
/// Handing this to a `tiny_http::Response` makes the server forward each
/// chunk as soon as it is sent. The stream reports EOF (`Ok(0)`) when every
/// sender has been dropped, and also when an empty chunk is received.
struct StreamingBody {
    /// Source of body chunks.
    rx: std::sync::mpsc::Receiver<Vec<u8>>,
    /// Chunk that did not fit into the caller's buffer on a previous read.
    buf: Vec<u8>,
    /// Offset of the first unread byte in `buf`.
    pos: usize,
}

impl StreamingBody {
    fn new(rx: std::sync::mpsc::Receiver<Vec<u8>>) -> Self {
        StreamingBody {
            rx,
            buf: Vec::new(),
            pos: 0,
        }
    }
}

impl std::io::Read for StreamingBody {
    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
        let has_pending = self.pos < self.buf.len();
        if !has_pending {
            // Nothing buffered: block until the next chunk or until all
            // senders are gone (channel closed = EOF).
            let chunk = match self.rx.recv() {
                Ok(chunk) => chunk,
                Err(_) => return Ok(0),
            };
            if chunk.is_empty() {
                return Ok(0);
            }
            self.buf = chunk;
            self.pos = 0;
        }

        // Copy as much of the pending chunk as the caller's buffer holds.
        let pending = &self.buf[self.pos..];
        let copied = pending.len().min(out.len());
        out[..copied].copy_from_slice(&pending[..copied]);
        self.pos += copied;

        // Fully consumed: reset so the next read goes back to the channel.
        if self.pos >= self.buf.len() {
            self.buf.clear();
            self.pos = 0;
        }
        Ok(copied)
    }
}
88
89/// Global shutdown flag. Set via `request_shutdown()` to trigger graceful exit.
90static SHUTDOWN: AtomicBool = AtomicBool::new(false);
91
92/// Request a graceful shutdown of the running server.
93///
94/// This sets the shutdown flag and, if a server handle has been stashed, calls
95/// `unblock()` to wake the request loop. Safe to call from any thread or signal
96/// handler.
97pub fn request_shutdown() {
98    SHUTDOWN.store(true, Ordering::SeqCst);
99    // If a server handle is available, unblock the request loop so it can
100    // observe the flag immediately rather than waiting for the next request.
101    if let Some(srv) = SERVER_HANDLE.get() {
102        srv.unblock();
103    }
104}
105
106/// Global handle to the `tiny_http::Server` so `request_shutdown()` can call
107/// `unblock()` without requiring callers to hold a reference.
108static SERVER_HANDLE: std::sync::OnceLock<Arc<Server>> = std::sync::OnceLock::new();
109
110// ---------------------------------------------------------------------------
// Client IP resolution and security headers
112// ---------------------------------------------------------------------------
113
114/// Resolve the real client IP behind `trust_proxy_hops` reverse
115/// proxies. Returns an owned String; empty when no IP can be
116/// determined (callers fall back to "anon" identity downstream).
117///
118/// `trust_proxy_hops == 0` is the safe default: we ignore XFF
119/// entirely and use the socket address. Set to N when N trusted
120/// proxies sit in front of Pylon — we take the Nth-from-the-right
121/// XFF entry, which is the address the closest trusted proxy
122/// observed. Honoring the leftmost (or just trusting the whole
123/// header) lets any caller spoof their source IP by sending an
124/// `X-Forwarded-For: 1.2.3.4` header themselves.
125fn resolve_client_ip(request: &tiny_http::Request, trust_proxy_hops: usize) -> String {
126    let socket_ip = request
127        .remote_addr()
128        .map(|a| a.ip().to_string())
129        .unwrap_or_default();
130    if trust_proxy_hops == 0 {
131        return socket_ip;
132    }
133    // tiny_http stores field names as AsciiStr; cast back to &str so
134    // we can do the case-insensitive compare RFC 7230 calls for.
135    let xff = request
136        .headers()
137        .iter()
138        .find(|h| {
139            h.field
140                .as_str()
141                .as_str()
142                .eq_ignore_ascii_case("X-Forwarded-For")
143        })
144        .map(|h| h.value.as_str().to_string());
145    let Some(xff) = xff else {
146        return socket_ip;
147    };
148    // XFF is "client, proxy1, proxy2" — the leftmost is whatever the
149    // first hop SAID was the client (untrusted), and each subsequent
150    // entry is what the next hop saw. With N trusted proxies, the
151    // Nth-from-right is the IP our closest trusted proxy verified.
152    let entries: Vec<&str> = xff.split(',').map(str::trim).collect();
153    if entries.len() < trust_proxy_hops {
154        // XFF doesn't have enough hops — operator misconfiguration
155        // or a request that bypassed the expected proxy chain.
156        // Fall back to socket IP rather than trusting whatever's
157        // there.
158        return socket_ip;
159    }
160    let candidate = entries[entries.len() - trust_proxy_hops];
161    // Validate it parses as an IP before using as a bucket key —
162    // garbage-in would let attackers poison the rate-limit map.
163    if candidate.parse::<std::net::IpAddr>().is_ok() {
164        candidate.to_string()
165    } else {
166        socket_ip
167    }
168}
169
170/// Common security headers applied to every response.
171///
172/// `Referrer-Policy` and `Permissions-Policy` are defense-in-depth.
173/// `Strict-Transport-Security` is intentionally NOT set here — Pylon
174/// is typically reached through a TLS-terminating proxy (Fly LB,
175/// CloudFront) that owns the HSTS decision; setting it from the
176/// origin would force every plaintext-loopback test deploy to fight
177/// the browser cache.
178fn security_headers() -> Vec<Header> {
179    vec![
180        Header::from_bytes("X-Content-Type-Options", "nosniff").unwrap(),
181        Header::from_bytes("X-Frame-Options", "DENY").unwrap(),
182        Header::from_bytes("X-XSS-Protection", "1; mode=block").unwrap(),
183        // Don't leak the full URL to cross-origin destinations on
184        // navigation; same-origin still gets the path so internal
185        // analytics keep working.
186        Header::from_bytes("Referrer-Policy", "strict-origin-when-cross-origin").unwrap(),
187        // Deny every powerful browser API by default. Apps that need
188        // camera/mic/geolocation override per-route via their own
189        // Permissions-Policy header.
190        Header::from_bytes(
191            "Permissions-Policy",
192            "accelerometer=(), camera=(), geolocation=(), gyroscope=(), microphone=(), payment=(), usb=()",
193        )
194        .unwrap(),
195    ]
196}
197
198/// Add security headers to a response.
199fn with_security_headers<R: std::io::Read>(response: Response<R>) -> Response<R> {
200    let mut resp = response;
201    for header in security_headers() {
202        resp = resp.with_header(header);
203    }
204    resp
205}
206
207/// Start the dev server on the given port. Blocks until shutdown.
208pub fn start(runtime: Arc<Runtime>, port: u16) -> Result<(), String> {
209    start_with_plugins(runtime, port, None)
210}
211
212/// Start the dev server with optional plugins. Blocks until shutdown.
213pub fn start_with_plugins(
214    runtime: Arc<Runtime>,
215    port: u16,
216    plugins: Option<Arc<PluginRegistry>>,
217) -> Result<(), String> {
218    start_server(runtime, port, plugins, None)
219}
220
221/// Start the dev server with plugins and a shard registry for real-time
222/// simulations (games, MMO zones, etc.). Blocks until shutdown.
223pub fn start_with_shards(
224    runtime: Arc<Runtime>,
225    port: u16,
226    plugins: Option<Arc<PluginRegistry>>,
227    shard_registry: Arc<dyn pylon_realtime::DynShardRegistry>,
228) -> Result<(), String> {
229    start_server(runtime, port, plugins, Some(shard_registry))
230}
231
232fn start_server(
233    runtime: Arc<Runtime>,
234    port: u16,
235    plugins: Option<Arc<PluginRegistry>>,
236    shard_registry: Option<Arc<dyn pylon_realtime::DynShardRegistry>>,
237) -> Result<(), String> {
238    // Run the tracing-exporter hook BEFORE anything else emits spans. The
239    // operator registers it via `pylon_observability::set_tracing_hook`
240    // at process init; here we invoke it exactly once on startup. No-op
241    // if nothing was registered.
242    pylon_observability::run_tracing_hook();
243
244    let addr = format!("0.0.0.0:{port}");
245    let server = Server::http(&addr).map_err(|e| format!("Failed to start server: {e}"))?;
246    let server = Arc::new(server);
247
248    // Stash a handle so `request_shutdown()` can unblock the loop.
249    let _ = SERVER_HANDLE.set(Arc::clone(&server));
250
251    let session_lifetime = runtime.manifest().auth.session.expires_in;
252    let auth_stores = build_auth_stores(runtime.db_path().as_deref(), session_lifetime);
253    let session_store = auth_stores.session_store;
254    let magic_codes = auth_stores.magic_codes;
255    let oauth_state = auth_stores.oauth_state;
256    let account_store = auth_stores.account_store;
257    let api_keys = auth_stores.api_keys;
258    let orgs = auth_stores.orgs;
259    let siwe = auth_stores.siwe;
260    let phone_codes = auth_stores.phone_codes;
261    let passkeys = auth_stores.passkeys;
262    let verification = auth_stores.verification;
263    let audit = auth_stores.audit;
264    let policy_engine = Arc::new(PolicyEngine::from_manifest(runtime.manifest()));
265    let change_log = Arc::new(ChangeLog::new());
266
267    // Seed the change log with one synthetic insert per extant row so that
268    // a pull from seq=0 after a restart reconstructs current state. The
269    // change log is in-memory — restarting the process without this would
270    // leave SQLite rows unreachable via /api/sync/pull (clients would
271    // pull nothing and see an empty replica). Seqs here are fresh; clients
272    // whose cursors are ahead of `self.seq` get a 410 and full resync,
273    // which then hits this seeded log and gets every current row back.
274    for entity in runtime.manifest().entities.iter() {
275        match runtime.list(&entity.name) {
276            Ok(rows) => {
277                for row in rows {
278                    if let Some(id) = row.get("id").and_then(|v| v.as_str()) {
279                        change_log.append(&entity.name, id, ChangeKind::Insert, Some(row.clone()));
280                    }
281                }
282            }
283            Err(_) => {
284                // Entity table may not exist yet on first boot — skip.
285            }
286        }
287    }
288    let ws_hub = WsHub::new();
289    let sse_hub = SseHub::new();
290    // Default-register the rate-limit plugin when no custom registry was
291    // supplied. Without this, self-hosted deployments would launch with
292    // auth endpoints (/api/auth/magic/send, /api/auth/magic/verify,
293    // /api/auth/session) wide open to brute force and enumeration.
294    //
295    // Dev: 100k/min so a React app's initial bundle + auth + sync pulls
296    // (each worth ~6-10 requests) doesn't immediately 429 the dev. Prod:
297    // 100/min per IP — tight enough to crush burst attackers, loose
298    // enough for legitimate multi-tab UIs. Callers passing their own
299    // registry are responsible for their own limits.
300    // Probe dev mode NOW — defined for real at line ~300 but plugin
301    // registration below needs it. Same env-var, same logic.
302    let is_dev_early = std::env::var("PYLON_DEV_MODE")
303        .map(|v| v == "1" || v == "true")
304        .unwrap_or(true);
305    let plugin_rl_max: u32 = if is_dev_early { 100_000 } else { 100 };
306    let plugin_reg: Arc<PluginRegistry> = plugins.unwrap_or_else(|| {
307        let mut reg = PluginRegistry::new(runtime.manifest().clone());
308        reg.register(Arc::new(
309            pylon_plugin::builtin::rate_limit::RateLimitPlugin::new(
310                plugin_rl_max,
311                std::time::Duration::from_secs(60),
312            ),
313        ));
314        // Auto-scope any entity that declares a `tenantId` field. This is
315        // how multi-tenant isolation becomes a default posture rather than
316        // an opt-in: drop the field on the entity and the plugin takes it
317        // from there (stamps inserts, rejects cross-tenant writes).
318        reg.register(Arc::new(
319            pylon_plugin::builtin::tenant_scope::TenantScopePlugin::from_manifest(
320                runtime.manifest(),
321            ),
322        ));
323        Arc::new(reg)
324    });
325    let room_mgr = Arc::new(RoomManager::new(120)); // 2 min idle timeout
326    let ws_port = port + 1;
327    let sse_port = port + 2;
328
329    // Record server start time for the health endpoint.
330    let start_time = Instant::now();
331
332    let metrics = Arc::new(Metrics::new());
333
334    // Cache and pub/sub shared instances.
335    let cache = Arc::new(CachePlugin::new(100_000));
336    let pubsub_broker = Arc::new(PubSubBroker::new(100));
337
338    // Job queue, scheduler, and background workers.
339    let job_queue = Arc::new(JobQueue::new(1000));
340
341    // Persistent job store. Colocate with the app DB so `./app.db` gets
342    // `./app.db.jobs.db` automatically — otherwise jobs land in CWD, which
343    // is wherever the server was launched from (confusing and fragile).
344    // In-memory runtimes and the `PYLON_JOBS_IN_MEMORY=1` opt-out both
345    // skip persistence.
346    let jobs_in_memory = std::env::var("PYLON_JOBS_IN_MEMORY")
347        .map(|v| v == "1" || v == "true")
348        .unwrap_or(false);
349    if !jobs_in_memory {
350        let jobs_db_path = std::env::var("PYLON_JOBS_DB").ok().unwrap_or_else(|| {
351            runtime
352                .db_path()
353                .map(|p| format!("{p}.jobs.db"))
354                .unwrap_or_else(|| "pylon.jobs.db".into())
355        });
356        match crate::job_store::JobStore::open(&jobs_db_path) {
357            Ok(store) => {
358                let store = Arc::new(store);
359                let restored = job_queue.restore_from(&store);
360                if restored > 0 {
361                    tracing::info!("[jobs] Restored {restored} pending job(s) from {jobs_db_path}");
362                }
363                job_queue.attach_store(store);
364            }
365            Err(e) => {
366                tracing::warn!(
367                    "[jobs] Could not open job store at {jobs_db_path}: {e} — running without persistence"
368                );
369            }
370        }
371    }
372
373    // Register built-in framework jobs.
374    {
375        let cache_ref = Arc::clone(&cache);
376        job_queue.register(
377            "pylon.cache.cleanup",
378            Arc::new(move |_job| {
379                cache_ref.cleanup_expired();
380                JobResult::Success
381            }),
382        );
383        let rooms_ref = Arc::clone(&room_mgr);
384        job_queue.register(
385            "pylon.rooms.cleanup",
386            Arc::new(move |_job| {
387                rooms_ref.cleanup_idle();
388                JobResult::Success
389            }),
390        );
391    }
392
393    let scheduler = Arc::new(Scheduler::new(Arc::clone(&job_queue)));
394    // Schedule built-in tasks.
395    let _ = scheduler.schedule(
396        "pylon.cache.cleanup",
397        "*/10 * * * *",
398        Arc::new(|_| JobResult::Success),
399    );
400    let _ = scheduler.schedule(
401        "pylon.rooms.cleanup",
402        "*/5 * * * *",
403        Arc::new(|_| JobResult::Success),
404    );
405
406    // Start 2 background workers.
407    let _worker_handles: Vec<_> = (0..2)
408        .map(|i| {
409            let w = Worker::new(Arc::clone(&job_queue), &format!("worker-{i}"));
410            w.start()
411        })
412        .collect();
413
414    // Start the scheduler.
415    let _scheduler_handle = Arc::clone(&scheduler).start();
416
417    // Workflow engine: TS runner URL configurable via env, defaults to local Bun server.
418    let wf_runner_url = std::env::var("PYLON_WORKFLOW_RUNNER_URL")
419        .unwrap_or_else(|_| "http://127.0.0.1:9876/run".to_string());
420    let workflow_engine = Arc::new(WorkflowEngine::new(&wf_runner_url, 10_000));
421
422    // Rate limiter: per-IP outer cap on total requests.
423    //
424    // Defaults:
425    //   - Dev mode: effectively off (100k/min) so a React app's initial
426    //     bundle load + sync pulls + user clicks don't immediately 429.
427    //     100/min blew through during a single login + first sync pull.
428    //   - Prod: 600/min (10 req/sec average). Still tight, but a real app
429    //     should override with PYLON_RATE_LIMIT_MAX anyway.
430    //
431    // Override with PYLON_RATE_LIMIT_MAX + PYLON_RATE_LIMIT_WINDOW.
432    let default_rl_max = if is_dev_early { 100_000 } else { 600 };
433    let rl_max: u32 = std::env::var("PYLON_RATE_LIMIT_MAX")
434        .ok()
435        .and_then(|v| v.parse().ok())
436        .unwrap_or(default_rl_max);
437    let rl_window: u64 = std::env::var("PYLON_RATE_LIMIT_WINDOW")
438        .ok()
439        .and_then(|v| v.parse().ok())
440        .unwrap_or(60);
441    let rate_limiter = Arc::new(RateLimiter::new(rl_max, rl_window));
442
443    // Per-function rate limiter: separate bucket per (caller, function) pair.
444    // Defaults to a stricter cap because functions are heavier than reads.
445    // Override via PYLON_FN_RATE_LIMIT_MAX / PYLON_FN_RATE_LIMIT_WINDOW.
446    let fn_rl_max: u32 = std::env::var("PYLON_FN_RATE_LIMIT_MAX")
447        .ok()
448        .and_then(|v| v.parse().ok())
449        .unwrap_or(30);
450    let fn_rl_window: u64 = std::env::var("PYLON_FN_RATE_LIMIT_WINDOW")
451        .ok()
452        .and_then(|v| v.parse().ok())
453        .unwrap_or(60);
454    let fn_rate_limiter = Arc::new(RateLimiter::new(fn_rl_max, fn_rl_window));
455
456    // TypeScript function runtime: optional Bun process that loads functions/*.ts
457    // If no `functions/` directory exists or Bun isn't installed, this is None
458    // and /api/fn/* routes return 503.
459    // Build a notifier adapter so function mutations (`ctx.db.insert/update/delete`)
460    // emit change events to WS + SSE subscribers on COMMIT. Without this,
461    // functions write to the DB but sync clients never see the update
462    // live — they only catch up on the next refetch.
463    let fn_notifier: Arc<dyn pylon_router::ChangeNotifier> =
464        Arc::new(crate::datastore::WsSseNotifier {
465            ws: Arc::clone(&ws_hub),
466            sse: Arc::clone(&sse_hub),
467        });
468    let fn_ops_maybe = crate::datastore::try_spawn_functions(
469        Arc::clone(&runtime),
470        Arc::clone(&job_queue),
471        Arc::clone(&fn_rate_limiter),
472        Arc::clone(&change_log),
473        fn_notifier,
474    );
475
476    // Dev mode flag. Gates a *lot* of permissive behavior: magic codes
477    // appear in JSON responses, /studio is open without admin auth,
478    // POST /api/auth/session can mint sessions for arbitrary user_ids,
479    // OAuth callback accepts a caller-supplied email, CORS defaults to
480    // `*`, etc. Defaulting to `true` meant a prod deploy that simply
481    // forgot the env var was trivially compromisable — flip to safe-
482    // by-default and let the CLI's `pylon dev` opt in explicitly.
483    let is_dev = std::env::var("PYLON_DEV_MODE")
484        .map(|v| v == "1" || v == "true")
485        .unwrap_or(false);
486
487    // CORS origin. Defaults to `*` in dev for convenience; in prod we refuse
488    // to start with a wildcard because the server sends `Access-Control-
489    // Allow-Credentials: true` elsewhere and also accepts `Authorization:
490    // Bearer <session>`. The combination of `*` + credentials is a spec
491    // violation that some browsers tolerate, and even when they don't it
492    // lets any origin drive bearer-auth APIs.
493    let cors_origin = match std::env::var("PYLON_CORS_ORIGIN") {
494        Ok(v) => v,
495        Err(_) if is_dev => "*".to_string(),
496        Err(_) => {
497            return Err(
498                "PYLON_CORS_ORIGIN must be set in production (non-dev mode). \
499                Set it to your frontend's origin, or set PYLON_DEV_MODE=true \
500                for local development."
501                    .into(),
502            );
503        }
504    };
505    if !is_dev && cors_origin == "*" {
506        return Err("PYLON_CORS_ORIGIN=\"*\" is refused in production mode. \
507            Set it to an explicit origin (https://app.example.com)."
508            .into());
509    }
510    // Browsers forbid combining `Access-Control-Allow-Origin: *` with
511    // `Access-Control-Allow-Credentials: true`. Cookie-based auth needs
512    // credentials, so we only emit the credentials header when the origin
513    // is specific. In dev with `*` we lose cookies-from-cross-origin
514    // (acceptable: dev typically uses same-origin proxying), but we
515    // refuse to send a header combo browsers will reject either way.
516    let allow_credentials = cors_origin != "*";
517    // Validate the origin once so per-request header construction can never
518    // panic on bad bytes. Previously every `Header::from_bytes(...).unwrap()`
519    // was a potential request-triggered DoS via env misconfiguration.
520    if Header::from_bytes(
521        "Access-Control-Allow-Origin",
522        cors_origin.as_bytes().to_vec(),
523    )
524    .is_err()
525    {
526        return Err(format!(
527            "PYLON_CORS_ORIGIN={cors_origin:?} contains bytes that are not a valid HTTP header value"
528        ));
529    }
530
531    // Admin token: read once at startup, not per-request.
532    let admin_token: Option<String> = std::env::var("PYLON_ADMIN_TOKEN").ok();
533
534    // Trusted proxy hops for resolving the real client IP behind a
535    // reverse proxy (Fly LB, nginx, CloudFront, etc.). Default 0 =
536    // ignore X-Forwarded-For and use the socket peer (safe-by-default;
537    // an unconfigured prod deploy can't be tricked into trusting
538    // attacker-supplied XFF). Set to N when there are exactly N
539    // trusted proxies in front of Pylon — the resolver takes the
540    // Nth-from-the-right address in XFF, which is the IP the closest
541    // trusted proxy actually saw the request from. Without this, every
542    // unauth caller behind the proxy shares one rate-limit bucket.
543    let trust_proxy_hops: usize = std::env::var("PYLON_TRUST_PROXY_HOPS")
544        .ok()
545        .and_then(|v| v.parse().ok())
546        .unwrap_or(0);
547
548    // Session cookie config — built once. Cookie name defaults to
549    // `${app_name}_session` so multiple Pylon apps on the same parent
550    // domain don't clobber each other's cookies. Browsers receive an
551    // HttpOnly+Secure+SameSite=Lax cookie by default; the same opaque
552    // session token continues to work via `Authorization: Bearer …`
553    // for CLI / mobile / server-to-server callers.
554    let cookie_config = Arc::new({
555        let app_name = runtime.manifest().name.as_str();
556        pylon_auth::CookieConfig::from_env(&pylon_auth::CookieConfig::default_name_for(app_name))
557    });
558
559    // CSRF protection. Enforced inline at the HTTP layer because the plugin
560    // trait's `on_request` hook doesn't see request headers. For
561    // state-changing methods (POST/PATCH/PUT/DELETE) we check Origin, then
562    // Referer, against the allowlist.
563    //
564    // Allowlist resolution:
565    //   - PYLON_CSRF_ORIGINS (comma-separated) if set
566    //   - otherwise PYLON_CORS_ORIGIN (already validated above)
567    //   - in dev, fall back to allow-any to avoid breaking local tooling.
568    let csrf_origins: Vec<String> = match std::env::var("PYLON_CSRF_ORIGINS") {
569        Ok(v) => v
570            .split(',')
571            .map(|s| s.trim().to_string())
572            .filter(|s| !s.is_empty())
573            .collect(),
574        Err(_) => {
575            if is_dev {
576                vec!["*".to_string()]
577            } else if cors_origin != "*" {
578                vec![cors_origin.clone()]
579            } else {
580                // Non-dev + wildcard was already rejected, but guard anyway.
581                vec![]
582            }
583        }
584    };
585    let csrf = Arc::new(pylon_plugin::builtin::csrf::CsrfPlugin::new(csrf_origins));
586
587    // Trusted origins for OAuth `?callback=` / `?error_callback=`
588    // redirect URLs. Required if any OAuth provider is configured —
589    // an unconfigured list with a configured provider means every
590    // sign-in attempt 403s with UNTRUSTED_REDIRECT, which is
591    // operator-visible and recoverable. We don't auto-derive from
592    // PYLON_CORS_ORIGIN: the CORS origin is the API caller's origin,
593    // which may differ from the dashboard's (e.g. dashboard at
594    // /dashboard, API at api.example.com). Better-auth's `trustedOrigins`
595    // is the model here — explicit allowlist, no implicit trust.
596    // Manifest-declared trusted origins (from auth({trustedOrigins: [...]})
597    // in app.ts) get merged with the env list. Manifest is the
598    // type-safe declarative source; env is the operator override for
599    // ops-only deploys.
600    let manifest_trusted: Vec<String> = runtime.manifest().auth.trusted_origins.clone();
601    let trusted_origins: Vec<String> = std::env::var("PYLON_TRUSTED_ORIGINS")
602        .map(|v| {
603            v.split(',')
604                .map(|s| s.trim().to_string())
605                .filter(|s| !s.is_empty())
606                .collect()
607        })
608        .unwrap_or_else(|_| {
609            // Dev-mode default: trust localhost on the conventional
610            // ports so `pylon dev` + `next dev` works without env
611            // surgery. Production (PYLON_DEV_MODE=false or unset) gets
612            // an empty list, which fails-closed at the OAuth start
613            // endpoint with a clear error pointing the operator at
614            // PYLON_TRUSTED_ORIGINS.
615            if is_dev_early {
616                vec![
617                    "http://localhost:3000".to_string(),
618                    "http://localhost:4321".to_string(),
619                    "http://localhost:5173".to_string(),
620                    "http://127.0.0.1:3000".to_string(),
621                ]
622            } else {
623                Vec::new()
624            }
625        });
626    // Combine env + manifest, dedup, drop empties.
627    let mut combined: Vec<String> = trusted_origins;
628    for m in manifest_trusted {
629        if !m.is_empty() && !combined.contains(&m) {
630            combined.push(m);
631        }
632    }
633    let trusted_origins = Arc::new(combined);
634
635    // Start WebSocket server on port+1.
636    //
637    // The snapshot fetcher gives the WS reader a way to ship the current
638    // CRDT snapshot to a client the instant it subscribes — without it
639    // the new tab would have to wait for the next write before catching
640    // up to the converged state. Encodes into the same length-prefixed
641    // wire frame as the broadcast path so the client decoder is shared.
642    //
643    // Authz is enforced HERE, not at the WS layer: the closure runs the
644    // row through `check_entity_read` against the caller's auth ctx and
645    // returns None on deny. The caller (handle_crdt_control) treats
646    // None as "don't subscribe" — so a denied client can't sit on a
647    // subscription waiting for a future write to leak state.
648    {
649        let hub = Arc::clone(&ws_hub);
650        let sessions = Arc::clone(&session_store);
651        let runtime_for_fetcher = Arc::clone(&runtime);
652        let pe_for_fetcher = Arc::clone(&policy_engine);
653        let fetcher: crate::ws::SnapshotFetcher = Arc::new(move |auth_ctx, entity, row_id| {
654            use pylon_http::DataStore;
655            // Fetch the row first so the policy engine can evaluate
656            // row-level predicates (`data.authorId == auth.userId`
657            // etc). Missing row → deny silently; the client just
658            // never gets a frame and can't probe existence.
659            let row = match runtime_for_fetcher.get_by_id(entity, row_id) {
660                Ok(Some(v)) => v,
661                _ => return None,
662            };
663            if !matches!(
664                pe_for_fetcher.check_entity_read(entity, auth_ctx, Some(&row)),
665                pylon_policy::PolicyResult::Allowed
666            ) {
667                return None;
668            }
669            let snap = match runtime_for_fetcher.crdt_snapshot(entity, row_id) {
670                Ok(Some(bytes)) => bytes,
671                _ => return None,
672            };
673            pylon_router::encode_crdt_frame(
674                pylon_router::CRDT_FRAME_SNAPSHOT,
675                entity,
676                row_id,
677                &snap,
678            )
679            .ok()
680        });
681        std::thread::spawn(move || {
682            crate::ws::start_ws_server(hub, sessions, ws_port, Some(fetcher));
683        });
684    }
685
686    // Start SSE server on port+2.
687    {
688        let hub = Arc::clone(&sse_hub);
689        std::thread::spawn(move || {
690            crate::sse::start_sse_server(hub, sse_port);
691        });
692    }
693
694    // Start shard WebSocket server on port+3 when a registry is provided.
695    let shard_ws_port = port + 3;
696    if let Some(reg) = shard_registry.clone() {
697        let sessions = Arc::clone(&session_store);
698        std::thread::spawn(move || {
699            crate::shard_ws::start_shard_ws_server(reg, sessions, shard_ws_port);
700        });
701    }
702
703    tracing::warn!("pylon dev server listening on http://localhost:{port}");
704    tracing::info!("  WebSocket: ws://localhost:{ws_port}");
705    tracing::info!("  Studio: http://localhost:{port}/studio");
706    tracing::info!("  API:    http://localhost:{port}/api/entities/<entity>");
707    tracing::info!("  Auth:   http://localhost:{port}/api/auth/session");
708
709    // Use recv() in a loop instead of incoming_requests() so we can share
710    // the Arc<Server> with the shutdown path (incoming_requests borrows &self
711    // which prevents moving the Arc into another thread).
712    loop {
713        if SHUTDOWN.load(Ordering::Relaxed) {
714            break;
715        }
716
717        let mut request = match server.recv() {
718            Ok(rq) => rq,
719            Err(_) => {
720                // recv() returns Err when unblocked or the socket is closed.
721                break;
722            }
723        };
724
725        if SHUTDOWN.load(Ordering::Relaxed) {
726            break;
727        }
728
729        let rt = Arc::clone(&runtime);
730        let ss = Arc::clone(&session_store);
731        let pe = Arc::clone(&policy_engine);
732        let cl = Arc::clone(&change_log);
733        let wh = Arc::clone(&ws_hub);
734        let sh = Arc::clone(&sse_hub);
735        let mc = Arc::clone(&magic_codes);
736        let pr = Arc::clone(&plugin_reg);
737        let rm = Arc::clone(&room_mgr);
738        let mt = Arc::clone(&metrics);
739        let os = Arc::clone(&oauth_state);
740        let acc = Arc::clone(&account_store);
741        let ak = Arc::clone(&api_keys);
742        let og = Arc::clone(&orgs);
743        let sw = Arc::clone(&siwe);
744        let pcd = Arc::clone(&phone_codes);
745        let pks = Arc::clone(&passkeys);
746        let vrf = Arc::clone(&verification);
747        let aud = Arc::clone(&audit);
748        let trusted_origins_ref = Arc::clone(&trusted_origins);
749        let ca = Arc::clone(&cache);
750        let ps = Arc::clone(&pubsub_broker);
751        let jq = Arc::clone(&job_queue);
752        let sc = Arc::clone(&scheduler);
753        let we = Arc::clone(&workflow_engine);
754        let fn_ops_ref = fn_ops_maybe.clone();
755        let shards_ref = shard_registry.clone();
756        let cors_origin = cors_origin.clone();
757        let cookie_config = Arc::clone(&cookie_config);
758        let allow_credentials = allow_credentials;
759        let is_dev = is_dev;
760
761        let method = request.method().clone();
762        let url = request.url().to_string();
763
764        // Per-request access log — visibility into what's hitting the
765        // server, mirroring Next.js's `GET /login 200 in 27ms` style.
766        // Suppress for noisy paths (/health, /metrics) so dev-mode logs
767        // don't drown in proxy/scrape traffic. Status + duration get
768        // logged separately by `metrics.record_request` so we don't
769        // need to thread them through every response branch.
770        //
771        // Per-request peer IP keeps it useful for debugging
772        // multi-origin setups (CSRF rejections, rate-limit hits) —
773        // resolve_client_ip already honors PYLON_TRUST_PROXY_HOPS so
774        // this matches what the rest of the server sees.
775        let request_peer_ip = resolve_client_ip(&request, trust_proxy_hops);
776        let request_started_at = std::time::Instant::now();
777        if url != "/health" && url != "/metrics" {
778            tracing::info!("→ {} {} from {}", method.as_str(), url, request_peer_ip);
779            // Stash for the response log (`record_request` reads this
780            // thread-local to emit method/url/status/duration in one
781            // line, like Next.js's `GET /login 200 in 27ms`).
782            crate::metrics::set_current_request(&url, request_started_at);
783        }
784
785        // --- Health check: fast path before auth or body parsing ---
786        if url == "/health" && method == Method::Get {
787            let uptime = start_time.elapsed().as_secs();
788            let body = serde_json::json!({
789                "status": "ok",
790                "version": "0.1.0",
791                "uptime_secs": uptime,
792            })
793            .to_string();
794
795            let response = with_security_headers(
796                Response::from_string(&body)
797                    .with_status_code(200u16)
798                    .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
799                    .with_header(
800                        Header::from_bytes(
801                            "Access-Control-Allow-Origin",
802                            cors_origin.as_bytes().to_vec(),
803                        )
804                        .unwrap(),
805                    ),
806            );
807            let _ = request.respond(response);
808            continue;
809        }
810
811        // --- Metrics endpoint: fast path before rate-limit / body parsing.
812        // Gate behind admin auth in non-dev to prevent leakage of function
813        // names, request volumes, and error rates to the public internet.
814        // Dev mode stays open so local Prometheus scrapers just work.
815        if url == "/metrics" && method == Method::Get {
816            if !is_dev {
817                let admin_bytes = admin_token.as_deref().unwrap_or("").as_bytes();
818                let auth_ok = !admin_bytes.is_empty()
819                    && request.headers().iter().any(|h| {
820                        let name = h.field.as_str().as_str();
821                        name.eq_ignore_ascii_case("Authorization")
822                            && h.value
823                                .as_str()
824                                .strip_prefix("Bearer ")
825                                .map(|t| pylon_auth::constant_time_eq(t.as_bytes(), admin_bytes))
826                                .unwrap_or(false)
827                    });
828                if !auth_ok {
829                    let body = json_error(
830                        "UNAUTHORIZED",
831                        "/metrics requires admin bearer token in non-dev mode",
832                    );
833                    let response = with_security_headers(
834                        Response::from_string(&body)
835                            .with_status_code(401u16)
836                            .with_header(
837                                Header::from_bytes("Content-Type", "application/json").unwrap(),
838                            ),
839                    );
840                    let _ = request.respond(response);
841                    continue;
842                }
843            }
844            let prefers_prometheus = request.headers().iter().any(|h| {
845                (h.field.as_str() == "Accept" || h.field.as_str() == "accept")
846                    && (h.value.as_str().contains("text/plain")
847                        || h.value.as_str().contains("application/openmetrics-text"))
848            });
849            let (body, content_type) = if prefers_prometheus {
850                (mt.prometheus(), "text/plain; version=0.0.4")
851            } else {
852                (mt.snapshot().to_string(), "application/json")
853            };
854            let response = with_security_headers(
855                Response::from_string(&body)
856                    .with_status_code(200u16)
857                    .with_header(Header::from_bytes("Content-Type", content_type).unwrap())
858                    .with_header(
859                        Header::from_bytes(
860                            "Access-Control-Allow-Origin",
861                            cors_origin.as_bytes().to_vec(),
862                        )
863                        .unwrap(),
864                    ),
865            );
866            let _ = request.respond(response);
867            mt.record_request("GET", 200);
868            continue;
869        }
870
871        // --- Rate limiting: check per-IP request count ---
872        // peer_ip honors PYLON_TRUST_PROXY_HOPS so a deploy behind a
873        // load balancer (Fly, nginx, CloudFront) gets per-client
874        // limiting instead of putting every request through one
875        // bucket keyed by the proxy's IP.
876        let peer_ip = resolve_client_ip(&request, trust_proxy_hops);
877
878        // OPTIONS preflights are browser infrastructure, not user intent.
879        // Rate-limiting them makes a normal page effectively halve its
880        // budget (preflight + real request per call) and returns a 429
881        // that the browser can't interpret as a valid CORS response —
882        // the user-visible symptom is "Failed to fetch" on login. Skip.
883        let is_preflight = matches!(method, Method::Options);
884        if !is_preflight {
885            if let Err(retry_after) = rate_limiter.check(&peer_ip) {
886                let err_body = json_error(
887                    "RATE_LIMITED",
888                    &format!("Too many requests. Retry after {retry_after} seconds."),
889                );
890                let response = with_security_headers(
891                    Response::from_string(&err_body)
892                        .with_status_code(429u16)
893                        .with_header(
894                            Header::from_bytes("Content-Type", "application/json").unwrap(),
895                        )
896                        .with_header(
897                            Header::from_bytes(
898                                "Access-Control-Allow-Origin",
899                                cors_origin.as_bytes().to_vec(),
900                            )
901                            .unwrap(),
902                        )
903                        .with_header(
904                            Header::from_bytes(
905                                "Access-Control-Allow-Methods",
906                                "GET, POST, PATCH, DELETE, OPTIONS",
907                            )
908                            .unwrap(),
909                        )
910                        .with_header(
911                            Header::from_bytes(
912                                "Access-Control-Allow-Headers",
913                                "Content-Type, Authorization",
914                            )
915                            .unwrap(),
916                        )
917                        .with_header(
918                            Header::from_bytes(
919                                "Retry-After",
920                                retry_after.to_string().as_bytes().to_vec(),
921                            )
922                            .unwrap(),
923                        ),
924                );
925                let _ = request.respond(response);
926                mt.record_request(method.as_str(), 429);
927                continue;
928            }
929        } // end: if !is_preflight
930
931        // --- CSRF check on state-changing requests ---
932        //
933        // Browsers forbid cross-origin POST/PATCH/PUT/DELETE unless CORS
934        // allows it, but an attacker controlling another origin can still
935        // ship credentials-bearing requests if the server is permissive.
936        // The CSRF plugin validates Origin (then Referer) against an explicit
937        // allowlist — this is the check that was missing because the Plugin
938        // trait's `on_request` hook has no access to headers.
939        //
940        // The Authorization header carries bearer tokens, so CSRF mostly
941        // matters for cookie-based sessions — but we enforce globally: a
942        // request that misses Origin/Referer on a state-changing method is
943        // rejected, which is the safer default.
944        {
945            let method_str = method.as_str();
946            let is_bearer = request.headers().iter().any(|h| {
947                (h.field.as_str() == "Authorization" || h.field.as_str() == "authorization")
948                    && h.value.as_str().starts_with("Bearer ")
949            });
950            // Bearer-authenticated requests are not CSRF-vulnerable in the
951            // classic sense — browsers don't auto-attach bearer tokens. Skip
952            // the check for them so server-to-server API callers keep working
953            // without needing Origin headers.
954            if !is_bearer && !matches!(method, Method::Get | Method::Head | Method::Options) {
955                let origin = request
956                    .headers()
957                    .iter()
958                    .find(|h| h.field.as_str() == "Origin" || h.field.as_str() == "origin")
959                    .map(|h| h.value.as_str().to_string());
960                let referer = request
961                    .headers()
962                    .iter()
963                    .find(|h| h.field.as_str() == "Referer" || h.field.as_str() == "referer")
964                    .map(|h| h.value.as_str().to_string());
965                if let Err(err) = csrf.check(method_str, origin.as_deref(), referer.as_deref()) {
966                    let body = json_error(&err.code, &err.message);
967                    let response = with_security_headers(
968                        Response::from_string(&body)
969                            .with_status_code(err.status)
970                            .with_header(
971                                Header::from_bytes("Content-Type", "application/json").unwrap(),
972                            )
973                            .with_header(
974                                Header::from_bytes(
975                                    "Access-Control-Allow-Origin",
976                                    cors_origin.as_bytes().to_vec(),
977                                )
978                                .unwrap(),
979                            ),
980                    );
981                    let _ = request.respond(response);
982                    mt.record_request(method_str, err.status);
983                    continue;
984                }
985            }
986        }
987
988        // Extract auth token + auth context EARLY so every fast path (upload,
989        // shard SSE, fn streaming, AI streaming) can enforce auth the same
990        // way the router does. Previously these paths ran before auth
991        // extraction and bypassed the plugin/router auth chain entirely.
992        //
993        // Two transports for the same opaque session token:
994        //   1. `Authorization: Bearer <token>` — CLI, mobile, server-to-server
995        //   2. `Cookie: <name>=<token>` — browsers (HttpOnly, XSS can't read)
996        // Bearer wins when both are present (explicit beats ambient).
997        let bearer_token: Option<String> = request
998            .headers()
999            .iter()
1000            .find(|h| h.field.as_str() == "Authorization" || h.field.as_str() == "authorization")
1001            .and_then(|h| {
1002                let val = h.value.as_str();
1003                val.strip_prefix("Bearer ").map(|t| t.to_string())
1004            });
1005        let cookie_token: Option<String> = if bearer_token.is_some() {
1006            None
1007        } else {
1008            request
1009                .headers()
1010                .iter()
1011                .find(|h| h.field.as_str() == "Cookie" || h.field.as_str() == "cookie")
1012                .and_then(|h| {
1013                    pylon_auth::extract_session_cookie(h.value.as_str(), &cookie_config.name)
1014                })
1015        };
1016        let auth_token: Option<String> = bearer_token.or(cookie_token);
1017        // Token dispatcher (in priority order):
1018        //   1. Admin token → AuthContext::admin
1019        //   2. `pk.…` API key → AuthContext::from_api_key (401 on bad)
1020        //   3. Looks-like-JWT + PYLON_JWT_SECRET set → JWT verify
1021        //   4. Otherwise → session store lookup
1022        // pk. check happens BEFORE looks_like_jwt because an api-key
1023        // token also has 3 dot-separated segments and would otherwise
1024        // be misrouted.
1025        let auth_ctx_result: Result<pylon_auth::AuthContext, &'static str> = if admin_token
1026            .is_some()
1027            && auth_token.is_some()
1028            && pylon_auth::constant_time_eq(
1029                auth_token.as_deref().unwrap_or("").as_bytes(),
1030                admin_token.as_deref().unwrap_or("").as_bytes(),
1031            ) {
1032            Ok(pylon_auth::AuthContext::admin())
1033        } else if let Some(t) = auth_token.as_deref() {
1034            if t.starts_with("pk.") {
1035                match ak.verify(t) {
1036                    Ok(key) => Ok(pylon_auth::AuthContext::from_api_key(
1037                        key.user_id,
1038                        key.id,
1039                        key.scopes,
1040                    )),
1041                    Err(_) => Err("INVALID_API_KEY"),
1042                }
1043            } else if pylon_auth::jwt::looks_like_jwt(t) && jwt_secret().is_some() {
1044                // P0-6 (codex Wave-5 review): require PYLON_JWT_ISSUER
1045                // when JWT auth is enabled. Without it, tokens minted
1046                // with the same HS256 secret for ANY issuer would
1047                // verify, letting a JWT minted for "external-system"
1048                // log in as that system's `sub`. Refuse on misconfig.
1049                let Some(issuer) = jwt_issuer() else {
1050                    tracing::warn!(
1051                        "[auth] PYLON_JWT_SECRET set but PYLON_JWT_ISSUER missing — \
1052                         refusing JWT verify (set both to enable JWT sessions)"
1053                    );
1054                    Err("JWT_MISCONFIGURED")?;
1055                    unreachable!();
1056                };
1057                let secret = jwt_secret().expect("checked above");
1058                match pylon_auth::jwt::verify(t, secret.as_bytes(), Some(issuer)) {
1059                    Ok(claims) => {
1060                        let mut ctx = pylon_auth::AuthContext::authenticated(claims.sub);
1061                        ctx.roles = claims.roles;
1062                        if let Some(t) = claims.tenant_id {
1063                            ctx = ctx.with_tenant(t);
1064                        }
1065                        Ok(ctx)
1066                    }
1067                    Err(_) => Err("INVALID_JWT"),
1068                }
1069            } else {
1070                Ok(ss.resolve(Some(t)))
1071            }
1072        } else {
1073            Ok(ss.resolve(None))
1074        };
1075        let auth_ctx = match auth_ctx_result {
1076            Ok(c) => c,
1077            Err(reason) => {
1078                let body = format!(
1079                    r#"{{"error":{{"code":"{reason}","message":"Bearer token is malformed, expired, or revoked"}}}}"#
1080                );
1081                let resp = tiny_http::Response::from_string(body)
1082                    .with_status_code(401)
1083                    .with_header(
1084                        "Content-Type: application/json"
1085                            .parse::<tiny_http::Header>()
1086                            .unwrap(),
1087                    );
1088                let _ = request.respond(resp);
1089                continue;
1090            }
1091        };
1092
1093        // --- Test-reset endpoint — in-memory + dev mode + localhost only ---
1094        //
1095        // `pylon test` sets PYLON_IN_MEMORY=1 + PYLON_DEV_MODE=true.
1096        // The TS helper `resetDb()` posts here between `test(...)` blocks
1097        // to isolate cases. Gates:
1098        //   1. dev mode (production refuses outright)
1099        //   2. in-memory DB (belt-and-braces against accidental file wipes)
1100        //   3. peer IP is loopback (a dev laptop often has localhost:4321
1101        //      reachable; without this, a browser visiting a malicious
1102        //      site could cross-site-POST a reset via a bare form —
1103        //      blind CSRF that doesn't care about the response)
1104        //
1105        // Positioned AFTER the rate limiter and CSRF check on purpose so
1106        // those middlewares apply — the earlier placement skipped both.
1107        if url == "/api/__test__/reset" && method == Method::Post {
1108            let is_loopback = peer_ip == "127.0.0.1"
1109                || peer_ip == "::1"
1110                || peer_ip.starts_with("127.")
1111                || peer_ip == "localhost";
1112            if !is_dev || !rt.is_in_memory() || !is_loopback {
1113                let body = json_error(
1114                    "RESET_REFUSED",
1115                    "reset endpoint is only available in dev mode + in-memory DB + from loopback",
1116                );
1117                let response = with_security_headers(
1118                    Response::from_string(&body)
1119                        .with_status_code(403u16)
1120                        .with_header(
1121                            Header::from_bytes("Content-Type", "application/json").unwrap(),
1122                        )
1123                        .with_header(
1124                            Header::from_bytes(
1125                                "Access-Control-Allow-Origin",
1126                                cors_origin.as_bytes().to_vec(),
1127                            )
1128                            .unwrap(),
1129                        ),
1130                );
1131                let _ = request.respond(response);
1132                mt.record_request("POST", 403);
1133                continue;
1134            }
1135            let (status, body) = match rt.reset_for_tests() {
1136                Ok(()) => (200u16, "{\"reset\":true}".to_string()),
1137                Err(e) => (500u16, json_error(&e.code, &e.message)),
1138            };
1139            let response = with_security_headers(
1140                Response::from_string(&body)
1141                    .with_status_code(status)
1142                    .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
1143                    .with_header(
1144                        Header::from_bytes(
1145                            "Access-Control-Allow-Origin",
1146                            cors_origin.as_bytes().to_vec(),
1147                        )
1148                        .unwrap(),
1149                    ),
1150            );
1151            let _ = request.respond(response);
1152            mt.record_request("POST", status);
1153            continue;
1154        }
1155
1156        // --- File upload fast path: handle binary body before string conversion ---
1157        // Uploads come in two shapes:
1158        //   1. Direct binary body with X-Filename / Content-Type headers
1159        //   2. multipart/form-data with a file part
1160        //
1161        // Require an authenticated user. Uploads write to the files backend
1162        // (and into the plugin audit log for soft-delete etc.), so
1163        // unauthenticated callers cannot use this route.
1164        if url == "/api/files/upload" && method == Method::Post {
1165            const UPLOAD_MAX: usize = 10 * 1024 * 1024;
1166            // Enforce size BEFORE reading the body so a 10 GiB stream can't
1167            // buffer into memory. Content-Length pre-check, then bounded read.
1168            if let Some(declared) = request.body_length() {
1169                if declared > UPLOAD_MAX {
1170                    let err = json_error(
1171                        "PAYLOAD_TOO_LARGE",
1172                        &format!("Content-Length {declared} exceeds upload max of {UPLOAD_MAX}"),
1173                    );
1174                    let response = with_security_headers(
1175                        Response::from_string(&err)
1176                            .with_status_code(413u16)
1177                            .with_header(
1178                                Header::from_bytes("Content-Type", "application/json").unwrap(),
1179                            )
1180                            .with_header(
1181                                Header::from_bytes(
1182                                    "Access-Control-Allow-Origin",
1183                                    cors_origin.as_bytes().to_vec(),
1184                                )
1185                                .unwrap(),
1186                            ),
1187                    );
1188                    let _ = request.respond(response);
1189                    mt.record_request("POST", 413);
1190                    continue;
1191                }
1192            }
1193            if auth_ctx.user_id.is_none() {
1194                let err = json_error(
1195                    "AUTH_REQUIRED",
1196                    "/api/files/upload requires an authenticated session",
1197                );
1198                let response = with_security_headers(
1199                    Response::from_string(&err)
1200                        .with_status_code(401u16)
1201                        .with_header(
1202                            Header::from_bytes("Content-Type", "application/json").unwrap(),
1203                        )
1204                        .with_header(
1205                            Header::from_bytes(
1206                                "Access-Control-Allow-Origin",
1207                                cors_origin.as_bytes().to_vec(),
1208                            )
1209                            .unwrap(),
1210                        ),
1211                );
1212                let _ = request.respond(response);
1213                mt.record_request("POST", 401);
1214                continue;
1215            }
1216            // Read up to UPLOAD_MAX + 1 bytes. If we read the full +1 we know
1217            // the client lied about Content-Length (or used chunked encoding
1218            // and overran). Reject in that case instead of continuing with a
1219            // truncated file.
1220            use std::io::Read;
1221            let mut bytes: Vec<u8> = Vec::with_capacity(8192);
1222            let mut limited = request.as_reader().take((UPLOAD_MAX as u64) + 1);
1223            let _ = limited.read_to_end(&mut bytes);
1224
1225            const MAX: usize = UPLOAD_MAX;
1226            if bytes.len() > MAX {
1227                let err = json_error("PAYLOAD_TOO_LARGE", "File exceeds 10 MB limit");
1228                let response = with_security_headers(
1229                    Response::from_string(&err)
1230                        .with_status_code(413u16)
1231                        .with_header(
1232                            Header::from_bytes("Content-Type", "application/json").unwrap(),
1233                        )
1234                        .with_header(
1235                            Header::from_bytes(
1236                                "Access-Control-Allow-Origin",
1237                                cors_origin.as_bytes().to_vec(),
1238                            )
1239                            .unwrap(),
1240                        ),
1241                );
1242                let _ = request.respond(response);
1243                mt.record_request("POST", 413);
1244                continue;
1245            }
1246
1247            // Headers.
1248            let content_type = request
1249                .headers()
1250                .iter()
1251                .find(|h| h.field.as_str() == "Content-Type" || h.field.as_str() == "content-type")
1252                .map(|h| h.value.as_str().to_string())
1253                .unwrap_or_else(|| "application/octet-stream".into());
1254            let filename = request
1255                .headers()
1256                .iter()
1257                .find(|h| h.field.as_str() == "X-Filename" || h.field.as_str() == "x-filename")
1258                .map(|h| h.value.as_str().to_string())
1259                .unwrap_or_else(|| "upload".into());
1260
1261            // If multipart, extract the first file part. Otherwise use bytes directly.
1262            let (name, ct, payload) = if content_type.starts_with("multipart/form-data") {
1263                match parse_multipart_first_file(&bytes, &content_type) {
1264                    Some(p) => p,
1265                    None => {
1266                        let err = json_error("INVALID_MULTIPART", "Could not parse multipart body");
1267                        let response = with_security_headers(
1268                            Response::from_string(&err)
1269                                .with_status_code(400u16)
1270                                .with_header(
1271                                    Header::from_bytes("Content-Type", "application/json").unwrap(),
1272                                )
1273                                .with_header(
1274                                    Header::from_bytes(
1275                                        "Access-Control-Allow-Origin",
1276                                        cors_origin.as_bytes().to_vec(),
1277                                    )
1278                                    .unwrap(),
1279                                ),
1280                        );
1281                        let _ = request.respond(response);
1282                        mt.record_request("POST", 400);
1283                        continue;
1284                    }
1285                }
1286            } else {
1287                (filename, content_type, bytes)
1288            };
1289
1290            let storage = pylon_storage::files::LocalFileStorage::new(
1291                &std::env::var("PYLON_FILES_DIR").unwrap_or_else(|_| "uploads".into()),
1292                &std::env::var("PYLON_FILES_URL_PREFIX").unwrap_or_else(|_| "/api/files".into()),
1293            );
1294
1295            let (status, body) =
1296                match pylon_storage::files::FileStorage::store(&storage, &name, &payload, &ct) {
1297                    Ok(stored) => (
1298                        201u16,
1299                        serde_json::to_string(&stored).unwrap_or_else(|_| "{}".into()),
1300                    ),
1301                    Err(e) => (500u16, json_error(&e.code, &e.message)),
1302                };
1303
1304            let response = with_security_headers(
1305                Response::from_string(&body)
1306                    .with_status_code(status)
1307                    .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
1308                    .with_header(
1309                        Header::from_bytes(
1310                            "Access-Control-Allow-Origin",
1311                            cors_origin.as_bytes().to_vec(),
1312                        )
1313                        .unwrap(),
1314                    ),
1315            );
1316            let _ = request.respond(response);
1317            mt.record_request("POST", status);
1318            continue;
1319        }
1320
1321        // Read body before routing (request is consumed by respond).
1322        // Skip for methods that cannot have a body.
1323        //
1324        // Size enforcement runs in TWO layers so a malicious client can't
1325        // stream 10 GiB into memory before we reject it:
1326        //   1. Content-Length header is compared to MAX_BODY_SIZE up front.
1327        //   2. The actual read uses `.take(MAX_BODY_SIZE + 1)` so a lying
1328        //      or chunked stream is capped at MAX + 1 bytes; if we read that
1329        //      many, we reject.
1330        const MAX_BODY_SIZE: usize = 10 * 1024 * 1024;
1331
1332        if let Some(declared) = request.body_length() {
1333            if declared > MAX_BODY_SIZE {
1334                let err_body = json_error(
1335                    "PAYLOAD_TOO_LARGE",
1336                    &format!("Content-Length {declared} exceeds max of {MAX_BODY_SIZE}"),
1337                );
1338                let response = with_security_headers(
1339                    Response::from_string(&err_body)
1340                        .with_status_code(413u16)
1341                        .with_header(
1342                            Header::from_bytes(
1343                                "Access-Control-Allow-Origin",
1344                                cors_origin.as_bytes().to_vec(),
1345                            )
1346                            .unwrap(),
1347                        ),
1348                );
1349                let _ = request.respond(response);
1350                mt.record_request(method.as_str(), 413);
1351                continue;
1352            }
1353        }
1354
1355        let mut body = String::new();
1356        if !matches!(
1357            method,
1358            Method::Get | Method::Head | Method::Options | Method::Delete
1359        ) {
1360            use std::io::Read;
1361            let mut limited = request.as_reader().take((MAX_BODY_SIZE as u64) + 1);
1362            let _ = limited.read_to_string(&mut body);
1363        }
1364
1365        if body.len() > MAX_BODY_SIZE {
1366            let err_body = json_error(
1367                "PAYLOAD_TOO_LARGE",
1368                &format!(
1369                    "Request body exceeds maximum size of {} bytes",
1370                    MAX_BODY_SIZE,
1371                ),
1372            );
1373            let response = with_security_headers(
1374                Response::from_string(&err_body)
1375                    .with_status_code(413u16)
1376                    .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
1377                    .with_header(
1378                        Header::from_bytes(
1379                            "Access-Control-Allow-Origin",
1380                            cors_origin.as_bytes().to_vec(),
1381                        )
1382                        .unwrap(),
1383                    ),
1384            );
1385            let _ = request.respond(response);
1386            mt.record_request(method.as_str(), 413);
1387            continue;
1388        }
1389
1390        // (auth_token + auth_ctx were resolved above, before the fast paths.)
1391
1392        // --- GET /api/shards/:id/connect — SSE snapshot stream ---
1393        if method == Method::Get {
1394            if let Some(rest) = url.strip_prefix("/api/shards/") {
1395                let rest = rest.split('?').next().unwrap_or(rest);
1396                if let Some(shard_id) = rest.strip_suffix("/connect") {
1397                    // Require an authenticated user. Shard SSE streams state
1398                    // snapshots tick-by-tick; an anonymous subscriber can
1399                    // both read that state AND influence via push_input (see
1400                    // the WS handler). Gate at the transport layer.
1401                    if auth_ctx.user_id.is_none() {
1402                        let err = json_error(
1403                            "AUTH_REQUIRED",
1404                            "Shard connect requires an authenticated session",
1405                        );
1406                        let response = with_security_headers(
1407                            Response::from_string(&err)
1408                                .with_status_code(401u16)
1409                                .with_header(
1410                                    Header::from_bytes("Content-Type", "application/json").unwrap(),
1411                                )
1412                                .with_header(
1413                                    Header::from_bytes(
1414                                        "Access-Control-Allow-Origin",
1415                                        cors_origin.as_bytes().to_vec(),
1416                                    )
1417                                    .unwrap(),
1418                                ),
1419                        );
1420                        let _ = request.respond(response);
1421                        mt.record_request("GET", 401);
1422                        continue;
1423                    }
1424                    let shards = match &shards_ref {
1425                        Some(s) => Arc::clone(s),
1426                        None => {
1427                            let err = json_error(
1428                                "SHARDS_NOT_AVAILABLE",
1429                                "Shard system is not configured",
1430                            );
1431                            let response = with_security_headers(
1432                                Response::from_string(&err)
1433                                    .with_status_code(503u16)
1434                                    .with_header(
1435                                        Header::from_bytes("Content-Type", "application/json")
1436                                            .unwrap(),
1437                                    )
1438                                    .with_header(
1439                                        Header::from_bytes(
1440                                            "Access-Control-Allow-Origin",
1441                                            cors_origin.as_bytes().to_vec(),
1442                                        )
1443                                        .unwrap(),
1444                                    ),
1445                            );
1446                            let _ = request.respond(response);
1447                            mt.record_request("GET", 503);
1448                            continue;
1449                        }
1450                    };
1451                    let shard = match shards.get(shard_id) {
1452                        Some(s) => s,
1453                        None => {
1454                            let err = json_error(
1455                                "SHARD_NOT_FOUND",
1456                                &format!("Shard \"{shard_id}\" not found"),
1457                            );
1458                            let response = with_security_headers(
1459                                Response::from_string(&err)
1460                                    .with_status_code(404u16)
1461                                    .with_header(
1462                                        Header::from_bytes("Content-Type", "application/json")
1463                                            .unwrap(),
1464                                    )
1465                                    .with_header(
1466                                        Header::from_bytes(
1467                                            "Access-Control-Allow-Origin",
1468                                            cors_origin.as_bytes().to_vec(),
1469                                        )
1470                                        .unwrap(),
1471                                    ),
1472                            );
1473                            let _ = request.respond(response);
1474                            mt.record_request("GET", 404);
1475                            continue;
1476                        }
1477                    };
1478
1479                    // Subscriber ID from ?sid= query param, else the authed user,
1480                    // else a generated anonymous ID.
1481                    let sub_id = url
1482                        .split("sid=")
1483                        .nth(1)
1484                        .and_then(|s| s.split('&').next())
1485                        .map(|s| s.to_string())
1486                        .or_else(|| auth_ctx.user_id.clone())
1487                        .unwrap_or_else(|| {
1488                            format!(
1489                                "anon_{}",
1490                                std::time::SystemTime::now()
1491                                    .duration_since(std::time::UNIX_EPOCH)
1492                                    .unwrap_or_default()
1493                                    .as_nanos()
1494                            )
1495                        });
1496                    let subscriber_id = pylon_realtime::SubscriberId::new(sub_id);
1497
1498                    let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1499                    let streaming_body = StreamingBody::new(rx);
1500
1501                    let tx_clone = tx.clone();
1502                    let sink: pylon_realtime::SnapshotSink =
1503                        Box::new(move |tick: u64, bytes: &[u8]| {
1504                            // Format as SSE with an id: line carrying the tick
1505                            // number so clients can resume with Last-Event-ID.
1506                            let mut frame = format!("id: {tick}\ndata: ").into_bytes();
1507                            frame.extend_from_slice(bytes);
1508                            frame.extend_from_slice(b"\n\n");
1509                            let _ = tx_clone.send(frame);
1510                        });
1511
1512                    let shard_auth = pylon_realtime::ShardAuth {
1513                        user_id: auth_ctx.user_id.clone(),
1514                        is_admin: auth_ctx.is_admin,
1515                    };
1516                    if let Err(e) = shard.add_subscriber(subscriber_id.clone(), sink, &shard_auth) {
1517                        let (status, code) = match &e {
1518                            pylon_realtime::ShardError::Unauthorized(_) => (403u16, "UNAUTHORIZED"),
1519                            _ => (429u16, "SUBSCRIBE_FAILED"),
1520                        };
1521                        let err = json_error(code, &e.to_string());
1522                        let response = with_security_headers(
1523                            Response::from_string(&err)
1524                                .with_status_code(status)
1525                                .with_header(
1526                                    Header::from_bytes("Content-Type", "application/json").unwrap(),
1527                                )
1528                                .with_header(
1529                                    Header::from_bytes(
1530                                        "Access-Control-Allow-Origin",
1531                                        cors_origin.as_bytes().to_vec(),
1532                                    )
1533                                    .unwrap(),
1534                                ),
1535                        );
1536                        let _ = request.respond(response);
1537                        mt.record_request("GET", status);
1538                        continue;
1539                    }
1540
1541                    // Auto-unsubscribe when the client disconnects: we watch
1542                    // for mpsc channel disconnection in a sentinel thread.
1543                    {
1544                        let shard_cleanup = Arc::clone(&shard);
1545                        let sub_id_cleanup = subscriber_id.clone();
1546                        let tx_liveness = tx.clone();
1547                        std::thread::spawn(move || {
1548                            // Send a heartbeat every 30s; if send fails, the
1549                            // channel is closed (client disconnected).
1550                            loop {
1551                                std::thread::sleep(std::time::Duration::from_secs(30));
1552                                if tx_liveness.send(b": heartbeat\n\n".to_vec()).is_err() {
1553                                    shard_cleanup.remove_subscriber(&sub_id_cleanup);
1554                                    return;
1555                                }
1556                                if !shard_cleanup.is_running() {
1557                                    return;
1558                                }
1559                            }
1560                        });
1561                    }
1562
1563                    let response = with_security_headers(Response::new(
1564                        tiny_http::StatusCode(200),
1565                        vec![
1566                            Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1567                            Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1568                            Header::from_bytes("Connection", "keep-alive").unwrap(),
1569                            Header::from_bytes(
1570                                "Access-Control-Allow-Origin",
1571                                cors_origin.as_bytes().to_vec(),
1572                            )
1573                            .unwrap(),
1574                        ],
1575                        streaming_body,
1576                        None,
1577                        None,
1578                    ));
1579                    let _ = request.respond(response);
1580                    mt.record_request("GET", 200);
1581                    continue;
1582                }
1583            }
1584        }
1585
1586        // --- POST /api/fn/:name with Accept: text/event-stream — streaming functions ---
1587        if method == Method::Post
1588            && url.starts_with("/api/fn/")
1589            && url != "/api/fn/traces"
1590            && request.headers().iter().any(|h| {
1591                (h.field.as_str() == "Accept" || h.field.as_str() == "accept")
1592                    && h.value.as_str().contains("text/event-stream")
1593            })
1594        {
1595            let fn_name = url
1596                .strip_prefix("/api/fn/")
1597                .unwrap_or("")
1598                .split('?')
1599                .next()
1600                .unwrap_or("")
1601                .to_string();
1602
1603            if let Some(fn_ops) = &fn_ops_maybe {
1604                // Mirror the router's gates so the streaming fast path doesn't
1605                // become a way to bypass function auth / rate limits.
1606                // 1. Function must exist (otherwise 404, not a hung SSE).
1607                if pylon_router::FnOps::get_fn(fn_ops.as_ref(), &fn_name).is_none() {
1608                    let err = json_error(
1609                        "FN_NOT_FOUND",
1610                        &format!("Function \"{fn_name}\" is not registered"),
1611                    );
1612                    let response = with_security_headers(
1613                        Response::from_string(&err)
1614                            .with_status_code(404u16)
1615                            .with_header(
1616                                Header::from_bytes("Content-Type", "application/json").unwrap(),
1617                            )
1618                            .with_header(
1619                                Header::from_bytes(
1620                                    "Access-Control-Allow-Origin",
1621                                    cors_origin.as_bytes().to_vec(),
1622                                )
1623                                .unwrap(),
1624                            ),
1625                    );
1626                    let _ = request.respond(response);
1627                    mt.record_request("POST", 404);
1628                    continue;
1629                }
1630                // 2. Per-function rate limit (identity = user_id or "anon").
1631                let identity = auth_ctx.user_id.as_deref().unwrap_or("anon");
1632                if let Err(retry_after) =
1633                    pylon_router::FnOps::check_rate_limit(fn_ops.as_ref(), &fn_name, identity)
1634                {
1635                    let body = format!(
1636                        r#"{{"error":{{"code":"RATE_LIMITED","message":"Function \"{fn_name}\" rate limit exceeded","retry_after_secs":{retry_after}}}}}"#
1637                    );
1638                    let response = with_security_headers(
1639                        Response::from_string(&body)
1640                            .with_status_code(429u16)
1641                            .with_header(
1642                                Header::from_bytes("Content-Type", "application/json").unwrap(),
1643                            )
1644                            .with_header(
1645                                Header::from_bytes(
1646                                    "Access-Control-Allow-Origin",
1647                                    cors_origin.as_bytes().to_vec(),
1648                                )
1649                                .unwrap(),
1650                            ),
1651                    );
1652                    let _ = request.respond(response);
1653                    mt.record_request("POST", 429);
1654                    continue;
1655                }
1656
1657                let args: serde_json::Value =
1658                    serde_json::from_str(&body).unwrap_or(serde_json::json!({}));
1659
1660                let auth = pylon_functions::protocol::AuthInfo {
1661                    user_id: auth_ctx.user_id.clone(),
1662                    is_admin: auth_ctx.is_admin,
1663                    tenant_id: auth_ctx.tenant_id.clone(),
1664                };
1665
1666                let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1667                let streaming_body = StreamingBody::new(rx);
1668
1669                let fn_ops_cl = Arc::clone(fn_ops);
1670                let tx_stream = tx.clone();
1671                std::thread::spawn(move || {
1672                    let tx_cb = tx_stream.clone();
1673                    let on_stream: Box<dyn FnMut(&str) + Send> = Box::new(move |chunk: &str| {
1674                        let sse = format!("data: {}\n\n", chunk);
1675                        let _ = tx_cb.send(sse.into_bytes());
1676                    });
1677
1678                    let result = pylon_router::FnOps::call(
1679                        fn_ops_cl.as_ref(),
1680                        &fn_name,
1681                        args,
1682                        auth,
1683                        Some(on_stream),
1684                        None, // streaming /api/fn/:name never carries HTTP request metadata
1685                    );
1686                    match result {
1687                        Ok((value, _trace)) => {
1688                            let done = format!(
1689                                "event: result\ndata: {}\n\n",
1690                                serde_json::to_string(&value).unwrap_or_else(|_| "null".into())
1691                            );
1692                            let _ = tx_stream.send(done.into_bytes());
1693                        }
1694                        Err(e) => {
1695                            let err = format!(
1696                                "event: error\ndata: {}\n\n",
1697                                serde_json::json!({"code": e.code, "message": e.message})
1698                            );
1699                            let _ = tx_stream.send(err.into_bytes());
1700                        }
1701                    }
1702                });
1703
1704                let response = with_security_headers(Response::new(
1705                    tiny_http::StatusCode(200),
1706                    vec![
1707                        Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1708                        Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1709                        Header::from_bytes("Connection", "keep-alive").unwrap(),
1710                        Header::from_bytes(
1711                            "Access-Control-Allow-Origin",
1712                            cors_origin.as_bytes().to_vec(),
1713                        )
1714                        .unwrap(),
1715                    ],
1716                    streaming_body,
1717                    None,
1718                    None,
1719                ));
1720                let _ = request.respond(response);
1721                mt.record_request("POST", 200);
1722                continue;
1723            }
1724        }
1725
1726        // --- POST /api/ai/stream — SSE streaming AI completion ---
1727        if url == "/api/ai/stream" && method == Method::Post {
1728            // AI endpoints spend real money per call. Require auth so a
1729            // drive-by caller can't burn through the provider budget.
1730            if auth_ctx.user_id.is_none() {
1731                let err = json_error(
1732                    "AUTH_REQUIRED",
1733                    "/api/ai/stream requires an authenticated session",
1734                );
1735                let response = with_security_headers(
1736                    Response::from_string(&err)
1737                        .with_status_code(401u16)
1738                        .with_header(
1739                            Header::from_bytes("Content-Type", "application/json").unwrap(),
1740                        )
1741                        .with_header(
1742                            Header::from_bytes(
1743                                "Access-Control-Allow-Origin",
1744                                cors_origin.as_bytes().to_vec(),
1745                            )
1746                            .unwrap(),
1747                        ),
1748                );
1749                let _ = request.respond(response);
1750                mt.record_request("POST", 401);
1751                continue;
1752            }
1753            let ai_provider = std::env::var("PYLON_AI_PROVIDER").unwrap_or_default();
1754            let ai_key = std::env::var("PYLON_AI_API_KEY").unwrap_or_default();
1755            let ai_model = std::env::var("PYLON_AI_MODEL").unwrap_or_default();
1756            let ai_base = std::env::var("PYLON_AI_BASE_URL").unwrap_or_default();
1757
1758            if ai_key.is_empty() && ai_provider != "custom" {
1759                let err = json_error(
1760                    "AI_NOT_CONFIGURED",
1761                    "Set PYLON_AI_PROVIDER and PYLON_AI_API_KEY",
1762                );
1763                let response = with_security_headers(
1764                    Response::from_string(&err)
1765                        .with_status_code(503u16)
1766                        .with_header(
1767                            Header::from_bytes("Content-Type", "application/json").unwrap(),
1768                        )
1769                        .with_header(
1770                            Header::from_bytes(
1771                                "Access-Control-Allow-Origin",
1772                                cors_origin.as_bytes().to_vec(),
1773                            )
1774                            .unwrap(),
1775                        ),
1776                );
1777                let _ = request.respond(response);
1778                mt.record_request("POST", 503);
1779                continue;
1780            }
1781
1782            let parsed: serde_json::Value = match serde_json::from_str(&body) {
1783                Ok(v) => v,
1784                Err(_) => {
1785                    let err = json_error("INVALID_JSON", "Invalid request body");
1786                    let response = with_security_headers(
1787                        Response::from_string(&err)
1788                            .with_status_code(400u16)
1789                            .with_header(
1790                                Header::from_bytes("Content-Type", "application/json").unwrap(),
1791                            )
1792                            .with_header(
1793                                Header::from_bytes(
1794                                    "Access-Control-Allow-Origin",
1795                                    cors_origin.as_bytes().to_vec(),
1796                                )
1797                                .unwrap(),
1798                            ),
1799                    );
1800                    let _ = request.respond(response);
1801                    mt.record_request("POST", 400);
1802                    continue;
1803                }
1804            };
1805
1806            let messages: Vec<AiMessage> = match parsed.get("messages").and_then(|m| m.as_array()) {
1807                Some(arr) => arr
1808                    .iter()
1809                    .filter_map(|m| {
1810                        let role = m.get("role")?.as_str()?.to_string();
1811                        let content = m.get("content")?.as_str()?.to_string();
1812                        Some(AiMessage { role, content })
1813                    })
1814                    .collect(),
1815                None => {
1816                    let err = json_error("MISSING_FIELD", "\"messages\" array is required");
1817                    let response = with_security_headers(
1818                        Response::from_string(&err)
1819                            .with_status_code(400u16)
1820                            .with_header(
1821                                Header::from_bytes("Content-Type", "application/json").unwrap(),
1822                            )
1823                            .with_header(
1824                                Header::from_bytes(
1825                                    "Access-Control-Allow-Origin",
1826                                    cors_origin.as_bytes().to_vec(),
1827                                )
1828                                .unwrap(),
1829                            ),
1830                    );
1831                    let _ = request.respond(response);
1832                    mt.record_request("POST", 400);
1833                    continue;
1834                }
1835            };
1836
1837            // Override model from request body if provided.
1838            let model = parsed
1839                .get("model")
1840                .and_then(|m| m.as_str())
1841                .map(|s| s.to_string())
1842                .unwrap_or(ai_model);
1843
1844            let proxy = match ai_provider.as_str() {
1845                "anthropic" => AiProxyPlugin::anthropic(&ai_key, &model),
1846                "openai" => AiProxyPlugin::openai(&ai_key, &model),
1847                "custom" => AiProxyPlugin::custom_with_model(&ai_base, &ai_key, &model),
1848                _ => AiProxyPlugin::openai(&ai_key, &model),
1849            };
1850
1851            // Set up a channel-based streaming body so tiny_http streams
1852            // data to the client as chunks arrive from the AI provider.
1853            let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1854            let streaming_body = StreamingBody::new(rx);
1855
1856            // Spawn the provider request on a background thread. Each chunk
1857            // is formatted as an SSE event and pushed through the channel.
1858            std::thread::spawn(move || {
1859                let result = proxy.stream_completion(&messages, &mut |chunk| {
1860                    let sse = format!(
1861                        "data: {}
1862
1863",
1864                        serde_json::json!({
1865                            "choices": [{"index": 0, "delta": {"content": chunk}}]
1866                        })
1867                    );
1868                    let _ = tx.send(sse.into_bytes());
1869                });
1870
1871                // Send a final event indicating completion or error.
1872                match result {
1873                    Ok(_) => {
1874                        let _ = tx.send(
1875                            b"data: [DONE]
1876
1877"
1878                            .to_vec(),
1879                        );
1880                    }
1881                    Err(e) => {
1882                        let err_event = format!(
1883                            "data: {}
1884
1885",
1886                            serde_json::json!({"error": {"message": e, "type": "stream_error"}})
1887                        );
1888                        let _ = tx.send(err_event.into_bytes());
1889                    }
1890                }
1891                // tx is dropped here, which causes StreamingBody::read to return 0 (EOF).
1892            });
1893
1894            let response = with_security_headers(Response::new(
1895                tiny_http::StatusCode(200),
1896                vec![
1897                    Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1898                    Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1899                    Header::from_bytes("Connection", "keep-alive").unwrap(),
1900                    Header::from_bytes(
1901                        "Access-Control-Allow-Origin",
1902                        cors_origin.as_bytes().to_vec(),
1903                    )
1904                    .unwrap(),
1905                ],
1906                streaming_body,
1907                None, // unknown content length = chunked transfer
1908                None,
1909            ));
1910            let _ = request.respond(response);
1911            mt.record_request("POST", 200);
1912            continue;
1913        }
1914
1915        // Studio route (returns HTML, not JSON).
1916        //
1917        // Privileged admin UI. It renders the full schema and lets the
1918        // operator run mutations against the data browser. In production we
1919        // require an admin token; in dev mode we leave it open so
1920        // `pylon dev` remains friction-free for the single-user case.
1921        //
1922        // Serving a WWW-Authenticate Basic realm isn't useful here because
1923        // admin auth is bearer-token based. Callers get a 401 and should
1924        // retry with `Authorization: Bearer <PYLON_ADMIN_TOKEN>`.
1925        let (status, response_body, content_type, is_studio, extra_headers) = if (url == "/studio"
1926            || url == "/studio/")
1927            && method == Method::Get
1928        {
1929            if !is_dev && !auth_ctx.is_admin {
1930                let body = json_error(
1931                    "AUTH_REQUIRED",
1932                    "/studio requires admin auth in production (set PYLON_ADMIN_TOKEN and pass it as Bearer)",
1933                );
1934                let response = with_security_headers(
1935                    Response::from_string(&body)
1936                        .with_status_code(401u16)
1937                        .with_header(
1938                            Header::from_bytes("Content-Type", "application/json").unwrap(),
1939                        )
1940                        .with_header(
1941                            Header::from_bytes(
1942                                "Access-Control-Allow-Origin",
1943                                cors_origin.as_bytes().to_vec(),
1944                            )
1945                            .unwrap(),
1946                        ),
1947                );
1948                let _ = request.respond(response);
1949                mt.record_request("GET", 401);
1950                continue;
1951            }
1952            // Derive the public base URL from the request's Host header +
1953            // X-Forwarded-Proto (Fly / any HTTPS terminator sets this).
1954            // Hardcoding `http://localhost:{port}` here meant the studio
1955            // HTML served from pylon-crm.fly.dev tried to fetch
1956            // http://localhost:4321/api/* from the browser, which CSP
1957            // rightly blocks.
1958            let host = request
1959                .headers()
1960                .iter()
1961                .find(|h| h.field.equiv("Host"))
1962                .map(|h| h.value.as_str().to_string())
1963                .unwrap_or_else(|| format!("localhost:{port}"));
1964            let scheme = request
1965                .headers()
1966                .iter()
1967                .find(|h| h.field.equiv("X-Forwarded-Proto"))
1968                .map(|h| h.value.as_str().to_string())
1969                .unwrap_or_else(|| "http".to_string());
1970            let base = format!("{scheme}://{host}");
1971            let html = pylon_studio_api::generate_studio_html(rt.manifest(), &base);
1972            (
1973                200u16,
1974                html,
1975                "text/html",
1976                true,
1977                Vec::<(String, String)>::new(),
1978            )
1979        } else {
1980            // Run plugin middleware with per-request metadata so rate-limit
1981            // plugins can bucket by peer IP (not just user id) when the
1982            // caller is anonymous.
1983            let meta = pylon_plugin::RequestMeta {
1984                peer_ip: peer_ip.as_str(),
1985            };
1986            if let Err(e) = pr.run_on_request_with_meta(method.as_str(), &url, &auth_ctx, &meta) {
1987                (
1988                    e.status,
1989                    json_error(&e.code, &e.message),
1990                    "application/json",
1991                    false,
1992                    Vec::new(),
1993                )
1994            } else if let Some((s, b)) =
1995                pr.try_handle_route(method.as_str(), &url, &body, &auth_ctx)
1996            {
1997                // Plugin handled the route.
1998                (s, b, "application/json", false, Vec::new())
1999            } else {
2000                let notifier = WsSseNotifier {
2001                    ws: Arc::clone(&wh),
2002                    sse: Arc::clone(&sh),
2003                };
2004                let openapi_gen = RuntimeOpenApiGenerator {
2005                    manifest: rt.manifest(),
2006                };
2007                let file_ops = LocalFileOps::new_default();
2008                let cache_adapter = CacheAdapter(Arc::clone(&ca));
2009                let pubsub_adapter = PubSubAdapter(Arc::clone(&ps));
2010                let email_adapter = EmailAdapter::from_env();
2011                let fn_ops: Option<&dyn pylon_router::FnOps> =
2012                    fn_ops_ref.as_deref().map(|f| f as &dyn pylon_router::FnOps);
2013                let shard_adapter = shards_ref.as_ref().map(|reg| ShardOpsAdapter {
2014                    registry: Arc::clone(reg),
2015                });
2016                let shard_ops: Option<&dyn pylon_router::ShardOps> = shard_adapter
2017                    .as_ref()
2018                    .map(|a| a as &dyn pylon_router::ShardOps);
2019                let plugin_hooks = PluginHooksAdapter(Arc::clone(&pr));
2020                // Snapshot request headers as (name, value) pairs for the
2021                // router to forward into webhook-invoked actions. Header
2022                // names are left as-sent; the router lowercases + merges
2023                // duplicates per RFC 7230 when constructing RequestInfo.
2024                let request_headers: Vec<(String, String)> = request
2025                    .headers()
2026                    .iter()
2027                    .map(|h| (h.field.as_str().to_string(), h.value.as_str().to_string()))
2028                    .collect();
2029                let router_ctx = pylon_router::RouterContext {
2030                    store: rt.as_ref(),
2031                    session_store: &ss,
2032                    magic_codes: &mc,
2033                    oauth_state: &os,
2034                    account_store: &acc,
2035                    api_keys: &ak,
2036                    orgs: &og,
2037                    siwe: &sw,
2038                    phone_codes: &pcd,
2039                    passkeys: &pks,
2040                    verification: &vrf,
2041                    audit: &aud,
2042                    policy_engine: &pe,
2043                    change_log: &cl,
2044                    notifier: &notifier,
2045                    rooms: rm.as_ref(),
2046                    cache: &cache_adapter,
2047                    pubsub: &pubsub_adapter,
2048                    jobs: jq.as_ref(),
2049                    scheduler: sc.as_ref(),
2050                    workflows: we.as_ref(),
2051                    files: &file_ops,
2052                    openapi: &openapi_gen,
2053                    functions: fn_ops,
2054                    email: &email_adapter,
2055                    shards: shard_ops,
2056                    plugin_hooks: &plugin_hooks,
2057                    auth_ctx: &auth_ctx,
2058                    trusted_origins: &trusted_origins_ref,
2059                    is_dev,
2060                    request_headers: &request_headers,
2061                    peer_ip: peer_ip.as_str(),
2062                    cookie_config: cookie_config.as_ref(),
2063                    response_headers: std::cell::RefCell::new(Vec::new()),
2064                };
2065                let http_method = HttpMethod::from_str(method.as_str());
2066                let (s, b, _ct) = pylon_router::route(
2067                    &router_ctx,
2068                    http_method,
2069                    &url,
2070                    &body,
2071                    auth_token.as_deref(),
2072                );
2073                let extra_headers = router_ctx.take_response_headers();
2074                (s, b, "application/json", false, extra_headers)
2075            }
2076        };
2077
2078        let mut response = Response::from_string(&response_body)
2079            .with_status_code(status)
2080            .with_header(Header::from_bytes("Content-Type", content_type).unwrap())
2081            .with_header(
2082                Header::from_bytes(
2083                    "Access-Control-Allow-Origin",
2084                    cors_origin.as_bytes().to_vec(),
2085                )
2086                .unwrap(),
2087            )
2088            .with_header(
2089                Header::from_bytes(
2090                    "Access-Control-Allow-Methods",
2091                    "GET, POST, PATCH, DELETE, OPTIONS",
2092                )
2093                .unwrap(),
2094            )
2095            .with_header(
2096                Header::from_bytes(
2097                    "Access-Control-Allow-Headers",
2098                    "Content-Type, Authorization",
2099                )
2100                .unwrap(),
2101            );
2102        // Cookie-based auth requires `Access-Control-Allow-Credentials:
2103        // true` on the response, paired with a specific origin. Vary
2104        // ensures intermediaries don't cache one origin's response and
2105        // serve it back to a different origin's browser.
2106        if allow_credentials {
2107            response = response
2108                .with_header(
2109                    Header::from_bytes("Access-Control-Allow-Credentials", "true").unwrap(),
2110                )
2111                .with_header(Header::from_bytes("Vary", "Origin").unwrap());
2112        }
2113
2114        // Apply any extra headers handlers attached via the router context
2115        // (Set-Cookie on login/logout, Location on OAuth GET callback).
2116        // Bytes from these headers come from server-built strings — bad
2117        // bytes here would be a programming bug, not request-driven, so a
2118        // failed Header::from_bytes is silently dropped rather than
2119        // poisoning the response.
2120        for (name, value) in extra_headers {
2121            if let Ok(h) = Header::from_bytes(name.as_bytes(), value.as_bytes().to_vec()) {
2122                response = response.with_header(h);
2123            }
2124        }
2125
2126        // Add Content-Security-Policy for Studio HTML responses.
2127        //
2128        // Studio talks to the same Rust process over HTTP (same origin)
2129        // AND a sibling WebSocket port (port+1, scheme ws:). CSP's
2130        // `default-src` covers `connect-src` by fallback, so any
2131        // directive we set there must include the WS scheme or the
2132        // browser silently blocks the live-sync connection.
2133        //
2134        // `ws:` + `wss:` cover localhost dev + TLS deploys without
2135        // hard-coding ports. Same-origin `'self'` keeps HTTP fetches
2136        // allowed. Inline + eval stay for the Tailwind/Babel CDN scripts
2137        // the current Studio HTML includes.
2138        if is_studio {
2139            response = response.with_header(
2140                Header::from_bytes(
2141                    "Content-Security-Policy",
2142                    "default-src 'self' 'unsafe-inline' 'unsafe-eval' https://cdn.tailwindcss.com https://unpkg.com ws: wss:",
2143                ).unwrap(),
2144            );
2145        }
2146
2147        let response = with_security_headers(response);
2148
2149        let _ = request.respond(response);
2150        mt.record_request(method.as_str(), status);
2151    }
2152
2153    tracing::warn!("Shutting down gracefully...");
2154
2155    // --- Drain phase ---
2156    // Stop accepting new work, let in-flight finish, close subsystems cleanly.
2157    let drain_timeout = std::time::Duration::from_secs(
2158        std::env::var("PYLON_DRAIN_SECS")
2159            .ok()
2160            .and_then(|s| s.parse().ok())
2161            .unwrap_or(10),
2162    );
2163    let start = Instant::now();
2164
2165    // Stop any running shards so their tick loops exit.
2166    if let Some(reg) = &shard_registry {
2167        for id in reg.ids() {
2168            if let Some(shard) = reg.get(&id) {
2169                shard.stop();
2170            }
2171        }
2172    }
2173
2174    // Let the scheduler finish its current cycle.
2175    let _ = &scheduler; // drop Arc at end of scope
2176
2177    // Wait for outstanding workers to idle, up to drain_timeout.
2178    while start.elapsed() < drain_timeout {
2179        let pending_jobs = job_queue.stats().pending;
2180        if pending_jobs == 0 {
2181            break;
2182        }
2183        std::thread::sleep(std::time::Duration::from_millis(100));
2184    }
2185
2186    let elapsed = start.elapsed();
2187    tracing::warn!(
2188        "Drain complete in {:.1}s (timeout {}s)",
2189        elapsed.as_secs_f32(),
2190        drain_timeout.as_secs()
2191    );
2192    Ok(())
2193}
2194
2195// The route() function has been extracted to the `pylon-router` crate.
2196// See `pylon_router::route()` for the platform-agnostic routing logic.
2197// The server now delegates to it via a `RouterContext`.
2198
2199fn json_error(code: &str, message: &str) -> String {
2200    pylon_router::json_error(code, message)
2201}
2202
2203/// Bundle of the four auth-state stores. Built in one place so backend
2204/// selection (Postgres vs. SQLite) is consistent across them — there's
2205/// no scenario where sessions live in PG but accounts live in a sibling
2206/// SQLite file. Selection rules, in priority:
2207///
2208/// 1. `DATABASE_URL=postgres://…` → all four stores point at PG.
2209/// 2. `PYLON_SESSION_DB=path/to/file.db` → SQLite, explicit path.
2210/// 3. `<app_db_path>.sessions.db` → SQLite alongside the app DB.
2211/// 4. `PYLON_SESSION_IN_MEMORY=1` or no app DB → in-memory.
2212struct AuthStores {
2213    session_store: Arc<SessionStore>,
2214    magic_codes: Arc<pylon_auth::MagicCodeStore>,
2215    oauth_state: Arc<pylon_auth::OAuthStateStore>,
2216    account_store: Arc<pylon_auth::AccountStore>,
2217    api_keys: Arc<pylon_auth::api_key::ApiKeyStore>,
2218    orgs: Arc<pylon_auth::org::OrgStore>,
2219    siwe: Arc<pylon_auth::siwe::NonceStore>,
2220    phone_codes: Arc<pylon_auth::phone::PhoneCodeStore>,
2221    passkeys: Arc<pylon_auth::webauthn::PasskeyStore>,
2222    verification: Arc<pylon_auth::verification::VerificationStore>,
2223    audit: Arc<pylon_auth::audit::AuditStore>,
2224}
2225
// Memoized env reads — the auth resolver runs PER REQUEST, and
// `std::env::var` re-scans the process environment and allocates a
// fresh String on every call. Each OnceLock is initialized lazily on
// first lookup and never refreshed afterwards, so tests that mutate env
// between cases should use process-level isolation, not in-process mutation.
/// `PYLON_JWT_SECRET`, read once per process; unset, non-Unicode or empty
/// values all yield `None`.
fn jwt_secret() -> Option<&'static String> {
    static SECRET: std::sync::OnceLock<Option<String>> = std::sync::OnceLock::new();
    let cached = SECRET.get_or_init(|| match std::env::var("PYLON_JWT_SECRET") {
        Ok(value) if !value.is_empty() => Some(value),
        _ => None,
    });
    cached.as_ref()
}
2239
/// `PYLON_JWT_ISSUER`, read once per process; unset, non-Unicode or empty
/// values all yield `None`.
fn jwt_issuer() -> Option<&'static String> {
    static ISSUER: std::sync::OnceLock<Option<String>> = std::sync::OnceLock::new();
    ISSUER
        .get_or_init(|| {
            // A lookup error collapses to "" and is then treated like an empty value.
            let raw = std::env::var("PYLON_JWT_ISSUER").unwrap_or_default();
            if raw.is_empty() {
                None
            } else {
                Some(raw)
            }
        })
        .as_ref()
}
2249
2250fn build_auth_stores(app_db_path: Option<&str>, session_lifetime: u64) -> AuthStores {
2251    // Forced in-memory escape hatch — used by integration tests that
2252    // never want to touch disk.
2253    let force_in_memory = std::env::var("PYLON_SESSION_IN_MEMORY")
2254        .map(|v| v == "1" || v == "true")
2255        .unwrap_or(false);
2256
2257    // Postgres path — wins over PYLON_SESSION_DB when both are set so the
2258    // multi-replica deploy doesn't silently fall back to per-replica SQLite.
2259    let pg_url = std::env::var("DATABASE_URL")
2260        .ok()
2261        .filter(|u| u.starts_with("postgres://") || u.starts_with("postgresql://"));
2262
2263    if let Some(url) = pg_url {
2264        if force_in_memory {
2265            // Tests that explicitly opt out of persistence shouldn't be
2266            // overridden by an ambient DATABASE_URL in CI.
2267            return in_memory_auth_stores(session_lifetime);
2268        }
2269        return build_pg_auth_stores(&url, session_lifetime);
2270    }
2271
2272    let sqlite_path = std::env::var("PYLON_SESSION_DB")
2273        .ok()
2274        .or_else(|| app_db_path.map(|p| format!("{p}.sessions.db")));
2275
2276    match (force_in_memory, sqlite_path) {
2277        (true, _) | (_, None) => in_memory_auth_stores(session_lifetime),
2278        (false, Some(path)) => build_sqlite_auth_stores(&path, session_lifetime),
2279    }
2280}
2281
2282fn in_memory_auth_stores(session_lifetime: u64) -> AuthStores {
2283    AuthStores {
2284        session_store: Arc::new(SessionStore::new().with_lifetime(session_lifetime)),
2285        magic_codes: Arc::new(pylon_auth::MagicCodeStore::new()),
2286        oauth_state: Arc::new(pylon_auth::OAuthStateStore::new()),
2287        account_store: Arc::new(pylon_auth::AccountStore::new()),
2288        api_keys: Arc::new(pylon_auth::api_key::ApiKeyStore::new()),
2289        orgs: Arc::new(pylon_auth::org::OrgStore::new()),
2290        siwe: Arc::new(pylon_auth::siwe::NonceStore::new()),
2291        phone_codes: Arc::new(pylon_auth::phone::PhoneCodeStore::new()),
2292        passkeys: Arc::new(pylon_auth::webauthn::PasskeyStore::new()),
2293        verification: Arc::new(pylon_auth::verification::VerificationStore::new()),
2294        audit: Arc::new(pylon_auth::audit::AuditStore::new()),
2295    }
2296}
2297
2298fn build_sqlite_auth_stores(path: &str, session_lifetime: u64) -> AuthStores {
2299    let session_store = match crate::session_backend::SqliteSessionBackend::open(path) {
2300        Ok(b) => {
2301            tracing::info!("[pylon] Auth state (SQLite): {path}");
2302            SessionStore::with_backend(Box::new(b)).with_lifetime(session_lifetime)
2303        }
2304        Err(e) => {
2305            tracing::warn!("[pylon] could not open session DB {path}: {e}. In-memory fallback.");
2306            SessionStore::new().with_lifetime(session_lifetime)
2307        }
2308    };
2309    let magic_codes = match crate::magic_code_backend::SqliteMagicCodeBackend::open(path) {
2310        Ok(b) => pylon_auth::MagicCodeStore::with_backend(Box::new(b)),
2311        Err(e) => {
2312            tracing::warn!("[pylon] magic-code SQLite backend unavailable: {e}");
2313            pylon_auth::MagicCodeStore::new()
2314        }
2315    };
2316    let oauth_state = match crate::oauth_backend::SqliteOAuthBackend::open(path) {
2317        Ok(b) => pylon_auth::OAuthStateStore::with_backend(Box::new(b)),
2318        Err(e) => {
2319            tracing::warn!("[pylon] OAuth state SQLite backend unavailable: {e}");
2320            pylon_auth::OAuthStateStore::new()
2321        }
2322    };
2323    let account_store = match crate::account_backend::SqliteAccountBackend::open(path) {
2324        Ok(b) => pylon_auth::AccountStore::with_backend(Box::new(b)),
2325        Err(e) => {
2326            tracing::warn!("[pylon] account-link SQLite backend unavailable: {e}");
2327            pylon_auth::AccountStore::new()
2328        }
2329    };
2330    let api_keys = match crate::api_key_backend::SqliteApiKeyBackend::open(path) {
2331        Ok(b) => pylon_auth::api_key::ApiKeyStore::with_backend(Box::new(b)),
2332        Err(e) => {
2333            tracing::warn!("[pylon] api-key SQLite backend unavailable: {e}");
2334            pylon_auth::api_key::ApiKeyStore::new()
2335        }
2336    };
2337    let orgs = match crate::org_backend::SqliteOrgBackend::open(path) {
2338        Ok(b) => pylon_auth::org::OrgStore::with_backend(Box::new(b)),
2339        Err(e) => {
2340            tracing::warn!("[pylon] org SQLite backend unavailable: {e}");
2341            pylon_auth::org::OrgStore::new()
2342        }
2343    };
2344    let verification = match crate::verification_backend::SqliteVerificationBackend::open(path) {
2345        Ok(b) => pylon_auth::verification::VerificationStore::with_backend(Box::new(b)),
2346        Err(e) => {
2347            tracing::warn!("[pylon] verification SQLite backend unavailable: {e}");
2348            pylon_auth::verification::VerificationStore::new()
2349        }
2350    };
2351    let audit = match crate::audit_backend::SqliteAuditBackend::open(path) {
2352        Ok(b) => pylon_auth::audit::AuditStore::with_backend(Box::new(b)),
2353        Err(e) => {
2354            tracing::warn!("[pylon] audit SQLite backend unavailable: {e}");
2355            pylon_auth::audit::AuditStore::new()
2356        }
2357    };
2358    AuthStores {
2359        session_store: Arc::new(session_store),
2360        magic_codes: Arc::new(magic_codes),
2361        oauth_state: Arc::new(oauth_state),
2362        account_store: Arc::new(account_store),
2363        api_keys: Arc::new(api_keys),
2364        orgs: Arc::new(orgs),
2365        siwe: Arc::new(pylon_auth::siwe::NonceStore::new()),
2366        phone_codes: Arc::new(pylon_auth::phone::PhoneCodeStore::new()),
2367        passkeys: Arc::new(pylon_auth::webauthn::PasskeyStore::new()),
2368        verification: Arc::new(verification),
2369        audit: Arc::new(audit),
2370    }
2371}
2372
2373fn build_pg_auth_stores(url: &str, session_lifetime: u64) -> AuthStores {
2374    // Each backend opens its own connection. Sessions/oauth-state/magic-codes/
2375    // accounts are low-frequency relative to entity CRUD — keeping them on
2376    // separate connections avoids a "oauth lookup blocks an entity write"
2377    // false-sharing scenario at the cost of a few idle PG connections.
2378    let session_store = match crate::session_backend::PostgresSessionBackend::connect(url) {
2379        Ok(b) => {
2380            tracing::info!("[pylon] Auth state (Postgres): {url}");
2381            SessionStore::with_backend(Box::new(b)).with_lifetime(session_lifetime)
2382        }
2383        Err(e) => {
2384            tracing::warn!("[pylon] PG session backend unavailable: {e}. In-memory fallback.");
2385            SessionStore::new().with_lifetime(session_lifetime)
2386        }
2387    };
2388    let magic_codes = match crate::magic_code_backend::PostgresMagicCodeBackend::connect(url) {
2389        Ok(b) => pylon_auth::MagicCodeStore::with_backend(Box::new(b)),
2390        Err(e) => {
2391            tracing::warn!("[pylon] PG magic-code backend unavailable: {e}");
2392            pylon_auth::MagicCodeStore::new()
2393        }
2394    };
2395    let oauth_state = match crate::oauth_backend::PostgresOAuthBackend::connect(url) {
2396        Ok(b) => pylon_auth::OAuthStateStore::with_backend(Box::new(b)),
2397        Err(e) => {
2398            tracing::warn!("[pylon] PG OAuth state backend unavailable: {e}");
2399            pylon_auth::OAuthStateStore::new()
2400        }
2401    };
2402    let account_store = match crate::account_backend::PostgresAccountBackend::connect(url) {
2403        Ok(b) => pylon_auth::AccountStore::with_backend(Box::new(b)),
2404        Err(e) => {
2405            tracing::warn!("[pylon] PG account-link backend unavailable: {e}");
2406            pylon_auth::AccountStore::new()
2407        }
2408    };
2409    let api_keys = match crate::api_key_backend::PostgresApiKeyBackend::connect(url) {
2410        Ok(b) => pylon_auth::api_key::ApiKeyStore::with_backend(Box::new(b)),
2411        Err(e) => {
2412            tracing::warn!("[pylon] PG api-key backend unavailable: {e}");
2413            pylon_auth::api_key::ApiKeyStore::new()
2414        }
2415    };
2416    let orgs = match crate::org_backend::PostgresOrgBackend::connect(url) {
2417        Ok(b) => pylon_auth::org::OrgStore::with_backend(Box::new(b)),
2418        Err(e) => {
2419            tracing::warn!("[pylon] PG org backend unavailable: {e}");
2420            pylon_auth::org::OrgStore::new()
2421        }
2422    };
2423    let verification = match crate::verification_backend::PostgresVerificationBackend::connect(url)
2424    {
2425        Ok(b) => pylon_auth::verification::VerificationStore::with_backend(Box::new(b)),
2426        Err(e) => {
2427            tracing::warn!("[pylon] PG verification backend unavailable: {e}");
2428            pylon_auth::verification::VerificationStore::new()
2429        }
2430    };
2431    let audit = match crate::audit_backend::PostgresAuditBackend::connect(url) {
2432        Ok(b) => pylon_auth::audit::AuditStore::with_backend(Box::new(b)),
2433        Err(e) => {
2434            tracing::warn!("[pylon] PG audit backend unavailable: {e}");
2435            pylon_auth::audit::AuditStore::new()
2436        }
2437    };
2438    AuthStores {
2439        session_store: Arc::new(session_store),
2440        magic_codes: Arc::new(magic_codes),
2441        oauth_state: Arc::new(oauth_state),
2442        account_store: Arc::new(account_store),
2443        api_keys: Arc::new(api_keys),
2444        orgs: Arc::new(orgs),
2445        siwe: Arc::new(pylon_auth::siwe::NonceStore::new()),
2446        phone_codes: Arc::new(pylon_auth::phone::PhoneCodeStore::new()),
2447        passkeys: Arc::new(pylon_auth::webauthn::PasskeyStore::new()),
2448        verification: Arc::new(verification),
2449        audit: Arc::new(audit),
2450    }
2451}
2452
2453/// Build the session store. Persists by default for file-backed runtimes —
2454/// sessions live in a sibling `<db>.sessions.db` file next to the app DB
2455/// unless `PYLON_SESSION_DB` overrides the path or
2456/// `PYLON_SESSION_IN_MEMORY=1` opts out. In-memory runtimes (tests)
2457/// get an in-memory session store.
2458///
2459/// This used to be opt-in, which silently broke every app after a server
2460/// restart: tokens in browser localStorage resolved to anonymous, pulls
2461/// came back empty under policy, and mutations 400'd with UNAUTHENTICATED.
2462#[allow(dead_code)]
2463fn build_session_store(app_db_path: Option<&str>) -> SessionStore {
2464    if std::env::var("PYLON_SESSION_IN_MEMORY")
2465        .map(|v| v == "1" || v == "true")
2466        .unwrap_or(false)
2467    {
2468        return SessionStore::new();
2469    }
2470    let explicit = std::env::var("PYLON_SESSION_DB").ok();
2471    let default_path = app_db_path.map(|p| format!("{p}.sessions.db"));
2472    let path = match explicit.or(default_path) {
2473        Some(p) => p,
2474        None => return SessionStore::new(),
2475    };
2476    match crate::session_backend::SqliteSessionBackend::open(&path) {
2477        Ok(backend) => {
2478            tracing::info!("[pylon] Session persistence enabled: {path}");
2479            SessionStore::with_backend(Box::new(backend))
2480        }
2481        Err(e) => {
2482            tracing::warn!(
2483                "[pylon] could not open session DB {path}: {e}. Falling back to in-memory sessions."
2484            );
2485            SessionStore::new()
2486        }
2487    }
2488}
2489
/// Parse a `multipart/form-data` body and return the first file part found.
///
/// Returns `(filename, content_type, bytes)` on success, `None` if the body
/// can't be parsed or no file part exists.
///
/// A part counts as a file when its `Content-Disposition` header carries a
/// `filename=` parameter. The name is taken from either the quoted
/// (`filename="a.txt"`) or bare-token (`filename=a.txt`) form, matched
/// case-insensitively. It falls back to `"upload"` when no value can be read.
/// The part's `Content-Type` is returned lower-cased and defaults to
/// `application/octet-stream`.
///
/// Handles the common RFC 7578 subset used by browsers and curl:
/// - `--boundary` part separators
/// - `Content-Disposition: form-data; name=...; filename=...`
/// - `Content-Type: ...`
/// - blank line, then raw bytes, then `\r\n--boundary` terminator
///
/// The body comes straight from the client, so malformed framing must yield
/// `None` (or a best-effort result) and never a slicing panic.
fn parse_multipart_first_file(
    body: &[u8],
    content_type_header: &str,
) -> Option<(String, String, Vec<u8>)> {
    /// Pull the `filename` parameter out of a `Content-Disposition` line.
    /// `lower` must be `line.to_ascii_lowercase()`; ASCII lower-casing keeps
    /// byte offsets identical, so an index found in `lower` is valid in `line`.
    fn disposition_filename(line: &str, lower: &str) -> Option<String> {
        let value = &line[lower.find("filename=")? + "filename=".len()..];
        match value.strip_prefix('"') {
            Some(quoted) => quoted.find('"').map(|end| quoted[..end].to_string()),
            None => {
                let token = value.split(';').next().unwrap_or_default().trim();
                (!token.is_empty()).then(|| token.to_string())
            }
        }
    }

    // Extract the boundary parameter.
    let boundary_param = content_type_header
        .split(';')
        .find_map(|p| p.trim().strip_prefix("boundary="))?;
    let boundary = boundary_param.trim_matches('"');
    // An empty boundary would make the delimiter a bare `--`, which matches
    // arbitrary payload bytes; RFC 2046 requires at least one character.
    if boundary.is_empty() {
        return None;
    }
    let delimiter = format!("--{boundary}");
    let delimiter_bytes = delimiter.as_bytes();

    // Walk the parts between delimiters.
    let mut pos = 0usize;
    while pos < body.len() {
        let next = find_subslice(&body[pos..], delimiter_bytes)?;
        let part_start = pos + next + delimiter_bytes.len();
        // Skip CRLF or stop at `--` (the closing delimiter) after the delimiter.
        if part_start + 2 > body.len() {
            return None;
        }
        if &body[part_start..part_start + 2] == b"--" {
            return None; // end of parts, no file found
        }
        let header_start = part_start + skip_crlf(&body[part_start..]);

        // Headers end at the first blank line.
        let header_end_offset = find_subslice(&body[header_start..], b"\r\n\r\n")?;
        let headers = &body[header_start..header_start + header_end_offset];
        let data_start = header_start + header_end_offset + 4;

        // This part's data runs up to the next delimiter.
        let next_delim_offset = find_subslice(&body[data_start..], delimiter_bytes)?;
        let mut data_end = data_start + next_delim_offset;
        // Strip the CRLF that belongs to the next delimiter, but only when it
        // lies inside this part's data. If a malformed part has no CRLF before
        // the delimiter, the two bytes before `data_end` are the header
        // terminator. Stepping back over them would put `data_end` before
        // `data_start` and panic on the slice below.
        if data_end >= data_start + 2 && &body[data_end - 2..data_end] == b"\r\n" {
            data_end -= 2;
        }

        // Parse the headers we care about.
        let headers_str = std::str::from_utf8(headers).ok()?;
        let mut filename: Option<String> = None;
        let mut part_ct = String::from("application/octet-stream");
        let mut has_file = false;
        for line in headers_str.split("\r\n") {
            let lower = line.to_ascii_lowercase();
            if let Some(rest) = lower.strip_prefix("content-disposition:") {
                if rest.contains("filename=") {
                    has_file = true;
                    if let Some(name) = disposition_filename(line, &lower) {
                        filename = Some(name);
                    }
                }
            } else if let Some(rest) = lower.strip_prefix("content-type:") {
                part_ct = rest.trim().to_string();
            }
        }

        if has_file {
            let name = filename.unwrap_or_else(|| "upload".into());
            return Some((name, part_ct, body[data_start..data_end].to_vec()));
        }

        pos = data_end;
    }
    None
}

/// Byte offset of the first occurrence of `needle` in `haystack`, or `None`
/// if it is absent, empty, or longer than `haystack`.
fn find_subslice(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    if needle.is_empty() || needle.len() > haystack.len() {
        return None;
    }
    haystack.windows(needle.len()).position(|w| w == needle)
}

/// Length of a leading line break in `buf`: 2 for `\r\n`, 1 for a bare `\n`,
/// 0 otherwise.
fn skip_crlf(buf: &[u8]) -> usize {
    if buf.len() >= 2 && &buf[0..2] == b"\r\n" {
        2
    } else if !buf.is_empty() && buf[0] == b'\n' {
        1
    } else {
        0
    }
}
2589
#[cfg(test)]
mod multipart_tests {
    use super::*;

    /// Content-Type header shared by the fixtures below (boundary `bnd`).
    const FORM_DATA_BND: &str = "multipart/form-data; boundary=bnd";

    #[test]
    fn parses_single_file() {
        // `\` + newline inside a byte string skips the following indentation,
        // so these fixtures are byte-for-byte identical to unindented ones.
        let payload = b"--bnd\r\n\
            Content-Disposition: form-data; name=\"file\"; filename=\"hello.txt\"\r\n\
            Content-Type: text/plain\r\n\
            \r\n\
            Hello world\r\n\
            --bnd--\r\n";
        let parsed = parse_multipart_first_file(payload, FORM_DATA_BND)
            .expect("a part with filename= should be returned");
        assert_eq!(
            parsed,
            (
                "hello.txt".to_string(),
                "text/plain".to_string(),
                b"Hello world".to_vec(),
            )
        );
    }

    #[test]
    fn returns_none_without_file_part() {
        let payload = b"--bnd\r\n\
            Content-Disposition: form-data; name=\"field\"\r\n\
            \r\n\
            just text\r\n\
            --bnd--\r\n";
        assert_eq!(parse_multipart_first_file(payload, FORM_DATA_BND), None);
    }

    #[test]
    fn returns_none_when_no_boundary() {
        let parsed = parse_multipart_first_file(b"anything", "application/json");
        assert!(parsed.is_none());
    }
}