Skip to main content

pylon_runtime/
server.rs

1#[allow(unused_imports)]
2use std::io::Read;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::time::Instant;
6
7use pylon_auth::SessionStore;
8use pylon_http::HttpMethod;
9use pylon_plugin::PluginRegistry;
10use pylon_policy::PolicyEngine;
11use pylon_sync::{ChangeKind, ChangeLog};
12use tiny_http::{Header, Method, Response, Server};
13
14use crate::datastore::{
15    CacheAdapter, EmailAdapter, LocalFileOps, PluginHooksAdapter, PubSubAdapter,
16    RuntimeOpenApiGenerator, ShardOpsAdapter, WsSseNotifier,
17};
18use crate::jobs::{JobQueue, JobResult, Worker};
19use crate::metrics::Metrics;
20use crate::pubsub::PubSubBroker;
21use crate::rate_limit::RateLimiter;
22use crate::rooms::RoomManager;
23use crate::scheduler::Scheduler;
24use crate::sse::SseHub;
25use crate::workflows::WorkflowEngine;
26use crate::ws::WsHub;
27use crate::Runtime;
28use pylon_plugin::builtin::ai_proxy::{AiMessage, AiProxyPlugin};
29use pylon_plugin::builtin::cache::CachePlugin;
30
31// ---------------------------------------------------------------------------
32// Streaming body — bridges mpsc::Receiver to std::io::Read for SSE responses
33// ---------------------------------------------------------------------------
34
/// A streaming response body backed by an MPSC channel.
///
/// Implements [`std::io::Read`] by handing out, in order, the byte chunks
/// received on the channel. It is meant to be used as the body of a
/// `tiny_http::Response` so that data is written as it arrives.
///
/// End of stream (`Ok(0)`) is reported when either:
/// - every sender has been dropped and the channel is drained, or
/// - an *empty* chunk is received (readers treat `Ok(0)` as EOF, so senders
///   should never push an empty `Vec` unless they want the stream to end).
struct StreamingBody {
    /// Source of body chunks.
    rx: std::sync::mpsc::Receiver<Vec<u8>>,
    /// The chunk currently being handed out; empty when nothing is pending.
    buf: Vec<u8>,
    /// Offset of the first byte of `buf` that has not been returned yet.
    pos: usize,
}

impl StreamingBody {
    /// Wraps `rx` in a reader with no pending data.
    fn new(rx: std::sync::mpsc::Receiver<Vec<u8>>) -> Self {
        Self {
            rx,
            buf: Vec::new(),
            pos: 0,
        }
    }
}

impl std::io::Read for StreamingBody {
    /// Copies as many pending bytes as fit into `buf`, blocking on the
    /// channel only when nothing is pending.
    ///
    /// Returns `Ok(0)` for a zero-length `buf`, on an empty chunk, or once
    /// the channel is closed. This implementation never returns `Err`.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // A zero-length read can never deliver data, so it must not block
        // waiting for the next chunk. Pending bytes are left untouched.
        if buf.is_empty() {
            return Ok(0);
        }

        // Nothing left over from a previous chunk: block until the next one
        // arrives or the sender side goes away.
        if self.pos >= self.buf.len() {
            match self.rx.recv() {
                // An empty chunk is surfaced as a zero-byte read (EOF).
                Ok(chunk) if chunk.is_empty() => return Ok(0),
                Ok(chunk) => {
                    self.buf = chunk;
                    self.pos = 0;
                }
                // Channel closed = EOF.
                Err(_) => return Ok(0),
            }
        }

        // Hand out as much of the pending chunk as the caller can take; the
        // rest stays in `self.buf` for the next call.
        let pending = &self.buf[self.pos..];
        let n = pending.len().min(buf.len());
        buf[..n].copy_from_slice(&pending[..n]);
        self.pos += n;

        // Chunk fully consumed: reset so the next call goes back to `recv`.
        if self.pos >= self.buf.len() {
            self.buf.clear();
            self.pos = 0;
        }
        Ok(n)
    }
}
88
89/// Global shutdown flag. Set via `request_shutdown()` to trigger graceful exit.
90static SHUTDOWN: AtomicBool = AtomicBool::new(false);
91
92/// Request a graceful shutdown of the running server.
93///
94/// This sets the shutdown flag and, if a server handle has been stashed, calls
95/// `unblock()` to wake the request loop. Safe to call from any thread or signal
96/// handler.
97pub fn request_shutdown() {
98    SHUTDOWN.store(true, Ordering::SeqCst);
99    // If a server handle is available, unblock the request loop so it can
100    // observe the flag immediately rather than waiting for the next request.
101    if let Some(srv) = SERVER_HANDLE.get() {
102        srv.unblock();
103    }
104}
105
106/// Global handle to the `tiny_http::Server` so `request_shutdown()` can call
107/// `unblock()` without requiring callers to hold a reference.
108static SERVER_HANDLE: std::sync::OnceLock<Arc<Server>> = std::sync::OnceLock::new();
109
110// ---------------------------------------------------------------------------
111// Security headers
112// ---------------------------------------------------------------------------
113
114/// Common security headers applied to every response.
115fn security_headers() -> Vec<Header> {
116    vec![
117        Header::from_bytes("X-Content-Type-Options", "nosniff").unwrap(),
118        Header::from_bytes("X-Frame-Options", "DENY").unwrap(),
119        Header::from_bytes("X-XSS-Protection", "1; mode=block").unwrap(),
120    ]
121}
122
123/// Add security headers to a response.
124fn with_security_headers<R: std::io::Read>(response: Response<R>) -> Response<R> {
125    let mut resp = response;
126    for header in security_headers() {
127        resp = resp.with_header(header);
128    }
129    resp
130}
131
132/// Start the dev server on the given port. Blocks until shutdown.
133pub fn start(runtime: Arc<Runtime>, port: u16) -> Result<(), String> {
134    start_with_plugins(runtime, port, None)
135}
136
137/// Start the dev server with optional plugins. Blocks until shutdown.
138pub fn start_with_plugins(
139    runtime: Arc<Runtime>,
140    port: u16,
141    plugins: Option<Arc<PluginRegistry>>,
142) -> Result<(), String> {
143    start_server(runtime, port, plugins, None)
144}
145
146/// Start the dev server with plugins and a shard registry for real-time
147/// simulations (games, MMO zones, etc.). Blocks until shutdown.
148pub fn start_with_shards(
149    runtime: Arc<Runtime>,
150    port: u16,
151    plugins: Option<Arc<PluginRegistry>>,
152    shard_registry: Arc<dyn pylon_realtime::DynShardRegistry>,
153) -> Result<(), String> {
154    start_server(runtime, port, plugins, Some(shard_registry))
155}
156
157fn start_server(
158    runtime: Arc<Runtime>,
159    port: u16,
160    plugins: Option<Arc<PluginRegistry>>,
161    shard_registry: Option<Arc<dyn pylon_realtime::DynShardRegistry>>,
162) -> Result<(), String> {
163    // Run the tracing-exporter hook BEFORE anything else emits spans. The
164    // operator registers it via `pylon_observability::set_tracing_hook`
165    // at process init; here we invoke it exactly once on startup. No-op
166    // if nothing was registered.
167    pylon_observability::run_tracing_hook();
168
169    let addr = format!("0.0.0.0:{port}");
170    let server = Server::http(&addr).map_err(|e| format!("Failed to start server: {e}"))?;
171    let server = Arc::new(server);
172
173    // Stash a handle so `request_shutdown()` can unblock the loop.
174    let _ = SERVER_HANDLE.set(Arc::clone(&server));
175
176    let session_store = Arc::new(build_session_store(runtime.db_path().as_deref()));
177    let magic_codes = Arc::new(pylon_auth::MagicCodeStore::new());
178    // Persist OAuth state if a session DB is configured. Reuse the same path
179    // (sessions and OAuth state are both small auth artifacts that should
180    // outlive a restart). Falls back to in-memory if no DB path is set.
181    let oauth_state = Arc::new(match std::env::var("PYLON_SESSION_DB").ok() {
182        Some(path) => match crate::oauth_backend::SqliteOAuthBackend::open(&path) {
183            Ok(backend) => pylon_auth::OAuthStateStore::with_backend(Box::new(backend)),
184            Err(e) => {
185                tracing::warn!(
186                    "[auth] OAuth state SQLite backend unavailable: {e} — falling back to in-memory"
187                );
188                pylon_auth::OAuthStateStore::new()
189            }
190        },
191        None => pylon_auth::OAuthStateStore::new(),
192    });
193    let policy_engine = Arc::new(PolicyEngine::from_manifest(runtime.manifest()));
194    let change_log = Arc::new(ChangeLog::new());
195
196    // Seed the change log with one synthetic insert per extant row so that
197    // a pull from seq=0 after a restart reconstructs current state. The
198    // change log is in-memory — restarting the process without this would
199    // leave SQLite rows unreachable via /api/sync/pull (clients would
200    // pull nothing and see an empty replica). Seqs here are fresh; clients
201    // whose cursors are ahead of `self.seq` get a 410 and full resync,
202    // which then hits this seeded log and gets every current row back.
203    for entity in runtime.manifest().entities.iter() {
204        match runtime.list(&entity.name) {
205            Ok(rows) => {
206                for row in rows {
207                    if let Some(id) = row.get("id").and_then(|v| v.as_str()) {
208                        change_log.append(&entity.name, id, ChangeKind::Insert, Some(row.clone()));
209                    }
210                }
211            }
212            Err(_) => {
213                // Entity table may not exist yet on first boot — skip.
214            }
215        }
216    }
217    let ws_hub = WsHub::new();
218    let sse_hub = SseHub::new();
219    // Default-register the rate-limit plugin when no custom registry was
220    // supplied. Without this, self-hosted deployments would launch with
221    // auth endpoints (/api/auth/magic/send, /api/auth/magic/verify,
222    // /api/auth/session) wide open to brute force and enumeration.
223    //
224    // Dev: 100k/min so a React app's initial bundle + auth + sync pulls
225    // (each worth ~6-10 requests) doesn't immediately 429 the dev. Prod:
226    // 100/min per IP — tight enough to crush burst attackers, loose
227    // enough for legitimate multi-tab UIs. Callers passing their own
228    // registry are responsible for their own limits.
229    // Probe dev mode NOW — defined for real at line ~300 but plugin
230    // registration below needs it. Same env-var, same logic.
231    let is_dev_early = std::env::var("PYLON_DEV_MODE")
232        .map(|v| v == "1" || v == "true")
233        .unwrap_or(true);
234    let plugin_rl_max: u32 = if is_dev_early { 100_000 } else { 100 };
235    let plugin_reg: Arc<PluginRegistry> = plugins.unwrap_or_else(|| {
236        let mut reg = PluginRegistry::new(runtime.manifest().clone());
237        reg.register(Arc::new(
238            pylon_plugin::builtin::rate_limit::RateLimitPlugin::new(
239                plugin_rl_max,
240                std::time::Duration::from_secs(60),
241            ),
242        ));
243        // Auto-scope any entity that declares a `tenantId` field. This is
244        // how multi-tenant isolation becomes a default posture rather than
245        // an opt-in: drop the field on the entity and the plugin takes it
246        // from there (stamps inserts, rejects cross-tenant writes).
247        reg.register(Arc::new(
248            pylon_plugin::builtin::tenant_scope::TenantScopePlugin::from_manifest(
249                runtime.manifest(),
250            ),
251        ));
252        Arc::new(reg)
253    });
254    let room_mgr = Arc::new(RoomManager::new(120)); // 2 min idle timeout
255    let ws_port = port + 1;
256    let sse_port = port + 2;
257
258    // Record server start time for the health endpoint.
259    let start_time = Instant::now();
260
261    let metrics = Arc::new(Metrics::new());
262
263    // Cache and pub/sub shared instances.
264    let cache = Arc::new(CachePlugin::new(100_000));
265    let pubsub_broker = Arc::new(PubSubBroker::new(100));
266
267    // Job queue, scheduler, and background workers.
268    let job_queue = Arc::new(JobQueue::new(1000));
269
270    // Persistent job store. Colocate with the app DB so `./app.db` gets
271    // `./app.db.jobs.db` automatically — otherwise jobs land in CWD, which
272    // is wherever the server was launched from (confusing and fragile).
273    // In-memory runtimes and the `PYLON_JOBS_IN_MEMORY=1` opt-out both
274    // skip persistence.
275    let jobs_in_memory = std::env::var("PYLON_JOBS_IN_MEMORY")
276        .map(|v| v == "1" || v == "true")
277        .unwrap_or(false);
278    if !jobs_in_memory {
279        let jobs_db_path = std::env::var("PYLON_JOBS_DB").ok().unwrap_or_else(|| {
280            runtime
281                .db_path()
282                .map(|p| format!("{p}.jobs.db"))
283                .unwrap_or_else(|| "pylon.jobs.db".into())
284        });
285        match crate::job_store::JobStore::open(&jobs_db_path) {
286            Ok(store) => {
287                let store = Arc::new(store);
288                let restored = job_queue.restore_from(&store);
289                if restored > 0 {
290                    tracing::info!("[jobs] Restored {restored} pending job(s) from {jobs_db_path}");
291                }
292                job_queue.attach_store(store);
293            }
294            Err(e) => {
295                tracing::warn!(
296                    "[jobs] Could not open job store at {jobs_db_path}: {e} — running without persistence"
297                );
298            }
299        }
300    }
301
302    // Register built-in framework jobs.
303    {
304        let cache_ref = Arc::clone(&cache);
305        job_queue.register(
306            "pylon.cache.cleanup",
307            Arc::new(move |_job| {
308                cache_ref.cleanup_expired();
309                JobResult::Success
310            }),
311        );
312        let rooms_ref = Arc::clone(&room_mgr);
313        job_queue.register(
314            "pylon.rooms.cleanup",
315            Arc::new(move |_job| {
316                rooms_ref.cleanup_idle();
317                JobResult::Success
318            }),
319        );
320    }
321
322    let scheduler = Arc::new(Scheduler::new(Arc::clone(&job_queue)));
323    // Schedule built-in tasks.
324    let _ = scheduler.schedule(
325        "pylon.cache.cleanup",
326        "*/10 * * * *",
327        Arc::new(|_| JobResult::Success),
328    );
329    let _ = scheduler.schedule(
330        "pylon.rooms.cleanup",
331        "*/5 * * * *",
332        Arc::new(|_| JobResult::Success),
333    );
334
335    // Start 2 background workers.
336    let _worker_handles: Vec<_> = (0..2)
337        .map(|i| {
338            let w = Worker::new(Arc::clone(&job_queue), &format!("worker-{i}"));
339            w.start()
340        })
341        .collect();
342
343    // Start the scheduler.
344    let _scheduler_handle = Arc::clone(&scheduler).start();
345
346    // Workflow engine: TS runner URL configurable via env, defaults to local Bun server.
347    let wf_runner_url = std::env::var("PYLON_WORKFLOW_RUNNER_URL")
348        .unwrap_or_else(|_| "http://127.0.0.1:9876/run".to_string());
349    let workflow_engine = Arc::new(WorkflowEngine::new(&wf_runner_url, 10_000));
350
351    // Rate limiter: per-IP outer cap on total requests.
352    //
353    // Defaults:
354    //   - Dev mode: effectively off (100k/min) so a React app's initial
355    //     bundle load + sync pulls + user clicks don't immediately 429.
356    //     100/min blew through during a single login + first sync pull.
357    //   - Prod: 600/min (10 req/sec average). Still tight, but a real app
358    //     should override with PYLON_RATE_LIMIT_MAX anyway.
359    //
360    // Override with PYLON_RATE_LIMIT_MAX + PYLON_RATE_LIMIT_WINDOW.
361    let default_rl_max = if is_dev_early { 100_000 } else { 600 };
362    let rl_max: u32 = std::env::var("PYLON_RATE_LIMIT_MAX")
363        .ok()
364        .and_then(|v| v.parse().ok())
365        .unwrap_or(default_rl_max);
366    let rl_window: u64 = std::env::var("PYLON_RATE_LIMIT_WINDOW")
367        .ok()
368        .and_then(|v| v.parse().ok())
369        .unwrap_or(60);
370    let rate_limiter = Arc::new(RateLimiter::new(rl_max, rl_window));
371
372    // Per-function rate limiter: separate bucket per (caller, function) pair.
373    // Defaults to a stricter cap because functions are heavier than reads.
374    // Override via PYLON_FN_RATE_LIMIT_MAX / PYLON_FN_RATE_LIMIT_WINDOW.
375    let fn_rl_max: u32 = std::env::var("PYLON_FN_RATE_LIMIT_MAX")
376        .ok()
377        .and_then(|v| v.parse().ok())
378        .unwrap_or(30);
379    let fn_rl_window: u64 = std::env::var("PYLON_FN_RATE_LIMIT_WINDOW")
380        .ok()
381        .and_then(|v| v.parse().ok())
382        .unwrap_or(60);
383    let fn_rate_limiter = Arc::new(RateLimiter::new(fn_rl_max, fn_rl_window));
384
385    // TypeScript function runtime: optional Bun process that loads functions/*.ts
386    // If no `functions/` directory exists or Bun isn't installed, this is None
387    // and /api/fn/* routes return 503.
388    // Build a notifier adapter so function mutations (`ctx.db.insert/update/delete`)
389    // emit change events to WS + SSE subscribers on COMMIT. Without this,
390    // functions write to the DB but sync clients never see the update
391    // live — they only catch up on the next refetch.
392    let fn_notifier: Arc<dyn pylon_router::ChangeNotifier> =
393        Arc::new(crate::datastore::WsSseNotifier {
394            ws: Arc::clone(&ws_hub),
395            sse: Arc::clone(&sse_hub),
396        });
397    let fn_ops_maybe = crate::datastore::try_spawn_functions(
398        Arc::clone(&runtime),
399        Arc::clone(&job_queue),
400        Arc::clone(&fn_rate_limiter),
401        Arc::clone(&change_log),
402        fn_notifier,
403    );
404
405    // Dev mode flag: when false, sensitive data (e.g. magic codes) is omitted from responses.
406    let is_dev = std::env::var("PYLON_DEV_MODE")
407        .map(|v| v == "1" || v == "true")
408        .unwrap_or(true);
409
410    // CORS origin. Defaults to `*` in dev for convenience; in prod we refuse
411    // to start with a wildcard because the server sends `Access-Control-
412    // Allow-Credentials: true` elsewhere and also accepts `Authorization:
413    // Bearer <session>`. The combination of `*` + credentials is a spec
414    // violation that some browsers tolerate, and even when they don't it
415    // lets any origin drive bearer-auth APIs.
416    let cors_origin = match std::env::var("PYLON_CORS_ORIGIN") {
417        Ok(v) => v,
418        Err(_) if is_dev => "*".to_string(),
419        Err(_) => {
420            return Err(
421                "PYLON_CORS_ORIGIN must be set in production (non-dev mode). \
422                Set it to your frontend's origin, or set PYLON_DEV_MODE=true \
423                for local development."
424                    .into(),
425            );
426        }
427    };
428    if !is_dev && cors_origin == "*" {
429        return Err("PYLON_CORS_ORIGIN=\"*\" is refused in production mode. \
430            Set it to an explicit origin (https://app.example.com)."
431            .into());
432    }
433    // Browsers forbid combining `Access-Control-Allow-Origin: *` with
434    // `Access-Control-Allow-Credentials: true`. Cookie-based auth needs
435    // credentials, so we only emit the credentials header when the origin
436    // is specific. In dev with `*` we lose cookies-from-cross-origin
437    // (acceptable: dev typically uses same-origin proxying), but we
438    // refuse to send a header combo browsers will reject either way.
439    let allow_credentials = cors_origin != "*";
440    // Validate the origin once so per-request header construction can never
441    // panic on bad bytes. Previously every `Header::from_bytes(...).unwrap()`
442    // was a potential request-triggered DoS via env misconfiguration.
443    if Header::from_bytes(
444        "Access-Control-Allow-Origin",
445        cors_origin.as_bytes().to_vec(),
446    )
447    .is_err()
448    {
449        return Err(format!(
450            "PYLON_CORS_ORIGIN={cors_origin:?} contains bytes that are not a valid HTTP header value"
451        ));
452    }
453
454    // Admin token: read once at startup, not per-request.
455    let admin_token: Option<String> = std::env::var("PYLON_ADMIN_TOKEN").ok();
456
457    // Session cookie config — built once. Cookie name defaults to
458    // `${app_name}_session` so multiple Pylon apps on the same parent
459    // domain don't clobber each other's cookies. Browsers receive an
460    // HttpOnly+Secure+SameSite=Lax cookie by default; the same opaque
461    // session token continues to work via `Authorization: Bearer …`
462    // for CLI / mobile / server-to-server callers.
463    let cookie_config = Arc::new({
464        let app_name = runtime.manifest().name.as_str();
465        pylon_auth::CookieConfig::from_env(&pylon_auth::CookieConfig::default_name_for(app_name))
466    });
467
468    // CSRF protection. Enforced inline at the HTTP layer because the plugin
469    // trait's `on_request` hook doesn't see request headers. For
470    // state-changing methods (POST/PATCH/PUT/DELETE) we check Origin, then
471    // Referer, against the allowlist.
472    //
473    // Allowlist resolution:
474    //   - PYLON_CSRF_ORIGINS (comma-separated) if set
475    //   - otherwise PYLON_CORS_ORIGIN (already validated above)
476    //   - in dev, fall back to allow-any to avoid breaking local tooling.
477    let csrf_origins: Vec<String> = match std::env::var("PYLON_CSRF_ORIGINS") {
478        Ok(v) => v
479            .split(',')
480            .map(|s| s.trim().to_string())
481            .filter(|s| !s.is_empty())
482            .collect(),
483        Err(_) => {
484            if is_dev {
485                vec!["*".to_string()]
486            } else if cors_origin != "*" {
487                vec![cors_origin.clone()]
488            } else {
489                // Non-dev + wildcard was already rejected, but guard anyway.
490                vec![]
491            }
492        }
493    };
494    let csrf = Arc::new(pylon_plugin::builtin::csrf::CsrfPlugin::new(csrf_origins));
495
496    // Start WebSocket server on port+1.
497    //
498    // The snapshot fetcher gives the WS reader a way to ship the current
499    // CRDT snapshot to a client the instant it subscribes — without it
500    // the new tab would have to wait for the next write before catching
501    // up to the converged state. Encodes into the same length-prefixed
502    // wire frame as the broadcast path so the client decoder is shared.
503    //
504    // Authz is enforced HERE, not at the WS layer: the closure runs the
505    // row through `check_entity_read` against the caller's auth ctx and
506    // returns None on deny. The caller (handle_crdt_control) treats
507    // None as "don't subscribe" — so a denied client can't sit on a
508    // subscription waiting for a future write to leak state.
509    {
510        let hub = Arc::clone(&ws_hub);
511        let sessions = Arc::clone(&session_store);
512        let runtime_for_fetcher = Arc::clone(&runtime);
513        let pe_for_fetcher = Arc::clone(&policy_engine);
514        let fetcher: crate::ws::SnapshotFetcher = Arc::new(move |auth_ctx, entity, row_id| {
515            use pylon_http::DataStore;
516            // Fetch the row first so the policy engine can evaluate
517            // row-level predicates (`data.authorId == auth.userId`
518            // etc). Missing row → deny silently; the client just
519            // never gets a frame and can't probe existence.
520            let row = match runtime_for_fetcher.get_by_id(entity, row_id) {
521                Ok(Some(v)) => v,
522                _ => return None,
523            };
524            if !matches!(
525                pe_for_fetcher.check_entity_read(entity, auth_ctx, Some(&row)),
526                pylon_policy::PolicyResult::Allowed
527            ) {
528                return None;
529            }
530            let snap = match runtime_for_fetcher.crdt_snapshot(entity, row_id) {
531                Ok(Some(bytes)) => bytes,
532                _ => return None,
533            };
534            pylon_router::encode_crdt_frame(
535                pylon_router::CRDT_FRAME_SNAPSHOT,
536                entity,
537                row_id,
538                &snap,
539            )
540            .ok()
541        });
542        std::thread::spawn(move || {
543            crate::ws::start_ws_server(hub, sessions, ws_port, Some(fetcher));
544        });
545    }
546
547    // Start SSE server on port+2.
548    {
549        let hub = Arc::clone(&sse_hub);
550        std::thread::spawn(move || {
551            crate::sse::start_sse_server(hub, sse_port);
552        });
553    }
554
555    // Start shard WebSocket server on port+3 when a registry is provided.
556    let shard_ws_port = port + 3;
557    if let Some(reg) = shard_registry.clone() {
558        let sessions = Arc::clone(&session_store);
559        std::thread::spawn(move || {
560            crate::shard_ws::start_shard_ws_server(reg, sessions, shard_ws_port);
561        });
562    }
563
564    tracing::warn!("pylon dev server listening on http://localhost:{port}");
565    tracing::info!("  WebSocket: ws://localhost:{ws_port}");
566    tracing::info!("  Studio: http://localhost:{port}/studio");
567    tracing::info!("  API:    http://localhost:{port}/api/entities/<entity>");
568    tracing::info!("  Auth:   http://localhost:{port}/api/auth/session");
569
570    // Use recv() in a loop instead of incoming_requests() so we can share
571    // the Arc<Server> with the shutdown path (incoming_requests borrows &self
572    // which prevents moving the Arc into another thread).
573    loop {
574        if SHUTDOWN.load(Ordering::Relaxed) {
575            break;
576        }
577
578        let mut request = match server.recv() {
579            Ok(rq) => rq,
580            Err(_) => {
581                // recv() returns Err when unblocked or the socket is closed.
582                break;
583            }
584        };
585
586        if SHUTDOWN.load(Ordering::Relaxed) {
587            break;
588        }
589
590        let rt = Arc::clone(&runtime);
591        let ss = Arc::clone(&session_store);
592        let pe = Arc::clone(&policy_engine);
593        let cl = Arc::clone(&change_log);
594        let wh = Arc::clone(&ws_hub);
595        let sh = Arc::clone(&sse_hub);
596        let mc = Arc::clone(&magic_codes);
597        let pr = Arc::clone(&plugin_reg);
598        let rm = Arc::clone(&room_mgr);
599        let mt = Arc::clone(&metrics);
600        let os = Arc::clone(&oauth_state);
601        let ca = Arc::clone(&cache);
602        let ps = Arc::clone(&pubsub_broker);
603        let jq = Arc::clone(&job_queue);
604        let sc = Arc::clone(&scheduler);
605        let we = Arc::clone(&workflow_engine);
606        let fn_ops_ref = fn_ops_maybe.clone();
607        let shards_ref = shard_registry.clone();
608        let cors_origin = cors_origin.clone();
609        let cookie_config = Arc::clone(&cookie_config);
610        let allow_credentials = allow_credentials;
611        let is_dev = is_dev;
612
613        let method = request.method().clone();
614        let url = request.url().to_string();
615
616        // --- Health check: fast path before auth or body parsing ---
617        if url == "/health" && method == Method::Get {
618            let uptime = start_time.elapsed().as_secs();
619            let body = serde_json::json!({
620                "status": "ok",
621                "version": "0.1.0",
622                "uptime_secs": uptime,
623            })
624            .to_string();
625
626            let response = with_security_headers(
627                Response::from_string(&body)
628                    .with_status_code(200u16)
629                    .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
630                    .with_header(
631                        Header::from_bytes(
632                            "Access-Control-Allow-Origin",
633                            cors_origin.as_bytes().to_vec(),
634                        )
635                        .unwrap(),
636                    ),
637            );
638            let _ = request.respond(response);
639            continue;
640        }
641
642        // --- Metrics endpoint: fast path before rate-limit / body parsing.
643        // Gate behind admin auth in non-dev to prevent leakage of function
644        // names, request volumes, and error rates to the public internet.
645        // Dev mode stays open so local Prometheus scrapers just work.
646        if url == "/metrics" && method == Method::Get {
647            if !is_dev {
648                let admin_bytes = admin_token.as_deref().unwrap_or("").as_bytes();
649                let auth_ok = !admin_bytes.is_empty()
650                    && request.headers().iter().any(|h| {
651                        let name = h.field.as_str().as_str();
652                        name.eq_ignore_ascii_case("Authorization")
653                            && h.value
654                                .as_str()
655                                .strip_prefix("Bearer ")
656                                .map(|t| pylon_auth::constant_time_eq(t.as_bytes(), admin_bytes))
657                                .unwrap_or(false)
658                    });
659                if !auth_ok {
660                    let body = json_error(
661                        "UNAUTHORIZED",
662                        "/metrics requires admin bearer token in non-dev mode",
663                    );
664                    let response = with_security_headers(
665                        Response::from_string(&body)
666                            .with_status_code(401u16)
667                            .with_header(
668                                Header::from_bytes("Content-Type", "application/json").unwrap(),
669                            ),
670                    );
671                    let _ = request.respond(response);
672                    continue;
673                }
674            }
675            let prefers_prometheus = request.headers().iter().any(|h| {
676                (h.field.as_str() == "Accept" || h.field.as_str() == "accept")
677                    && (h.value.as_str().contains("text/plain")
678                        || h.value.as_str().contains("application/openmetrics-text"))
679            });
680            let (body, content_type) = if prefers_prometheus {
681                (mt.prometheus(), "text/plain; version=0.0.4")
682            } else {
683                (mt.snapshot().to_string(), "application/json")
684            };
685            let response = with_security_headers(
686                Response::from_string(&body)
687                    .with_status_code(200u16)
688                    .with_header(Header::from_bytes("Content-Type", content_type).unwrap())
689                    .with_header(
690                        Header::from_bytes(
691                            "Access-Control-Allow-Origin",
692                            cors_origin.as_bytes().to_vec(),
693                        )
694                        .unwrap(),
695                    ),
696            );
697            let _ = request.respond(response);
698            mt.record_request("GET", 200);
699            continue;
700        }
701
702        // --- Rate limiting: check per-IP request count ---
703        let peer_ip = request
704            .remote_addr()
705            .map(|a| a.ip().to_string())
706            .unwrap_or_default();
707
708        // OPTIONS preflights are browser infrastructure, not user intent.
709        // Rate-limiting them makes a normal page effectively halve its
710        // budget (preflight + real request per call) and returns a 429
711        // that the browser can't interpret as a valid CORS response —
712        // the user-visible symptom is "Failed to fetch" on login. Skip.
713        let is_preflight = matches!(method, Method::Options);
714        if !is_preflight {
715            if let Err(retry_after) = rate_limiter.check(&peer_ip) {
716                let err_body = json_error(
717                    "RATE_LIMITED",
718                    &format!("Too many requests. Retry after {retry_after} seconds."),
719                );
720                let response = with_security_headers(
721                    Response::from_string(&err_body)
722                        .with_status_code(429u16)
723                        .with_header(
724                            Header::from_bytes("Content-Type", "application/json").unwrap(),
725                        )
726                        .with_header(
727                            Header::from_bytes(
728                                "Access-Control-Allow-Origin",
729                                cors_origin.as_bytes().to_vec(),
730                            )
731                            .unwrap(),
732                        )
733                        .with_header(
734                            Header::from_bytes(
735                                "Access-Control-Allow-Methods",
736                                "GET, POST, PATCH, DELETE, OPTIONS",
737                            )
738                            .unwrap(),
739                        )
740                        .with_header(
741                            Header::from_bytes(
742                                "Access-Control-Allow-Headers",
743                                "Content-Type, Authorization",
744                            )
745                            .unwrap(),
746                        )
747                        .with_header(
748                            Header::from_bytes(
749                                "Retry-After",
750                                retry_after.to_string().as_bytes().to_vec(),
751                            )
752                            .unwrap(),
753                        ),
754                );
755                let _ = request.respond(response);
756                mt.record_request(method.as_str(), 429);
757                continue;
758            }
759        } // end: if !is_preflight
760
761        // --- CSRF check on state-changing requests ---
762        //
763        // Browsers forbid cross-origin POST/PATCH/PUT/DELETE unless CORS
764        // allows it, but an attacker controlling another origin can still
765        // ship credentials-bearing requests if the server is permissive.
766        // The CSRF plugin validates Origin (then Referer) against an explicit
767        // allowlist — this is the check that was missing because the Plugin
768        // trait's `on_request` hook has no access to headers.
769        //
770        // The Authorization header carries bearer tokens, so CSRF mostly
771        // matters for cookie-based sessions — but we enforce globally: a
772        // request that misses Origin/Referer on a state-changing method is
773        // rejected, which is the safer default.
774        {
775            let method_str = method.as_str();
776            let is_bearer = request.headers().iter().any(|h| {
777                (h.field.as_str() == "Authorization" || h.field.as_str() == "authorization")
778                    && h.value.as_str().starts_with("Bearer ")
779            });
780            // Bearer-authenticated requests are not CSRF-vulnerable in the
781            // classic sense — browsers don't auto-attach bearer tokens. Skip
782            // the check for them so server-to-server API callers keep working
783            // without needing Origin headers.
784            if !is_bearer && !matches!(method, Method::Get | Method::Head | Method::Options) {
785                let origin = request
786                    .headers()
787                    .iter()
788                    .find(|h| h.field.as_str() == "Origin" || h.field.as_str() == "origin")
789                    .map(|h| h.value.as_str().to_string());
790                let referer = request
791                    .headers()
792                    .iter()
793                    .find(|h| h.field.as_str() == "Referer" || h.field.as_str() == "referer")
794                    .map(|h| h.value.as_str().to_string());
795                if let Err(err) = csrf.check(method_str, origin.as_deref(), referer.as_deref()) {
796                    let body = json_error(&err.code, &err.message);
797                    let response = with_security_headers(
798                        Response::from_string(&body)
799                            .with_status_code(err.status)
800                            .with_header(
801                                Header::from_bytes("Content-Type", "application/json").unwrap(),
802                            )
803                            .with_header(
804                                Header::from_bytes(
805                                    "Access-Control-Allow-Origin",
806                                    cors_origin.as_bytes().to_vec(),
807                                )
808                                .unwrap(),
809                            ),
810                    );
811                    let _ = request.respond(response);
812                    mt.record_request(method_str, err.status);
813                    continue;
814                }
815            }
816        }
817
818        // Extract auth token + auth context EARLY so every fast path (upload,
819        // shard SSE, fn streaming, AI streaming) can enforce auth the same
820        // way the router does. Previously these paths ran before auth
821        // extraction and bypassed the plugin/router auth chain entirely.
822        //
823        // Two transports for the same opaque session token:
824        //   1. `Authorization: Bearer <token>` — CLI, mobile, server-to-server
825        //   2. `Cookie: <name>=<token>` — browsers (HttpOnly, XSS can't read)
826        // Bearer wins when both are present (explicit beats ambient).
827        let bearer_token: Option<String> = request
828            .headers()
829            .iter()
830            .find(|h| h.field.as_str() == "Authorization" || h.field.as_str() == "authorization")
831            .and_then(|h| {
832                let val = h.value.as_str();
833                val.strip_prefix("Bearer ").map(|t| t.to_string())
834            });
835        let cookie_token: Option<String> = if bearer_token.is_some() {
836            None
837        } else {
838            request
839                .headers()
840                .iter()
841                .find(|h| h.field.as_str() == "Cookie" || h.field.as_str() == "cookie")
842                .and_then(|h| {
843                    pylon_auth::extract_session_cookie(h.value.as_str(), &cookie_config.name)
844                })
845        };
846        let auth_token: Option<String> = bearer_token.or(cookie_token);
847        let auth_ctx = if admin_token.is_some()
848            && auth_token.is_some()
849            && pylon_auth::constant_time_eq(
850                auth_token.as_deref().unwrap_or("").as_bytes(),
851                admin_token.as_deref().unwrap_or("").as_bytes(),
852            ) {
853            pylon_auth::AuthContext::admin()
854        } else {
855            ss.resolve(auth_token.as_deref())
856        };
857
858        // --- Test-reset endpoint — in-memory + dev mode + localhost only ---
859        //
860        // `pylon test` sets PYLON_IN_MEMORY=1 + PYLON_DEV_MODE=true.
861        // The TS helper `resetDb()` posts here between `test(...)` blocks
862        // to isolate cases. Gates:
863        //   1. dev mode (production refuses outright)
864        //   2. in-memory DB (belt-and-braces against accidental file wipes)
865        //   3. peer IP is loopback (a dev laptop often has localhost:4321
866        //      reachable; without this, a browser visiting a malicious
867        //      site could cross-site-POST a reset via a bare form —
868        //      blind CSRF that doesn't care about the response)
869        //
870        // Positioned AFTER the rate limiter and CSRF check on purpose so
871        // those middlewares apply — the earlier placement skipped both.
872        if url == "/api/__test__/reset" && method == Method::Post {
873            let is_loopback = peer_ip == "127.0.0.1"
874                || peer_ip == "::1"
875                || peer_ip.starts_with("127.")
876                || peer_ip == "localhost";
877            if !is_dev || !rt.is_in_memory() || !is_loopback {
878                let body = json_error(
879                    "RESET_REFUSED",
880                    "reset endpoint is only available in dev mode + in-memory DB + from loopback",
881                );
882                let response = with_security_headers(
883                    Response::from_string(&body)
884                        .with_status_code(403u16)
885                        .with_header(
886                            Header::from_bytes("Content-Type", "application/json").unwrap(),
887                        )
888                        .with_header(
889                            Header::from_bytes(
890                                "Access-Control-Allow-Origin",
891                                cors_origin.as_bytes().to_vec(),
892                            )
893                            .unwrap(),
894                        ),
895                );
896                let _ = request.respond(response);
897                mt.record_request("POST", 403);
898                continue;
899            }
900            let (status, body) = match rt.reset_for_tests() {
901                Ok(()) => (200u16, "{\"reset\":true}".to_string()),
902                Err(e) => (500u16, json_error(&e.code, &e.message)),
903            };
904            let response = with_security_headers(
905                Response::from_string(&body)
906                    .with_status_code(status)
907                    .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
908                    .with_header(
909                        Header::from_bytes(
910                            "Access-Control-Allow-Origin",
911                            cors_origin.as_bytes().to_vec(),
912                        )
913                        .unwrap(),
914                    ),
915            );
916            let _ = request.respond(response);
917            mt.record_request("POST", status);
918            continue;
919        }
920
921        // --- File upload fast path: handle binary body before string conversion ---
922        // Uploads come in two shapes:
923        //   1. Direct binary body with X-Filename / Content-Type headers
924        //   2. multipart/form-data with a file part
925        //
926        // Require an authenticated user. Uploads write to the files backend
927        // (and into the plugin audit log for soft-delete etc.), so
928        // unauthenticated callers cannot use this route.
929        if url == "/api/files/upload" && method == Method::Post {
930            const UPLOAD_MAX: usize = 10 * 1024 * 1024;
931            // Enforce size BEFORE reading the body so a 10 GiB stream can't
932            // buffer into memory. Content-Length pre-check, then bounded read.
933            if let Some(declared) = request.body_length() {
934                if declared > UPLOAD_MAX {
935                    let err = json_error(
936                        "PAYLOAD_TOO_LARGE",
937                        &format!("Content-Length {declared} exceeds upload max of {UPLOAD_MAX}"),
938                    );
939                    let response = with_security_headers(
940                        Response::from_string(&err)
941                            .with_status_code(413u16)
942                            .with_header(
943                                Header::from_bytes("Content-Type", "application/json").unwrap(),
944                            )
945                            .with_header(
946                                Header::from_bytes(
947                                    "Access-Control-Allow-Origin",
948                                    cors_origin.as_bytes().to_vec(),
949                                )
950                                .unwrap(),
951                            ),
952                    );
953                    let _ = request.respond(response);
954                    mt.record_request("POST", 413);
955                    continue;
956                }
957            }
958            if auth_ctx.user_id.is_none() {
959                let err = json_error(
960                    "AUTH_REQUIRED",
961                    "/api/files/upload requires an authenticated session",
962                );
963                let response = with_security_headers(
964                    Response::from_string(&err)
965                        .with_status_code(401u16)
966                        .with_header(
967                            Header::from_bytes("Content-Type", "application/json").unwrap(),
968                        )
969                        .with_header(
970                            Header::from_bytes(
971                                "Access-Control-Allow-Origin",
972                                cors_origin.as_bytes().to_vec(),
973                            )
974                            .unwrap(),
975                        ),
976                );
977                let _ = request.respond(response);
978                mt.record_request("POST", 401);
979                continue;
980            }
981            // Read up to UPLOAD_MAX + 1 bytes. If we read the full +1 we know
982            // the client lied about Content-Length (or used chunked encoding
983            // and overran). Reject in that case instead of continuing with a
984            // truncated file.
985            use std::io::Read;
986            let mut bytes: Vec<u8> = Vec::with_capacity(8192);
987            let mut limited = request.as_reader().take((UPLOAD_MAX as u64) + 1);
988            let _ = limited.read_to_end(&mut bytes);
989
990            const MAX: usize = UPLOAD_MAX;
991            if bytes.len() > MAX {
992                let err = json_error("PAYLOAD_TOO_LARGE", "File exceeds 10 MB limit");
993                let response = with_security_headers(
994                    Response::from_string(&err)
995                        .with_status_code(413u16)
996                        .with_header(
997                            Header::from_bytes("Content-Type", "application/json").unwrap(),
998                        )
999                        .with_header(
1000                            Header::from_bytes(
1001                                "Access-Control-Allow-Origin",
1002                                cors_origin.as_bytes().to_vec(),
1003                            )
1004                            .unwrap(),
1005                        ),
1006                );
1007                let _ = request.respond(response);
1008                mt.record_request("POST", 413);
1009                continue;
1010            }
1011
1012            // Headers.
1013            let content_type = request
1014                .headers()
1015                .iter()
1016                .find(|h| h.field.as_str() == "Content-Type" || h.field.as_str() == "content-type")
1017                .map(|h| h.value.as_str().to_string())
1018                .unwrap_or_else(|| "application/octet-stream".into());
1019            let filename = request
1020                .headers()
1021                .iter()
1022                .find(|h| h.field.as_str() == "X-Filename" || h.field.as_str() == "x-filename")
1023                .map(|h| h.value.as_str().to_string())
1024                .unwrap_or_else(|| "upload".into());
1025
1026            // If multipart, extract the first file part. Otherwise use bytes directly.
1027            let (name, ct, payload) = if content_type.starts_with("multipart/form-data") {
1028                match parse_multipart_first_file(&bytes, &content_type) {
1029                    Some(p) => p,
1030                    None => {
1031                        let err = json_error("INVALID_MULTIPART", "Could not parse multipart body");
1032                        let response = with_security_headers(
1033                            Response::from_string(&err)
1034                                .with_status_code(400u16)
1035                                .with_header(
1036                                    Header::from_bytes("Content-Type", "application/json").unwrap(),
1037                                )
1038                                .with_header(
1039                                    Header::from_bytes(
1040                                        "Access-Control-Allow-Origin",
1041                                        cors_origin.as_bytes().to_vec(),
1042                                    )
1043                                    .unwrap(),
1044                                ),
1045                        );
1046                        let _ = request.respond(response);
1047                        mt.record_request("POST", 400);
1048                        continue;
1049                    }
1050                }
1051            } else {
1052                (filename, content_type, bytes)
1053            };
1054
1055            let storage = pylon_storage::files::LocalFileStorage::new(
1056                &std::env::var("PYLON_FILES_DIR").unwrap_or_else(|_| "uploads".into()),
1057                &std::env::var("PYLON_FILES_URL_PREFIX").unwrap_or_else(|_| "/api/files".into()),
1058            );
1059
1060            let (status, body) =
1061                match pylon_storage::files::FileStorage::store(&storage, &name, &payload, &ct) {
1062                    Ok(stored) => (
1063                        201u16,
1064                        serde_json::to_string(&stored).unwrap_or_else(|_| "{}".into()),
1065                    ),
1066                    Err(e) => (500u16, json_error(&e.code, &e.message)),
1067                };
1068
1069            let response = with_security_headers(
1070                Response::from_string(&body)
1071                    .with_status_code(status)
1072                    .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
1073                    .with_header(
1074                        Header::from_bytes(
1075                            "Access-Control-Allow-Origin",
1076                            cors_origin.as_bytes().to_vec(),
1077                        )
1078                        .unwrap(),
1079                    ),
1080            );
1081            let _ = request.respond(response);
1082            mt.record_request("POST", status);
1083            continue;
1084        }
1085
1086        // Read body before routing (request is consumed by respond).
1087        // Skip for methods that cannot have a body.
1088        //
1089        // Size enforcement runs in TWO layers so a malicious client can't
1090        // stream 10 GiB into memory before we reject it:
1091        //   1. Content-Length header is compared to MAX_BODY_SIZE up front.
1092        //   2. The actual read uses `.take(MAX_BODY_SIZE + 1)` so a lying
1093        //      or chunked stream is capped at MAX + 1 bytes; if we read that
1094        //      many, we reject.
1095        const MAX_BODY_SIZE: usize = 10 * 1024 * 1024;
1096
1097        if let Some(declared) = request.body_length() {
1098            if declared > MAX_BODY_SIZE {
1099                let err_body = json_error(
1100                    "PAYLOAD_TOO_LARGE",
1101                    &format!("Content-Length {declared} exceeds max of {MAX_BODY_SIZE}"),
1102                );
1103                let response = with_security_headers(
1104                    Response::from_string(&err_body)
1105                        .with_status_code(413u16)
1106                        .with_header(
1107                            Header::from_bytes(
1108                                "Access-Control-Allow-Origin",
1109                                cors_origin.as_bytes().to_vec(),
1110                            )
1111                            .unwrap(),
1112                        ),
1113                );
1114                let _ = request.respond(response);
1115                mt.record_request(method.as_str(), 413);
1116                continue;
1117            }
1118        }
1119
1120        let mut body = String::new();
1121        if !matches!(
1122            method,
1123            Method::Get | Method::Head | Method::Options | Method::Delete
1124        ) {
1125            use std::io::Read;
1126            let mut limited = request.as_reader().take((MAX_BODY_SIZE as u64) + 1);
1127            let _ = limited.read_to_string(&mut body);
1128        }
1129
1130        if body.len() > MAX_BODY_SIZE {
1131            let err_body = json_error(
1132                "PAYLOAD_TOO_LARGE",
1133                &format!(
1134                    "Request body exceeds maximum size of {} bytes",
1135                    MAX_BODY_SIZE,
1136                ),
1137            );
1138            let response = with_security_headers(
1139                Response::from_string(&err_body)
1140                    .with_status_code(413u16)
1141                    .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
1142                    .with_header(
1143                        Header::from_bytes(
1144                            "Access-Control-Allow-Origin",
1145                            cors_origin.as_bytes().to_vec(),
1146                        )
1147                        .unwrap(),
1148                    ),
1149            );
1150            let _ = request.respond(response);
1151            mt.record_request(method.as_str(), 413);
1152            continue;
1153        }
1154
1155        // (auth_token + auth_ctx were resolved above, before the fast paths.)
1156
1157        // --- GET /api/shards/:id/connect — SSE snapshot stream ---
1158        if method == Method::Get {
1159            if let Some(rest) = url.strip_prefix("/api/shards/") {
1160                let rest = rest.split('?').next().unwrap_or(rest);
1161                if let Some(shard_id) = rest.strip_suffix("/connect") {
1162                    // Require an authenticated user. Shard SSE streams state
1163                    // snapshots tick-by-tick; an anonymous subscriber can
1164                    // both read that state AND influence via push_input (see
1165                    // the WS handler). Gate at the transport layer.
1166                    if auth_ctx.user_id.is_none() {
1167                        let err = json_error(
1168                            "AUTH_REQUIRED",
1169                            "Shard connect requires an authenticated session",
1170                        );
1171                        let response = with_security_headers(
1172                            Response::from_string(&err)
1173                                .with_status_code(401u16)
1174                                .with_header(
1175                                    Header::from_bytes("Content-Type", "application/json").unwrap(),
1176                                )
1177                                .with_header(
1178                                    Header::from_bytes(
1179                                        "Access-Control-Allow-Origin",
1180                                        cors_origin.as_bytes().to_vec(),
1181                                    )
1182                                    .unwrap(),
1183                                ),
1184                        );
1185                        let _ = request.respond(response);
1186                        mt.record_request("GET", 401);
1187                        continue;
1188                    }
1189                    let shards = match &shards_ref {
1190                        Some(s) => Arc::clone(s),
1191                        None => {
1192                            let err = json_error(
1193                                "SHARDS_NOT_AVAILABLE",
1194                                "Shard system is not configured",
1195                            );
1196                            let response = with_security_headers(
1197                                Response::from_string(&err)
1198                                    .with_status_code(503u16)
1199                                    .with_header(
1200                                        Header::from_bytes("Content-Type", "application/json")
1201                                            .unwrap(),
1202                                    )
1203                                    .with_header(
1204                                        Header::from_bytes(
1205                                            "Access-Control-Allow-Origin",
1206                                            cors_origin.as_bytes().to_vec(),
1207                                        )
1208                                        .unwrap(),
1209                                    ),
1210                            );
1211                            let _ = request.respond(response);
1212                            mt.record_request("GET", 503);
1213                            continue;
1214                        }
1215                    };
1216                    let shard = match shards.get(shard_id) {
1217                        Some(s) => s,
1218                        None => {
1219                            let err = json_error(
1220                                "SHARD_NOT_FOUND",
1221                                &format!("Shard \"{shard_id}\" not found"),
1222                            );
1223                            let response = with_security_headers(
1224                                Response::from_string(&err)
1225                                    .with_status_code(404u16)
1226                                    .with_header(
1227                                        Header::from_bytes("Content-Type", "application/json")
1228                                            .unwrap(),
1229                                    )
1230                                    .with_header(
1231                                        Header::from_bytes(
1232                                            "Access-Control-Allow-Origin",
1233                                            cors_origin.as_bytes().to_vec(),
1234                                        )
1235                                        .unwrap(),
1236                                    ),
1237                            );
1238                            let _ = request.respond(response);
1239                            mt.record_request("GET", 404);
1240                            continue;
1241                        }
1242                    };
1243
1244                    // Subscriber ID from ?sid= query param, else the authed user,
1245                    // else a generated anonymous ID.
1246                    let sub_id = url
1247                        .split("sid=")
1248                        .nth(1)
1249                        .and_then(|s| s.split('&').next())
1250                        .map(|s| s.to_string())
1251                        .or_else(|| auth_ctx.user_id.clone())
1252                        .unwrap_or_else(|| {
1253                            format!(
1254                                "anon_{}",
1255                                std::time::SystemTime::now()
1256                                    .duration_since(std::time::UNIX_EPOCH)
1257                                    .unwrap_or_default()
1258                                    .as_nanos()
1259                            )
1260                        });
1261                    let subscriber_id = pylon_realtime::SubscriberId::new(sub_id);
1262
1263                    let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1264                    let streaming_body = StreamingBody::new(rx);
1265
1266                    let tx_clone = tx.clone();
1267                    let sink: pylon_realtime::SnapshotSink =
1268                        Box::new(move |tick: u64, bytes: &[u8]| {
1269                            // Format as SSE with an id: line carrying the tick
1270                            // number so clients can resume with Last-Event-ID.
1271                            let mut frame = format!("id: {tick}\ndata: ").into_bytes();
1272                            frame.extend_from_slice(bytes);
1273                            frame.extend_from_slice(b"\n\n");
1274                            let _ = tx_clone.send(frame);
1275                        });
1276
1277                    let shard_auth = pylon_realtime::ShardAuth {
1278                        user_id: auth_ctx.user_id.clone(),
1279                        is_admin: auth_ctx.is_admin,
1280                    };
1281                    if let Err(e) = shard.add_subscriber(subscriber_id.clone(), sink, &shard_auth) {
1282                        let (status, code) = match &e {
1283                            pylon_realtime::ShardError::Unauthorized(_) => (403u16, "UNAUTHORIZED"),
1284                            _ => (429u16, "SUBSCRIBE_FAILED"),
1285                        };
1286                        let err = json_error(code, &e.to_string());
1287                        let response = with_security_headers(
1288                            Response::from_string(&err)
1289                                .with_status_code(status)
1290                                .with_header(
1291                                    Header::from_bytes("Content-Type", "application/json").unwrap(),
1292                                )
1293                                .with_header(
1294                                    Header::from_bytes(
1295                                        "Access-Control-Allow-Origin",
1296                                        cors_origin.as_bytes().to_vec(),
1297                                    )
1298                                    .unwrap(),
1299                                ),
1300                        );
1301                        let _ = request.respond(response);
1302                        mt.record_request("GET", status);
1303                        continue;
1304                    }
1305
1306                    // Auto-unsubscribe when the client disconnects: we watch
1307                    // for mpsc channel disconnection in a sentinel thread.
1308                    {
1309                        let shard_cleanup = Arc::clone(&shard);
1310                        let sub_id_cleanup = subscriber_id.clone();
1311                        let tx_liveness = tx.clone();
1312                        std::thread::spawn(move || {
1313                            // Send a heartbeat every 30s; if send fails, the
1314                            // channel is closed (client disconnected).
1315                            loop {
1316                                std::thread::sleep(std::time::Duration::from_secs(30));
1317                                if tx_liveness.send(b": heartbeat\n\n".to_vec()).is_err() {
1318                                    shard_cleanup.remove_subscriber(&sub_id_cleanup);
1319                                    return;
1320                                }
1321                                if !shard_cleanup.is_running() {
1322                                    return;
1323                                }
1324                            }
1325                        });
1326                    }
1327
1328                    let response = with_security_headers(Response::new(
1329                        tiny_http::StatusCode(200),
1330                        vec![
1331                            Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1332                            Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1333                            Header::from_bytes("Connection", "keep-alive").unwrap(),
1334                            Header::from_bytes(
1335                                "Access-Control-Allow-Origin",
1336                                cors_origin.as_bytes().to_vec(),
1337                            )
1338                            .unwrap(),
1339                        ],
1340                        streaming_body,
1341                        None,
1342                        None,
1343                    ));
1344                    let _ = request.respond(response);
1345                    mt.record_request("GET", 200);
1346                    continue;
1347                }
1348            }
1349        }
1350
1351        // --- POST /api/fn/:name with Accept: text/event-stream — streaming functions ---
1352        if method == Method::Post
1353            && url.starts_with("/api/fn/")
1354            && url != "/api/fn/traces"
1355            && request.headers().iter().any(|h| {
1356                (h.field.as_str() == "Accept" || h.field.as_str() == "accept")
1357                    && h.value.as_str().contains("text/event-stream")
1358            })
1359        {
1360            let fn_name = url
1361                .strip_prefix("/api/fn/")
1362                .unwrap_or("")
1363                .split('?')
1364                .next()
1365                .unwrap_or("")
1366                .to_string();
1367
1368            if let Some(fn_ops) = &fn_ops_maybe {
1369                // Mirror the router's gates so the streaming fast path doesn't
1370                // become a way to bypass function auth / rate limits.
1371                // 1. Function must exist (otherwise 404, not a hung SSE).
1372                if pylon_router::FnOps::get_fn(fn_ops.as_ref(), &fn_name).is_none() {
1373                    let err = json_error(
1374                        "FN_NOT_FOUND",
1375                        &format!("Function \"{fn_name}\" is not registered"),
1376                    );
1377                    let response = with_security_headers(
1378                        Response::from_string(&err)
1379                            .with_status_code(404u16)
1380                            .with_header(
1381                                Header::from_bytes("Content-Type", "application/json").unwrap(),
1382                            )
1383                            .with_header(
1384                                Header::from_bytes(
1385                                    "Access-Control-Allow-Origin",
1386                                    cors_origin.as_bytes().to_vec(),
1387                                )
1388                                .unwrap(),
1389                            ),
1390                    );
1391                    let _ = request.respond(response);
1392                    mt.record_request("POST", 404);
1393                    continue;
1394                }
1395                // 2. Per-function rate limit (identity = user_id or "anon").
1396                let identity = auth_ctx.user_id.as_deref().unwrap_or("anon");
1397                if let Err(retry_after) =
1398                    pylon_router::FnOps::check_rate_limit(fn_ops.as_ref(), &fn_name, identity)
1399                {
1400                    let body = format!(
1401                        r#"{{"error":{{"code":"RATE_LIMITED","message":"Function \"{fn_name}\" rate limit exceeded","retry_after_secs":{retry_after}}}}}"#
1402                    );
1403                    let response = with_security_headers(
1404                        Response::from_string(&body)
1405                            .with_status_code(429u16)
1406                            .with_header(
1407                                Header::from_bytes("Content-Type", "application/json").unwrap(),
1408                            )
1409                            .with_header(
1410                                Header::from_bytes(
1411                                    "Access-Control-Allow-Origin",
1412                                    cors_origin.as_bytes().to_vec(),
1413                                )
1414                                .unwrap(),
1415                            ),
1416                    );
1417                    let _ = request.respond(response);
1418                    mt.record_request("POST", 429);
1419                    continue;
1420                }
1421
1422                let args: serde_json::Value =
1423                    serde_json::from_str(&body).unwrap_or(serde_json::json!({}));
1424
1425                let auth = pylon_functions::protocol::AuthInfo {
1426                    user_id: auth_ctx.user_id.clone(),
1427                    is_admin: auth_ctx.is_admin,
1428                    tenant_id: auth_ctx.tenant_id.clone(),
1429                };
1430
1431                let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1432                let streaming_body = StreamingBody::new(rx);
1433
1434                let fn_ops_cl = Arc::clone(fn_ops);
1435                let tx_stream = tx.clone();
1436                std::thread::spawn(move || {
1437                    let tx_cb = tx_stream.clone();
1438                    let on_stream: Box<dyn FnMut(&str) + Send> = Box::new(move |chunk: &str| {
1439                        let sse = format!("data: {}\n\n", chunk);
1440                        let _ = tx_cb.send(sse.into_bytes());
1441                    });
1442
1443                    let result = pylon_router::FnOps::call(
1444                        fn_ops_cl.as_ref(),
1445                        &fn_name,
1446                        args,
1447                        auth,
1448                        Some(on_stream),
1449                        None, // streaming /api/fn/:name never carries HTTP request metadata
1450                    );
1451                    match result {
1452                        Ok((value, _trace)) => {
1453                            let done = format!(
1454                                "event: result\ndata: {}\n\n",
1455                                serde_json::to_string(&value).unwrap_or_else(|_| "null".into())
1456                            );
1457                            let _ = tx_stream.send(done.into_bytes());
1458                        }
1459                        Err(e) => {
1460                            let err = format!(
1461                                "event: error\ndata: {}\n\n",
1462                                serde_json::json!({"code": e.code, "message": e.message})
1463                            );
1464                            let _ = tx_stream.send(err.into_bytes());
1465                        }
1466                    }
1467                });
1468
1469                let response = with_security_headers(Response::new(
1470                    tiny_http::StatusCode(200),
1471                    vec![
1472                        Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1473                        Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1474                        Header::from_bytes("Connection", "keep-alive").unwrap(),
1475                        Header::from_bytes(
1476                            "Access-Control-Allow-Origin",
1477                            cors_origin.as_bytes().to_vec(),
1478                        )
1479                        .unwrap(),
1480                    ],
1481                    streaming_body,
1482                    None,
1483                    None,
1484                ));
1485                let _ = request.respond(response);
1486                mt.record_request("POST", 200);
1487                continue;
1488            }
1489        }
1490
1491        // --- POST /api/ai/stream — SSE streaming AI completion ---
1492        if url == "/api/ai/stream" && method == Method::Post {
1493            // AI endpoints spend real money per call. Require auth so a
1494            // drive-by caller can't burn through the provider budget.
1495            if auth_ctx.user_id.is_none() {
1496                let err = json_error(
1497                    "AUTH_REQUIRED",
1498                    "/api/ai/stream requires an authenticated session",
1499                );
1500                let response = with_security_headers(
1501                    Response::from_string(&err)
1502                        .with_status_code(401u16)
1503                        .with_header(
1504                            Header::from_bytes("Content-Type", "application/json").unwrap(),
1505                        )
1506                        .with_header(
1507                            Header::from_bytes(
1508                                "Access-Control-Allow-Origin",
1509                                cors_origin.as_bytes().to_vec(),
1510                            )
1511                            .unwrap(),
1512                        ),
1513                );
1514                let _ = request.respond(response);
1515                mt.record_request("POST", 401);
1516                continue;
1517            }
1518            let ai_provider = std::env::var("PYLON_AI_PROVIDER").unwrap_or_default();
1519            let ai_key = std::env::var("PYLON_AI_API_KEY").unwrap_or_default();
1520            let ai_model = std::env::var("PYLON_AI_MODEL").unwrap_or_default();
1521            let ai_base = std::env::var("PYLON_AI_BASE_URL").unwrap_or_default();
1522
1523            if ai_key.is_empty() && ai_provider != "custom" {
1524                let err = json_error(
1525                    "AI_NOT_CONFIGURED",
1526                    "Set PYLON_AI_PROVIDER and PYLON_AI_API_KEY",
1527                );
1528                let response = with_security_headers(
1529                    Response::from_string(&err)
1530                        .with_status_code(503u16)
1531                        .with_header(
1532                            Header::from_bytes("Content-Type", "application/json").unwrap(),
1533                        )
1534                        .with_header(
1535                            Header::from_bytes(
1536                                "Access-Control-Allow-Origin",
1537                                cors_origin.as_bytes().to_vec(),
1538                            )
1539                            .unwrap(),
1540                        ),
1541                );
1542                let _ = request.respond(response);
1543                mt.record_request("POST", 503);
1544                continue;
1545            }
1546
1547            let parsed: serde_json::Value = match serde_json::from_str(&body) {
1548                Ok(v) => v,
1549                Err(_) => {
1550                    let err = json_error("INVALID_JSON", "Invalid request body");
1551                    let response = with_security_headers(
1552                        Response::from_string(&err)
1553                            .with_status_code(400u16)
1554                            .with_header(
1555                                Header::from_bytes("Content-Type", "application/json").unwrap(),
1556                            )
1557                            .with_header(
1558                                Header::from_bytes(
1559                                    "Access-Control-Allow-Origin",
1560                                    cors_origin.as_bytes().to_vec(),
1561                                )
1562                                .unwrap(),
1563                            ),
1564                    );
1565                    let _ = request.respond(response);
1566                    mt.record_request("POST", 400);
1567                    continue;
1568                }
1569            };
1570
1571            let messages: Vec<AiMessage> = match parsed.get("messages").and_then(|m| m.as_array()) {
1572                Some(arr) => arr
1573                    .iter()
1574                    .filter_map(|m| {
1575                        let role = m.get("role")?.as_str()?.to_string();
1576                        let content = m.get("content")?.as_str()?.to_string();
1577                        Some(AiMessage { role, content })
1578                    })
1579                    .collect(),
1580                None => {
1581                    let err = json_error("MISSING_FIELD", "\"messages\" array is required");
1582                    let response = with_security_headers(
1583                        Response::from_string(&err)
1584                            .with_status_code(400u16)
1585                            .with_header(
1586                                Header::from_bytes("Content-Type", "application/json").unwrap(),
1587                            )
1588                            .with_header(
1589                                Header::from_bytes(
1590                                    "Access-Control-Allow-Origin",
1591                                    cors_origin.as_bytes().to_vec(),
1592                                )
1593                                .unwrap(),
1594                            ),
1595                    );
1596                    let _ = request.respond(response);
1597                    mt.record_request("POST", 400);
1598                    continue;
1599                }
1600            };
1601
1602            // Override model from request body if provided.
1603            let model = parsed
1604                .get("model")
1605                .and_then(|m| m.as_str())
1606                .map(|s| s.to_string())
1607                .unwrap_or(ai_model);
1608
1609            let proxy = match ai_provider.as_str() {
1610                "anthropic" => AiProxyPlugin::anthropic(&ai_key, &model),
1611                "openai" => AiProxyPlugin::openai(&ai_key, &model),
1612                "custom" => AiProxyPlugin::custom_with_model(&ai_base, &ai_key, &model),
1613                _ => AiProxyPlugin::openai(&ai_key, &model),
1614            };
1615
1616            // Set up a channel-based streaming body so tiny_http streams
1617            // data to the client as chunks arrive from the AI provider.
1618            let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1619            let streaming_body = StreamingBody::new(rx);
1620
1621            // Spawn the provider request on a background thread. Each chunk
1622            // is formatted as an SSE event and pushed through the channel.
1623            std::thread::spawn(move || {
1624                let result = proxy.stream_completion(&messages, &mut |chunk| {
1625                    let sse = format!(
1626                        "data: {}
1627
1628",
1629                        serde_json::json!({
1630                            "choices": [{"index": 0, "delta": {"content": chunk}}]
1631                        })
1632                    );
1633                    let _ = tx.send(sse.into_bytes());
1634                });
1635
1636                // Send a final event indicating completion or error.
1637                match result {
1638                    Ok(_) => {
1639                        let _ = tx.send(
1640                            b"data: [DONE]
1641
1642"
1643                            .to_vec(),
1644                        );
1645                    }
1646                    Err(e) => {
1647                        let err_event = format!(
1648                            "data: {}
1649
1650",
1651                            serde_json::json!({"error": {"message": e, "type": "stream_error"}})
1652                        );
1653                        let _ = tx.send(err_event.into_bytes());
1654                    }
1655                }
1656                // tx is dropped here, which causes StreamingBody::read to return 0 (EOF).
1657            });
1658
1659            let response = with_security_headers(Response::new(
1660                tiny_http::StatusCode(200),
1661                vec![
1662                    Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1663                    Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1664                    Header::from_bytes("Connection", "keep-alive").unwrap(),
1665                    Header::from_bytes(
1666                        "Access-Control-Allow-Origin",
1667                        cors_origin.as_bytes().to_vec(),
1668                    )
1669                    .unwrap(),
1670                ],
1671                streaming_body,
1672                None, // unknown content length = chunked transfer
1673                None,
1674            ));
1675            let _ = request.respond(response);
1676            mt.record_request("POST", 200);
1677            continue;
1678        }
1679
1680        // Studio route (returns HTML, not JSON).
1681        //
1682        // Privileged admin UI. It renders the full schema and lets the
1683        // operator run mutations against the data browser. In production we
1684        // require an admin token; in dev mode we leave it open so
1685        // `pylon dev` remains friction-free for the single-user case.
1686        //
1687        // Serving a WWW-Authenticate Basic realm isn't useful here because
1688        // admin auth is bearer-token based. Callers get a 401 and should
1689        // retry with `Authorization: Bearer <PYLON_ADMIN_TOKEN>`.
1690        let (status, response_body, content_type, is_studio, extra_headers) = if (url == "/studio"
1691            || url == "/studio/")
1692            && method == Method::Get
1693        {
1694            if !is_dev && !auth_ctx.is_admin {
1695                let body = json_error(
1696                    "AUTH_REQUIRED",
1697                    "/studio requires admin auth in production (set PYLON_ADMIN_TOKEN and pass it as Bearer)",
1698                );
1699                let response = with_security_headers(
1700                    Response::from_string(&body)
1701                        .with_status_code(401u16)
1702                        .with_header(
1703                            Header::from_bytes("Content-Type", "application/json").unwrap(),
1704                        )
1705                        .with_header(
1706                            Header::from_bytes(
1707                                "Access-Control-Allow-Origin",
1708                                cors_origin.as_bytes().to_vec(),
1709                            )
1710                            .unwrap(),
1711                        ),
1712                );
1713                let _ = request.respond(response);
1714                mt.record_request("GET", 401);
1715                continue;
1716            }
1717            // Derive the public base URL from the request's Host header +
1718            // X-Forwarded-Proto (Fly / any HTTPS terminator sets this).
1719            // Hardcoding `http://localhost:{port}` here meant the studio
1720            // HTML served from pylon-crm.fly.dev tried to fetch
1721            // http://localhost:4321/api/* from the browser, which CSP
1722            // rightly blocks.
1723            let host = request
1724                .headers()
1725                .iter()
1726                .find(|h| h.field.equiv("Host"))
1727                .map(|h| h.value.as_str().to_string())
1728                .unwrap_or_else(|| format!("localhost:{port}"));
1729            let scheme = request
1730                .headers()
1731                .iter()
1732                .find(|h| h.field.equiv("X-Forwarded-Proto"))
1733                .map(|h| h.value.as_str().to_string())
1734                .unwrap_or_else(|| "http".to_string());
1735            let base = format!("{scheme}://{host}");
1736            let html = pylon_studio_api::generate_studio_html(rt.manifest(), &base);
1737            (
1738                200u16,
1739                html,
1740                "text/html",
1741                true,
1742                Vec::<(String, String)>::new(),
1743            )
1744        } else {
1745            // Run plugin middleware with per-request metadata so rate-limit
1746            // plugins can bucket by peer IP (not just user id) when the
1747            // caller is anonymous.
1748            let meta = pylon_plugin::RequestMeta {
1749                peer_ip: peer_ip.as_str(),
1750            };
1751            if let Err(e) = pr.run_on_request_with_meta(method.as_str(), &url, &auth_ctx, &meta) {
1752                (
1753                    e.status,
1754                    json_error(&e.code, &e.message),
1755                    "application/json",
1756                    false,
1757                    Vec::new(),
1758                )
1759            } else if let Some((s, b)) =
1760                pr.try_handle_route(method.as_str(), &url, &body, &auth_ctx)
1761            {
1762                // Plugin handled the route.
1763                (s, b, "application/json", false, Vec::new())
1764            } else {
1765                let notifier = WsSseNotifier {
1766                    ws: Arc::clone(&wh),
1767                    sse: Arc::clone(&sh),
1768                };
1769                let openapi_gen = RuntimeOpenApiGenerator {
1770                    manifest: rt.manifest(),
1771                };
1772                let file_ops = LocalFileOps::new_default();
1773                let cache_adapter = CacheAdapter(Arc::clone(&ca));
1774                let pubsub_adapter = PubSubAdapter(Arc::clone(&ps));
1775                let email_adapter = EmailAdapter::from_env();
1776                let fn_ops: Option<&dyn pylon_router::FnOps> =
1777                    fn_ops_ref.as_deref().map(|f| f as &dyn pylon_router::FnOps);
1778                let shard_adapter = shards_ref.as_ref().map(|reg| ShardOpsAdapter {
1779                    registry: Arc::clone(reg),
1780                });
1781                let shard_ops: Option<&dyn pylon_router::ShardOps> = shard_adapter
1782                    .as_ref()
1783                    .map(|a| a as &dyn pylon_router::ShardOps);
1784                let plugin_hooks = PluginHooksAdapter(Arc::clone(&pr));
1785                // Snapshot request headers as (name, value) pairs for the
1786                // router to forward into webhook-invoked actions. Header
1787                // names are left as-sent; the router lowercases + merges
1788                // duplicates per RFC 7230 when constructing RequestInfo.
1789                let request_headers: Vec<(String, String)> = request
1790                    .headers()
1791                    .iter()
1792                    .map(|h| (h.field.as_str().to_string(), h.value.as_str().to_string()))
1793                    .collect();
1794                let router_ctx = pylon_router::RouterContext {
1795                    store: rt.as_ref(),
1796                    session_store: &ss,
1797                    magic_codes: &mc,
1798                    oauth_state: &os,
1799                    policy_engine: &pe,
1800                    change_log: &cl,
1801                    notifier: &notifier,
1802                    rooms: rm.as_ref(),
1803                    cache: &cache_adapter,
1804                    pubsub: &pubsub_adapter,
1805                    jobs: jq.as_ref(),
1806                    scheduler: sc.as_ref(),
1807                    workflows: we.as_ref(),
1808                    files: &file_ops,
1809                    openapi: &openapi_gen,
1810                    functions: fn_ops,
1811                    email: &email_adapter,
1812                    shards: shard_ops,
1813                    plugin_hooks: &plugin_hooks,
1814                    auth_ctx: &auth_ctx,
1815                    is_dev,
1816                    request_headers: &request_headers,
1817                    cookie_config: cookie_config.as_ref(),
1818                    response_headers: std::cell::RefCell::new(Vec::new()),
1819                };
1820                let http_method = HttpMethod::from_str(method.as_str());
1821                let (s, b, _ct) = pylon_router::route(
1822                    &router_ctx,
1823                    http_method,
1824                    &url,
1825                    &body,
1826                    auth_token.as_deref(),
1827                );
1828                let extra_headers = router_ctx.take_response_headers();
1829                (s, b, "application/json", false, extra_headers)
1830            }
1831        };
1832
1833        let mut response = Response::from_string(&response_body)
1834            .with_status_code(status)
1835            .with_header(Header::from_bytes("Content-Type", content_type).unwrap())
1836            .with_header(
1837                Header::from_bytes(
1838                    "Access-Control-Allow-Origin",
1839                    cors_origin.as_bytes().to_vec(),
1840                )
1841                .unwrap(),
1842            )
1843            .with_header(
1844                Header::from_bytes(
1845                    "Access-Control-Allow-Methods",
1846                    "GET, POST, PATCH, DELETE, OPTIONS",
1847                )
1848                .unwrap(),
1849            )
1850            .with_header(
1851                Header::from_bytes(
1852                    "Access-Control-Allow-Headers",
1853                    "Content-Type, Authorization",
1854                )
1855                .unwrap(),
1856            );
1857        // Cookie-based auth requires `Access-Control-Allow-Credentials:
1858        // true` on the response, paired with a specific origin. Vary
1859        // ensures intermediaries don't cache one origin's response and
1860        // serve it back to a different origin's browser.
1861        if allow_credentials {
1862            response = response
1863                .with_header(
1864                    Header::from_bytes("Access-Control-Allow-Credentials", "true").unwrap(),
1865                )
1866                .with_header(Header::from_bytes("Vary", "Origin").unwrap());
1867        }
1868
1869        // Apply any extra headers handlers attached via the router context
1870        // (Set-Cookie on login/logout, Location on OAuth GET callback).
1871        // Bytes from these headers come from server-built strings — bad
1872        // bytes here would be a programming bug, not request-driven, so a
1873        // failed Header::from_bytes is silently dropped rather than
1874        // poisoning the response.
1875        for (name, value) in extra_headers {
1876            if let Ok(h) = Header::from_bytes(name.as_bytes(), value.as_bytes().to_vec()) {
1877                response = response.with_header(h);
1878            }
1879        }
1880
1881        // Add Content-Security-Policy for Studio HTML responses.
1882        //
1883        // Studio talks to the same Rust process over HTTP (same origin)
1884        // AND a sibling WebSocket port (port+1, scheme ws:). CSP's
1885        // `default-src` covers `connect-src` by fallback, so any
1886        // directive we set there must include the WS scheme or the
1887        // browser silently blocks the live-sync connection.
1888        //
1889        // `ws:` + `wss:` cover localhost dev + TLS deploys without
1890        // hard-coding ports. Same-origin `'self'` keeps HTTP fetches
1891        // allowed. Inline + eval stay for the Tailwind/Babel CDN scripts
1892        // the current Studio HTML includes.
1893        if is_studio {
1894            response = response.with_header(
1895                Header::from_bytes(
1896                    "Content-Security-Policy",
1897                    "default-src 'self' 'unsafe-inline' 'unsafe-eval' https://cdn.tailwindcss.com https://unpkg.com ws: wss:",
1898                ).unwrap(),
1899            );
1900        }
1901
1902        let response = with_security_headers(response);
1903
1904        let _ = request.respond(response);
1905        mt.record_request(method.as_str(), status);
1906    }
1907
1908    tracing::warn!("Shutting down gracefully...");
1909
1910    // --- Drain phase ---
1911    // Stop accepting new work, let in-flight finish, close subsystems cleanly.
1912    let drain_timeout = std::time::Duration::from_secs(
1913        std::env::var("PYLON_DRAIN_SECS")
1914            .ok()
1915            .and_then(|s| s.parse().ok())
1916            .unwrap_or(10),
1917    );
1918    let start = Instant::now();
1919
1920    // Stop any running shards so their tick loops exit.
1921    if let Some(reg) = &shard_registry {
1922        for id in reg.ids() {
1923            if let Some(shard) = reg.get(&id) {
1924                shard.stop();
1925            }
1926        }
1927    }
1928
1929    // Let the scheduler finish its current cycle.
1930    let _ = &scheduler; // drop Arc at end of scope
1931
1932    // Wait for outstanding workers to idle, up to drain_timeout.
1933    while start.elapsed() < drain_timeout {
1934        let pending_jobs = job_queue.stats().pending;
1935        if pending_jobs == 0 {
1936            break;
1937        }
1938        std::thread::sleep(std::time::Duration::from_millis(100));
1939    }
1940
1941    let elapsed = start.elapsed();
1942    tracing::warn!(
1943        "Drain complete in {:.1}s (timeout {}s)",
1944        elapsed.as_secs_f32(),
1945        drain_timeout.as_secs()
1946    );
1947    Ok(())
1948}
1949
1950// The route() function has been extracted to the `pylon-router` crate.
1951// See `pylon_router::route()` for the platform-agnostic routing logic.
1952// The server now delegates to it via a `RouterContext`.
1953
1954fn json_error(code: &str, message: &str) -> String {
1955    pylon_router::json_error(code, message)
1956}
1957
1958/// Build the session store. Persists by default for file-backed runtimes —
1959/// sessions live in a sibling `<db>.sessions.db` file next to the app DB
1960/// unless `PYLON_SESSION_DB` overrides the path or
1961/// `PYLON_SESSION_IN_MEMORY=1` opts out. In-memory runtimes (tests)
1962/// get an in-memory session store.
1963///
1964/// This used to be opt-in, which silently broke every app after a server
1965/// restart: tokens in browser localStorage resolved to anonymous, pulls
1966/// came back empty under policy, and mutations 400'd with UNAUTHENTICATED.
1967fn build_session_store(app_db_path: Option<&str>) -> SessionStore {
1968    if std::env::var("PYLON_SESSION_IN_MEMORY")
1969        .map(|v| v == "1" || v == "true")
1970        .unwrap_or(false)
1971    {
1972        return SessionStore::new();
1973    }
1974    let explicit = std::env::var("PYLON_SESSION_DB").ok();
1975    let default_path = app_db_path.map(|p| format!("{p}.sessions.db"));
1976    let path = match explicit.or(default_path) {
1977        Some(p) => p,
1978        None => return SessionStore::new(),
1979    };
1980    match crate::session_backend::SqliteSessionBackend::open(&path) {
1981        Ok(backend) => {
1982            tracing::info!("[pylon] Session persistence enabled: {path}");
1983            SessionStore::with_backend(Box::new(backend))
1984        }
1985        Err(e) => {
1986            tracing::warn!(
1987                "[pylon] could not open session DB {path}: {e}. Falling back to in-memory sessions."
1988            );
1989            SessionStore::new()
1990        }
1991    }
1992}
1993
1994/// Parse a `multipart/form-data` body and return the first file part found.
1995///
1996/// Returns `(filename, content_type, bytes)` on success, `None` if the body
1997/// can't be parsed or no file part exists.
1998///
1999/// Handles the common RFC 7578 subset used by browsers and curl:
2000/// - `--boundary` part separators
2001/// - `Content-Disposition: form-data; name=...; filename=...`
2002/// - `Content-Type: ...`
2003/// - blank line, then raw bytes, then `\r\n--boundary` terminator
2004fn parse_multipart_first_file(
2005    body: &[u8],
2006    content_type_header: &str,
2007) -> Option<(String, String, Vec<u8>)> {
2008    // Extract the boundary parameter.
2009    let boundary_param = content_type_header
2010        .split(';')
2011        .find_map(|p| p.trim().strip_prefix("boundary="))?;
2012    let boundary = boundary_param.trim_matches('"');
2013    let delimiter = format!("--{boundary}");
2014    let delimiter_bytes = delimiter.as_bytes();
2015
2016    // Find each part between delimiters.
2017    let mut pos = 0usize;
2018    while pos < body.len() {
2019        // Find the next delimiter.
2020        let next = find_subslice(&body[pos..], delimiter_bytes)?;
2021        let part_start = pos + next + delimiter_bytes.len();
2022        // Skip CRLF or -- (terminator) after the delimiter.
2023        if part_start + 2 > body.len() {
2024            return None;
2025        }
2026        if &body[part_start..part_start + 2] == b"--" {
2027            return None; // end of parts, no file found
2028        }
2029        let header_start = part_start + skip_crlf(&body[part_start..]);
2030
2031        // Find end-of-headers (blank line).
2032        let header_end_offset = find_subslice(&body[header_start..], b"\r\n\r\n")?;
2033        let headers = &body[header_start..header_start + header_end_offset];
2034        let data_start = header_start + header_end_offset + 4;
2035
2036        // Find the next delimiter — that's where this part's data ends.
2037        let next_delim_offset = find_subslice(&body[data_start..], delimiter_bytes)?;
2038        // Strip the trailing CRLF before the delimiter.
2039        let mut data_end = data_start + next_delim_offset;
2040        if data_end >= 2 && &body[data_end - 2..data_end] == b"\r\n" {
2041            data_end -= 2;
2042        }
2043
2044        // Parse headers we care about.
2045        let headers_str = std::str::from_utf8(headers).ok()?;
2046        let mut filename: Option<String> = None;
2047        let mut part_ct = String::from("application/octet-stream");
2048        let mut has_file = false;
2049        for line in headers_str.split("\r\n") {
2050            let lower = line.to_ascii_lowercase();
2051            if let Some(rest) = lower.strip_prefix("content-disposition:") {
2052                if rest.contains("filename=") {
2053                    has_file = true;
2054                    // Extract filename="xxx"
2055                    if let Some(start) = line.find("filename=\"") {
2056                        let from = start + 10;
2057                        if let Some(end_offset) = line[from..].find('"') {
2058                            filename = Some(line[from..from + end_offset].to_string());
2059                        }
2060                    }
2061                }
2062            } else if let Some(rest) = lower.strip_prefix("content-type:") {
2063                part_ct = rest.trim().to_string();
2064            }
2065        }
2066
2067        if has_file {
2068            let name = filename.unwrap_or_else(|| "upload".into());
2069            return Some((name, part_ct, body[data_start..data_end].to_vec()));
2070        }
2071
2072        pos = data_end;
2073    }
2074    None
2075}
2076
/// Return the byte offset of the first occurrence of `needle` in `haystack`.
///
/// Yields `None` when `needle` is empty, longer than `haystack`, or simply
/// not present.
fn find_subslice(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    let width = needle.len();
    if width == 0 || haystack.len() < width {
        return None;
    }
    // Every start index at which a full-width window still fits.
    let last_start = haystack.len() - width;
    (0..=last_start).find(|&start| &haystack[start..start + width] == needle)
}
2083
/// Number of bytes occupied by a line break at the very start of `buf`.
///
/// Returns 2 for a leading `\r\n`, 1 for a leading bare `\n`, and 0 for
/// anything else (including an empty buffer or a lone `\r`).
fn skip_crlf(buf: &[u8]) -> usize {
    match buf {
        [b'\r', b'\n', ..] => 2,
        [b'\n', ..] => 1,
        _ => 0,
    }
}
2093
#[cfg(test)]
mod multipart_tests {
    use super::*;

    /// `Content-Type` header matching the `bnd` boundary used by the fixtures.
    const BND_CONTENT_TYPE: &str = "multipart/form-data; boundary=bnd";

    /// A body with one file part yields that part's name, type, and bytes.
    #[test]
    fn parses_single_file() {
        let body: &[u8] = b"--bnd\r\n\
Content-Disposition: form-data; name=\"file\"; filename=\"hello.txt\"\r\n\
Content-Type: text/plain\r\n\
\r\n\
Hello world\r\n\
--bnd--\r\n";
        let expected = (
            String::from("hello.txt"),
            String::from("text/plain"),
            b"Hello world".to_vec(),
        );
        assert_eq!(
            parse_multipart_first_file(body, BND_CONTENT_TYPE),
            Some(expected)
        );
    }

    /// A body holding only a plain form field has no file to return.
    #[test]
    fn returns_none_without_file_part() {
        let body: &[u8] = b"--bnd\r\n\
Content-Disposition: form-data; name=\"field\"\r\n\
\r\n\
just text\r\n\
--bnd--\r\n";
        assert_eq!(parse_multipart_first_file(body, BND_CONTENT_TYPE), None);
    }

    /// Without a `boundary=` parameter the body cannot be split into parts.
    #[test]
    fn returns_none_when_no_boundary() {
        let parsed = parse_multipart_first_file(b"anything", "application/json");
        assert!(parsed.is_none());
    }
}