Skip to main content

pylon_runtime/
server.rs

1#[allow(unused_imports)]
2use std::io::Read;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::time::Instant;
6
7use pylon_auth::SessionStore;
8use pylon_http::HttpMethod;
9use pylon_plugin::PluginRegistry;
10use pylon_policy::PolicyEngine;
11use pylon_sync::{ChangeKind, ChangeLog};
12use tiny_http::{Header, Method, Response, Server};
13
14use crate::datastore::{
15    CacheAdapter, EmailAdapter, LocalFileOps, PluginHooksAdapter, PubSubAdapter,
16    RuntimeOpenApiGenerator, ShardOpsAdapter, WsSseNotifier,
17};
18use crate::jobs::{JobQueue, JobResult, Worker};
19use crate::metrics::Metrics;
20use crate::pubsub::PubSubBroker;
21use crate::rate_limit::RateLimiter;
22use crate::rooms::RoomManager;
23use crate::scheduler::Scheduler;
24use crate::sse::SseHub;
25use crate::workflows::WorkflowEngine;
26use crate::ws::WsHub;
27use crate::Runtime;
28use pylon_plugin::builtin::ai_proxy::{AiMessage, AiProxyPlugin};
29use pylon_plugin::builtin::cache::CachePlugin;
30
31// ---------------------------------------------------------------------------
32// Streaming body — bridges mpsc::Receiver to std::io::Read for SSE responses
33// ---------------------------------------------------------------------------
34
/// Response body that streams bytes out of an MPSC channel.
///
/// Meant to be used as the body of a `tiny_http::Response`: every `read`
/// hands out bytes from the next chunk received on the channel and blocks
/// while no chunk is available. Once all senders have been dropped `read`
/// returns `Ok(0)` (EOF).
struct StreamingBody {
    /// Source of chunks; `recv` blocks until one arrives or the channel closes.
    rx: std::sync::mpsc::Receiver<Vec<u8>>,
    /// Chunk that did not fit into the caller's buffer; empty when nothing
    /// is pending.
    buf: Vec<u8>,
    /// Offset into `buf` of the first byte that has not been handed out yet.
    pos: usize,
}

impl StreamingBody {
    /// Wraps `rx` in a body with no pending bytes.
    fn new(rx: std::sync::mpsc::Receiver<Vec<u8>>) -> Self {
        StreamingBody {
            buf: Vec::new(),
            pos: 0,
            rx,
        }
    }
}

impl std::io::Read for StreamingBody {
    /// Copies up to `buf.len()` bytes into `buf` and returns how many were
    /// written.
    ///
    /// Bytes left over from an earlier, oversized chunk are served first and
    /// never mixed with a newly received chunk in the same call. Otherwise
    /// the call blocks on the channel. `Ok(0)` is returned when the channel
    /// is closed and also when an empty chunk is received, which readers
    /// interpret as end of stream. This method never returns `Err`.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // Finish the stashed chunk before touching the channel again.
        if let Some(pending) = self.buf.get(self.pos..).filter(|rest| !rest.is_empty()) {
            let count = pending.len().min(buf.len());
            buf[..count].copy_from_slice(&pending[..count]);
            self.pos += count;
            if self.pos >= self.buf.len() {
                // Fully drained: reset so the next call goes to the channel.
                self.buf.clear();
                self.pos = 0;
            }
            return Ok(count);
        }

        // Wait for the next chunk; a closed channel is reported as EOF.
        let Ok(chunk) = self.rx.recv() else {
            return Ok(0);
        };

        // An empty chunk falls through with `count == 0`, i.e. `Ok(0)`.
        let count = chunk.len().min(buf.len());
        buf[..count].copy_from_slice(&chunk[..count]);
        if count < chunk.len() {
            // Keep the tail for subsequent reads.
            self.pos = count;
            self.buf = chunk;
        }
        Ok(count)
    }
}
88
/// Global shutdown flag, initially `false`. Set via `request_shutdown()` to
/// trigger graceful exit.
///
/// `request_shutdown()` stores `true` with `SeqCst`; the request loop in
/// `start_server` loads the flag with `Relaxed` before and after each
/// `recv()` and breaks out once it is set.
///
/// NOTE(review): no code visible in this part of the file resets the flag,
/// so a server started again in the same process would presumably exit
/// right away — confirm against the rest of the file.
static SHUTDOWN: AtomicBool = AtomicBool::new(false);
91
92/// Request a graceful shutdown of the running server.
93///
94/// This sets the shutdown flag and, if a server handle has been stashed, calls
95/// `unblock()` to wake the request loop. Safe to call from any thread or signal
96/// handler.
97pub fn request_shutdown() {
98    SHUTDOWN.store(true, Ordering::SeqCst);
99    // If a server handle is available, unblock the request loop so it can
100    // observe the flag immediately rather than waiting for the next request.
101    if let Some(srv) = SERVER_HANDLE.get() {
102        srv.unblock();
103    }
104}
105
/// Global handle to the `tiny_http::Server` so `request_shutdown()` can call
/// `unblock()` without requiring callers to hold a reference.
///
/// `start_server` stores its listener here with `OnceLock::set` and discards
/// the result. A `OnceLock` can be initialised only once, so if a server is
/// started again in the same process this handle keeps pointing at the
/// first listener.
static SERVER_HANDLE: std::sync::OnceLock<Arc<Server>> = std::sync::OnceLock::new();
109
110// ---------------------------------------------------------------------------
111// Security headers
112// ---------------------------------------------------------------------------
113
114/// Common security headers applied to every response.
115fn security_headers() -> Vec<Header> {
116    vec![
117        Header::from_bytes("X-Content-Type-Options", "nosniff").unwrap(),
118        Header::from_bytes("X-Frame-Options", "DENY").unwrap(),
119        Header::from_bytes("X-XSS-Protection", "1; mode=block").unwrap(),
120    ]
121}
122
123/// Add security headers to a response.
124fn with_security_headers<R: std::io::Read>(response: Response<R>) -> Response<R> {
125    let mut resp = response;
126    for header in security_headers() {
127        resp = resp.with_header(header);
128    }
129    resp
130}
131
132/// Start the dev server on the given port. Blocks until shutdown.
133pub fn start(runtime: Arc<Runtime>, port: u16) -> Result<(), String> {
134    start_with_plugins(runtime, port, None)
135}
136
137/// Start the dev server with optional plugins. Blocks until shutdown.
138pub fn start_with_plugins(
139    runtime: Arc<Runtime>,
140    port: u16,
141    plugins: Option<Arc<PluginRegistry>>,
142) -> Result<(), String> {
143    start_server(runtime, port, plugins, None)
144}
145
146/// Start the dev server with plugins and a shard registry for real-time
147/// simulations (games, MMO zones, etc.). Blocks until shutdown.
148pub fn start_with_shards(
149    runtime: Arc<Runtime>,
150    port: u16,
151    plugins: Option<Arc<PluginRegistry>>,
152    shard_registry: Arc<dyn pylon_realtime::DynShardRegistry>,
153) -> Result<(), String> {
154    start_server(runtime, port, plugins, Some(shard_registry))
155}
156
157fn start_server(
158    runtime: Arc<Runtime>,
159    port: u16,
160    plugins: Option<Arc<PluginRegistry>>,
161    shard_registry: Option<Arc<dyn pylon_realtime::DynShardRegistry>>,
162) -> Result<(), String> {
163    // Run the tracing-exporter hook BEFORE anything else emits spans. The
164    // operator registers it via `pylon_observability::set_tracing_hook`
165    // at process init; here we invoke it exactly once on startup. No-op
166    // if nothing was registered.
167    pylon_observability::run_tracing_hook();
168
169    let addr = format!("0.0.0.0:{port}");
170    let server = Server::http(&addr).map_err(|e| format!("Failed to start server: {e}"))?;
171    let server = Arc::new(server);
172
173    // Stash a handle so `request_shutdown()` can unblock the loop.
174    let _ = SERVER_HANDLE.set(Arc::clone(&server));
175
176    let session_store = Arc::new(build_session_store(runtime.db_path().as_deref()));
177    let magic_codes = Arc::new(pylon_auth::MagicCodeStore::new());
178    // Persist OAuth state if a session DB is configured. Reuse the same path
179    // (sessions and OAuth state are both small auth artifacts that should
180    // outlive a restart). Falls back to in-memory if no DB path is set.
181    let oauth_state = Arc::new(match std::env::var("PYLON_SESSION_DB").ok() {
182        Some(path) => match crate::oauth_backend::SqliteOAuthBackend::open(&path) {
183            Ok(backend) => pylon_auth::OAuthStateStore::with_backend(Box::new(backend)),
184            Err(e) => {
185                tracing::warn!(
186                    "[auth] OAuth state SQLite backend unavailable: {e} — falling back to in-memory"
187                );
188                pylon_auth::OAuthStateStore::new()
189            }
190        },
191        None => pylon_auth::OAuthStateStore::new(),
192    });
193    let policy_engine = Arc::new(PolicyEngine::from_manifest(runtime.manifest()));
194    let change_log = Arc::new(ChangeLog::new());
195
196    // Seed the change log with one synthetic insert per extant row so that
197    // a pull from seq=0 after a restart reconstructs current state. The
198    // change log is in-memory — restarting the process without this would
199    // leave SQLite rows unreachable via /api/sync/pull (clients would
200    // pull nothing and see an empty replica). Seqs here are fresh; clients
201    // whose cursors are ahead of `self.seq` get a 410 and full resync,
202    // which then hits this seeded log and gets every current row back.
203    for entity in runtime.manifest().entities.iter() {
204        match runtime.list(&entity.name) {
205            Ok(rows) => {
206                for row in rows {
207                    if let Some(id) = row.get("id").and_then(|v| v.as_str()) {
208                        change_log.append(&entity.name, id, ChangeKind::Insert, Some(row.clone()));
209                    }
210                }
211            }
212            Err(_) => {
213                // Entity table may not exist yet on first boot — skip.
214            }
215        }
216    }
217    let ws_hub = WsHub::new();
218    let sse_hub = SseHub::new();
219    // Default-register the rate-limit plugin when no custom registry was
220    // supplied. Without this, self-hosted deployments would launch with
221    // auth endpoints (/api/auth/magic/send, /api/auth/magic/verify,
222    // /api/auth/session) wide open to brute force and enumeration.
223    //
224    // Dev: 100k/min so a React app's initial bundle + auth + sync pulls
225    // (each worth ~6-10 requests) doesn't immediately 429 the dev. Prod:
226    // 100/min per IP — tight enough to crush burst attackers, loose
227    // enough for legitimate multi-tab UIs. Callers passing their own
228    // registry are responsible for their own limits.
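    // Probe dev mode NOW — the authoritative `is_dev` flag is computed
    // further down, but plugin registration below needs a value already.
    // Same env var, but NOT the same default: this probe falls back to
    // `true` when PYLON_DEV_MODE is unset, whereas `is_dev` falls back to
    // `false`. FIXME: with the variable unset, the dev-mode rate limits are
    // therefore selected here even though the rest of startup runs in
    // production mode.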
231    let is_dev_early = std::env::var("PYLON_DEV_MODE")
232        .map(|v| v == "1" || v == "true")
233        .unwrap_or(true);
234    let plugin_rl_max: u32 = if is_dev_early { 100_000 } else { 100 };
235    let plugin_reg: Arc<PluginRegistry> = plugins.unwrap_or_else(|| {
236        let mut reg = PluginRegistry::new(runtime.manifest().clone());
237        reg.register(Arc::new(
238            pylon_plugin::builtin::rate_limit::RateLimitPlugin::new(
239                plugin_rl_max,
240                std::time::Duration::from_secs(60),
241            ),
242        ));
243        // Auto-scope any entity that declares a `tenantId` field. This is
244        // how multi-tenant isolation becomes a default posture rather than
245        // an opt-in: drop the field on the entity and the plugin takes it
246        // from there (stamps inserts, rejects cross-tenant writes).
247        reg.register(Arc::new(
248            pylon_plugin::builtin::tenant_scope::TenantScopePlugin::from_manifest(
249                runtime.manifest(),
250            ),
251        ));
252        Arc::new(reg)
253    });
254    let room_mgr = Arc::new(RoomManager::new(120)); // 2 min idle timeout
255    let ws_port = port + 1;
256    let sse_port = port + 2;
257
258    // Record server start time for the health endpoint.
259    let start_time = Instant::now();
260
261    let metrics = Arc::new(Metrics::new());
262
263    // Cache and pub/sub shared instances.
264    let cache = Arc::new(CachePlugin::new(100_000));
265    let pubsub_broker = Arc::new(PubSubBroker::new(100));
266
267    // Job queue, scheduler, and background workers.
268    let job_queue = Arc::new(JobQueue::new(1000));
269
270    // Persistent job store. Colocate with the app DB so `./app.db` gets
271    // `./app.db.jobs.db` automatically — otherwise jobs land in CWD, which
272    // is wherever the server was launched from (confusing and fragile).
273    // In-memory runtimes and the `PYLON_JOBS_IN_MEMORY=1` opt-out both
274    // skip persistence.
275    let jobs_in_memory = std::env::var("PYLON_JOBS_IN_MEMORY")
276        .map(|v| v == "1" || v == "true")
277        .unwrap_or(false);
278    if !jobs_in_memory {
279        let jobs_db_path = std::env::var("PYLON_JOBS_DB").ok().unwrap_or_else(|| {
280            runtime
281                .db_path()
282                .map(|p| format!("{p}.jobs.db"))
283                .unwrap_or_else(|| "pylon.jobs.db".into())
284        });
285        match crate::job_store::JobStore::open(&jobs_db_path) {
286            Ok(store) => {
287                let store = Arc::new(store);
288                let restored = job_queue.restore_from(&store);
289                if restored > 0 {
290                    tracing::info!("[jobs] Restored {restored} pending job(s) from {jobs_db_path}");
291                }
292                job_queue.attach_store(store);
293            }
294            Err(e) => {
295                tracing::warn!(
296                    "[jobs] Could not open job store at {jobs_db_path}: {e} — running without persistence"
297                );
298            }
299        }
300    }
301
302    // Register built-in framework jobs.
303    {
304        let cache_ref = Arc::clone(&cache);
305        job_queue.register(
306            "pylon.cache.cleanup",
307            Arc::new(move |_job| {
308                cache_ref.cleanup_expired();
309                JobResult::Success
310            }),
311        );
312        let rooms_ref = Arc::clone(&room_mgr);
313        job_queue.register(
314            "pylon.rooms.cleanup",
315            Arc::new(move |_job| {
316                rooms_ref.cleanup_idle();
317                JobResult::Success
318            }),
319        );
320    }
321
322    let scheduler = Arc::new(Scheduler::new(Arc::clone(&job_queue)));
323    // Schedule built-in tasks.
324    let _ = scheduler.schedule(
325        "pylon.cache.cleanup",
326        "*/10 * * * *",
327        Arc::new(|_| JobResult::Success),
328    );
329    let _ = scheduler.schedule(
330        "pylon.rooms.cleanup",
331        "*/5 * * * *",
332        Arc::new(|_| JobResult::Success),
333    );
334
335    // Start 2 background workers.
336    let _worker_handles: Vec<_> = (0..2)
337        .map(|i| {
338            let w = Worker::new(Arc::clone(&job_queue), &format!("worker-{i}"));
339            w.start()
340        })
341        .collect();
342
343    // Start the scheduler.
344    let _scheduler_handle = Arc::clone(&scheduler).start();
345
346    // Workflow engine: TS runner URL configurable via env, defaults to local Bun server.
347    let wf_runner_url = std::env::var("PYLON_WORKFLOW_RUNNER_URL")
348        .unwrap_or_else(|_| "http://127.0.0.1:9876/run".to_string());
349    let workflow_engine = Arc::new(WorkflowEngine::new(&wf_runner_url, 10_000));
350
351    // Rate limiter: per-IP outer cap on total requests.
352    //
353    // Defaults:
354    //   - Dev mode: effectively off (100k/min) so a React app's initial
355    //     bundle load + sync pulls + user clicks don't immediately 429.
356    //     100/min blew through during a single login + first sync pull.
357    //   - Prod: 600/min (10 req/sec average). Still tight, but a real app
358    //     should override with PYLON_RATE_LIMIT_MAX anyway.
359    //
360    // Override with PYLON_RATE_LIMIT_MAX + PYLON_RATE_LIMIT_WINDOW.
361    let default_rl_max = if is_dev_early { 100_000 } else { 600 };
362    let rl_max: u32 = std::env::var("PYLON_RATE_LIMIT_MAX")
363        .ok()
364        .and_then(|v| v.parse().ok())
365        .unwrap_or(default_rl_max);
366    let rl_window: u64 = std::env::var("PYLON_RATE_LIMIT_WINDOW")
367        .ok()
368        .and_then(|v| v.parse().ok())
369        .unwrap_or(60);
370    let rate_limiter = Arc::new(RateLimiter::new(rl_max, rl_window));
371
372    // Per-function rate limiter: separate bucket per (caller, function) pair.
373    // Defaults to a stricter cap because functions are heavier than reads.
374    // Override via PYLON_FN_RATE_LIMIT_MAX / PYLON_FN_RATE_LIMIT_WINDOW.
375    let fn_rl_max: u32 = std::env::var("PYLON_FN_RATE_LIMIT_MAX")
376        .ok()
377        .and_then(|v| v.parse().ok())
378        .unwrap_or(30);
379    let fn_rl_window: u64 = std::env::var("PYLON_FN_RATE_LIMIT_WINDOW")
380        .ok()
381        .and_then(|v| v.parse().ok())
382        .unwrap_or(60);
383    let fn_rate_limiter = Arc::new(RateLimiter::new(fn_rl_max, fn_rl_window));
384
385    // TypeScript function runtime: optional Bun process that loads functions/*.ts
386    // If no `functions/` directory exists or Bun isn't installed, this is None
387    // and /api/fn/* routes return 503.
388    // Build a notifier adapter so function mutations (`ctx.db.insert/update/delete`)
389    // emit change events to WS + SSE subscribers on COMMIT. Without this,
390    // functions write to the DB but sync clients never see the update
391    // live — they only catch up on the next refetch.
392    let fn_notifier: Arc<dyn pylon_router::ChangeNotifier> =
393        Arc::new(crate::datastore::WsSseNotifier {
394            ws: Arc::clone(&ws_hub),
395            sse: Arc::clone(&sse_hub),
396        });
397    let fn_ops_maybe = crate::datastore::try_spawn_functions(
398        Arc::clone(&runtime),
399        Arc::clone(&job_queue),
400        Arc::clone(&fn_rate_limiter),
401        Arc::clone(&change_log),
402        fn_notifier,
403    );
404
405    // Dev mode flag. Gates a *lot* of permissive behavior: magic codes
406    // appear in JSON responses, /studio is open without admin auth,
407    // POST /api/auth/session can mint sessions for arbitrary user_ids,
408    // OAuth callback accepts a caller-supplied email, CORS defaults to
409    // `*`, etc. Defaulting to `true` meant a prod deploy that simply
410    // forgot the env var was trivially compromisable — flip to safe-
411    // by-default and let the CLI's `pylon dev` opt in explicitly.
412    let is_dev = std::env::var("PYLON_DEV_MODE")
413        .map(|v| v == "1" || v == "true")
414        .unwrap_or(false);
415
416    // CORS origin. Defaults to `*` in dev for convenience; in prod we refuse
417    // to start with a wildcard because the server sends `Access-Control-
418    // Allow-Credentials: true` elsewhere and also accepts `Authorization:
419    // Bearer <session>`. The combination of `*` + credentials is a spec
420    // violation that some browsers tolerate, and even when they don't it
421    // lets any origin drive bearer-auth APIs.
422    let cors_origin = match std::env::var("PYLON_CORS_ORIGIN") {
423        Ok(v) => v,
424        Err(_) if is_dev => "*".to_string(),
425        Err(_) => {
426            return Err(
427                "PYLON_CORS_ORIGIN must be set in production (non-dev mode). \
428                Set it to your frontend's origin, or set PYLON_DEV_MODE=true \
429                for local development."
430                    .into(),
431            );
432        }
433    };
434    if !is_dev && cors_origin == "*" {
435        return Err("PYLON_CORS_ORIGIN=\"*\" is refused in production mode. \
436            Set it to an explicit origin (https://app.example.com)."
437            .into());
438    }
439    // Browsers forbid combining `Access-Control-Allow-Origin: *` with
440    // `Access-Control-Allow-Credentials: true`. Cookie-based auth needs
441    // credentials, so we only emit the credentials header when the origin
442    // is specific. In dev with `*` we lose cookies-from-cross-origin
443    // (acceptable: dev typically uses same-origin proxying), but we
444    // refuse to send a header combo browsers will reject either way.
445    let allow_credentials = cors_origin != "*";
446    // Validate the origin once so per-request header construction can never
447    // panic on bad bytes. Previously every `Header::from_bytes(...).unwrap()`
448    // was a potential request-triggered DoS via env misconfiguration.
449    if Header::from_bytes(
450        "Access-Control-Allow-Origin",
451        cors_origin.as_bytes().to_vec(),
452    )
453    .is_err()
454    {
455        return Err(format!(
456            "PYLON_CORS_ORIGIN={cors_origin:?} contains bytes that are not a valid HTTP header value"
457        ));
458    }
459
460    // Admin token: read once at startup, not per-request.
461    let admin_token: Option<String> = std::env::var("PYLON_ADMIN_TOKEN").ok();
462
463    // Session cookie config — built once. Cookie name defaults to
464    // `${app_name}_session` so multiple Pylon apps on the same parent
465    // domain don't clobber each other's cookies. Browsers receive an
466    // HttpOnly+Secure+SameSite=Lax cookie by default; the same opaque
467    // session token continues to work via `Authorization: Bearer …`
468    // for CLI / mobile / server-to-server callers.
469    let cookie_config = Arc::new({
470        let app_name = runtime.manifest().name.as_str();
471        pylon_auth::CookieConfig::from_env(&pylon_auth::CookieConfig::default_name_for(app_name))
472    });
473
474    // CSRF protection. Enforced inline at the HTTP layer because the plugin
475    // trait's `on_request` hook doesn't see request headers. For
476    // state-changing methods (POST/PATCH/PUT/DELETE) we check Origin, then
477    // Referer, against the allowlist.
478    //
479    // Allowlist resolution:
480    //   - PYLON_CSRF_ORIGINS (comma-separated) if set
481    //   - otherwise PYLON_CORS_ORIGIN (already validated above)
482    //   - in dev, fall back to allow-any to avoid breaking local tooling.
483    let csrf_origins: Vec<String> = match std::env::var("PYLON_CSRF_ORIGINS") {
484        Ok(v) => v
485            .split(',')
486            .map(|s| s.trim().to_string())
487            .filter(|s| !s.is_empty())
488            .collect(),
489        Err(_) => {
490            if is_dev {
491                vec!["*".to_string()]
492            } else if cors_origin != "*" {
493                vec![cors_origin.clone()]
494            } else {
495                // Non-dev + wildcard was already rejected, but guard anyway.
496                vec![]
497            }
498        }
499    };
500    let csrf = Arc::new(pylon_plugin::builtin::csrf::CsrfPlugin::new(csrf_origins));
501
502    // Start WebSocket server on port+1.
503    //
504    // The snapshot fetcher gives the WS reader a way to ship the current
505    // CRDT snapshot to a client the instant it subscribes — without it
506    // the new tab would have to wait for the next write before catching
507    // up to the converged state. Encodes into the same length-prefixed
508    // wire frame as the broadcast path so the client decoder is shared.
509    //
510    // Authz is enforced HERE, not at the WS layer: the closure runs the
511    // row through `check_entity_read` against the caller's auth ctx and
512    // returns None on deny. The caller (handle_crdt_control) treats
513    // None as "don't subscribe" — so a denied client can't sit on a
514    // subscription waiting for a future write to leak state.
515    {
516        let hub = Arc::clone(&ws_hub);
517        let sessions = Arc::clone(&session_store);
518        let runtime_for_fetcher = Arc::clone(&runtime);
519        let pe_for_fetcher = Arc::clone(&policy_engine);
520        let fetcher: crate::ws::SnapshotFetcher = Arc::new(move |auth_ctx, entity, row_id| {
521            use pylon_http::DataStore;
522            // Fetch the row first so the policy engine can evaluate
523            // row-level predicates (`data.authorId == auth.userId`
524            // etc). Missing row → deny silently; the client just
525            // never gets a frame and can't probe existence.
526            let row = match runtime_for_fetcher.get_by_id(entity, row_id) {
527                Ok(Some(v)) => v,
528                _ => return None,
529            };
530            if !matches!(
531                pe_for_fetcher.check_entity_read(entity, auth_ctx, Some(&row)),
532                pylon_policy::PolicyResult::Allowed
533            ) {
534                return None;
535            }
536            let snap = match runtime_for_fetcher.crdt_snapshot(entity, row_id) {
537                Ok(Some(bytes)) => bytes,
538                _ => return None,
539            };
540            pylon_router::encode_crdt_frame(
541                pylon_router::CRDT_FRAME_SNAPSHOT,
542                entity,
543                row_id,
544                &snap,
545            )
546            .ok()
547        });
548        std::thread::spawn(move || {
549            crate::ws::start_ws_server(hub, sessions, ws_port, Some(fetcher));
550        });
551    }
552
553    // Start SSE server on port+2.
554    {
555        let hub = Arc::clone(&sse_hub);
556        std::thread::spawn(move || {
557            crate::sse::start_sse_server(hub, sse_port);
558        });
559    }
560
561    // Start shard WebSocket server on port+3 when a registry is provided.
562    let shard_ws_port = port + 3;
563    if let Some(reg) = shard_registry.clone() {
564        let sessions = Arc::clone(&session_store);
565        std::thread::spawn(move || {
566            crate::shard_ws::start_shard_ws_server(reg, sessions, shard_ws_port);
567        });
568    }
569
570    tracing::warn!("pylon dev server listening on http://localhost:{port}");
571    tracing::info!("  WebSocket: ws://localhost:{ws_port}");
572    tracing::info!("  Studio: http://localhost:{port}/studio");
573    tracing::info!("  API:    http://localhost:{port}/api/entities/<entity>");
574    tracing::info!("  Auth:   http://localhost:{port}/api/auth/session");
575
576    // Use recv() in a loop instead of incoming_requests() so we can share
577    // the Arc<Server> with the shutdown path (incoming_requests borrows &self
578    // which prevents moving the Arc into another thread).
579    loop {
580        if SHUTDOWN.load(Ordering::Relaxed) {
581            break;
582        }
583
584        let mut request = match server.recv() {
585            Ok(rq) => rq,
586            Err(_) => {
587                // recv() returns Err when unblocked or the socket is closed.
588                break;
589            }
590        };
591
592        if SHUTDOWN.load(Ordering::Relaxed) {
593            break;
594        }
595
596        let rt = Arc::clone(&runtime);
597        let ss = Arc::clone(&session_store);
598        let pe = Arc::clone(&policy_engine);
599        let cl = Arc::clone(&change_log);
600        let wh = Arc::clone(&ws_hub);
601        let sh = Arc::clone(&sse_hub);
602        let mc = Arc::clone(&magic_codes);
603        let pr = Arc::clone(&plugin_reg);
604        let rm = Arc::clone(&room_mgr);
605        let mt = Arc::clone(&metrics);
606        let os = Arc::clone(&oauth_state);
607        let ca = Arc::clone(&cache);
608        let ps = Arc::clone(&pubsub_broker);
609        let jq = Arc::clone(&job_queue);
610        let sc = Arc::clone(&scheduler);
611        let we = Arc::clone(&workflow_engine);
612        let fn_ops_ref = fn_ops_maybe.clone();
613        let shards_ref = shard_registry.clone();
614        let cors_origin = cors_origin.clone();
615        let cookie_config = Arc::clone(&cookie_config);
616        let allow_credentials = allow_credentials;
617        let is_dev = is_dev;
618
619        let method = request.method().clone();
620        let url = request.url().to_string();
621
622        // --- Health check: fast path before auth or body parsing ---
623        if url == "/health" && method == Method::Get {
624            let uptime = start_time.elapsed().as_secs();
625            let body = serde_json::json!({
626                "status": "ok",
627                "version": "0.1.0",
628                "uptime_secs": uptime,
629            })
630            .to_string();
631
632            let response = with_security_headers(
633                Response::from_string(&body)
634                    .with_status_code(200u16)
635                    .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
636                    .with_header(
637                        Header::from_bytes(
638                            "Access-Control-Allow-Origin",
639                            cors_origin.as_bytes().to_vec(),
640                        )
641                        .unwrap(),
642                    ),
643            );
644            let _ = request.respond(response);
645            continue;
646        }
647
648        // --- Metrics endpoint: fast path before rate-limit / body parsing.
649        // Gate behind admin auth in non-dev to prevent leakage of function
650        // names, request volumes, and error rates to the public internet.
651        // Dev mode stays open so local Prometheus scrapers just work.
652        if url == "/metrics" && method == Method::Get {
653            if !is_dev {
654                let admin_bytes = admin_token.as_deref().unwrap_or("").as_bytes();
655                let auth_ok = !admin_bytes.is_empty()
656                    && request.headers().iter().any(|h| {
657                        let name = h.field.as_str().as_str();
658                        name.eq_ignore_ascii_case("Authorization")
659                            && h.value
660                                .as_str()
661                                .strip_prefix("Bearer ")
662                                .map(|t| pylon_auth::constant_time_eq(t.as_bytes(), admin_bytes))
663                                .unwrap_or(false)
664                    });
665                if !auth_ok {
666                    let body = json_error(
667                        "UNAUTHORIZED",
668                        "/metrics requires admin bearer token in non-dev mode",
669                    );
670                    let response = with_security_headers(
671                        Response::from_string(&body)
672                            .with_status_code(401u16)
673                            .with_header(
674                                Header::from_bytes("Content-Type", "application/json").unwrap(),
675                            ),
676                    );
677                    let _ = request.respond(response);
678                    continue;
679                }
680            }
681            let prefers_prometheus = request.headers().iter().any(|h| {
682                (h.field.as_str() == "Accept" || h.field.as_str() == "accept")
683                    && (h.value.as_str().contains("text/plain")
684                        || h.value.as_str().contains("application/openmetrics-text"))
685            });
686            let (body, content_type) = if prefers_prometheus {
687                (mt.prometheus(), "text/plain; version=0.0.4")
688            } else {
689                (mt.snapshot().to_string(), "application/json")
690            };
691            let response = with_security_headers(
692                Response::from_string(&body)
693                    .with_status_code(200u16)
694                    .with_header(Header::from_bytes("Content-Type", content_type).unwrap())
695                    .with_header(
696                        Header::from_bytes(
697                            "Access-Control-Allow-Origin",
698                            cors_origin.as_bytes().to_vec(),
699                        )
700                        .unwrap(),
701                    ),
702            );
703            let _ = request.respond(response);
704            mt.record_request("GET", 200);
705            continue;
706        }
707
708        // --- Rate limiting: check per-IP request count ---
709        let peer_ip = request
710            .remote_addr()
711            .map(|a| a.ip().to_string())
712            .unwrap_or_default();
713
714        // OPTIONS preflights are browser infrastructure, not user intent.
715        // Rate-limiting them makes a normal page effectively halve its
716        // budget (preflight + real request per call) and returns a 429
717        // that the browser can't interpret as a valid CORS response —
718        // the user-visible symptom is "Failed to fetch" on login. Skip.
719        let is_preflight = matches!(method, Method::Options);
720        if !is_preflight {
721            if let Err(retry_after) = rate_limiter.check(&peer_ip) {
722                let err_body = json_error(
723                    "RATE_LIMITED",
724                    &format!("Too many requests. Retry after {retry_after} seconds."),
725                );
726                let response = with_security_headers(
727                    Response::from_string(&err_body)
728                        .with_status_code(429u16)
729                        .with_header(
730                            Header::from_bytes("Content-Type", "application/json").unwrap(),
731                        )
732                        .with_header(
733                            Header::from_bytes(
734                                "Access-Control-Allow-Origin",
735                                cors_origin.as_bytes().to_vec(),
736                            )
737                            .unwrap(),
738                        )
739                        .with_header(
740                            Header::from_bytes(
741                                "Access-Control-Allow-Methods",
742                                "GET, POST, PATCH, DELETE, OPTIONS",
743                            )
744                            .unwrap(),
745                        )
746                        .with_header(
747                            Header::from_bytes(
748                                "Access-Control-Allow-Headers",
749                                "Content-Type, Authorization",
750                            )
751                            .unwrap(),
752                        )
753                        .with_header(
754                            Header::from_bytes(
755                                "Retry-After",
756                                retry_after.to_string().as_bytes().to_vec(),
757                            )
758                            .unwrap(),
759                        ),
760                );
761                let _ = request.respond(response);
762                mt.record_request(method.as_str(), 429);
763                continue;
764            }
765        } // end: if !is_preflight
766
767        // --- CSRF check on state-changing requests ---
768        //
769        // Browsers forbid cross-origin POST/PATCH/PUT/DELETE unless CORS
770        // allows it, but an attacker controlling another origin can still
771        // ship credentials-bearing requests if the server is permissive.
772        // The CSRF plugin validates Origin (then Referer) against an explicit
773        // allowlist — this is the check that was missing because the Plugin
774        // trait's `on_request` hook has no access to headers.
775        //
776        // The Authorization header carries bearer tokens, so CSRF mostly
777        // matters for cookie-based sessions. Bearer requests are exempted
778        // below; every other state-changing request that misses
779        // Origin/Referer is rejected, which is the safer default.
780        {
781            let method_str = method.as_str();
782            let is_bearer = request.headers().iter().any(|h| {
783                (h.field.as_str() == "Authorization" || h.field.as_str() == "authorization")
784                    && h.value.as_str().starts_with("Bearer ")
785            });
786            // Bearer-authenticated requests are not CSRF-vulnerable in the
787            // classic sense — browsers don't auto-attach bearer tokens. Skip
788            // the check for them so server-to-server API callers keep working
789            // without needing Origin headers.
790            if !is_bearer && !matches!(method, Method::Get | Method::Head | Method::Options) {
791                let origin = request
792                    .headers()
793                    .iter()
794                    .find(|h| h.field.as_str() == "Origin" || h.field.as_str() == "origin")
795                    .map(|h| h.value.as_str().to_string());
796                let referer = request
797                    .headers()
798                    .iter()
799                    .find(|h| h.field.as_str() == "Referer" || h.field.as_str() == "referer")
800                    .map(|h| h.value.as_str().to_string());
801                if let Err(err) = csrf.check(method_str, origin.as_deref(), referer.as_deref()) {
802                    let body = json_error(&err.code, &err.message);
803                    let response = with_security_headers(
804                        Response::from_string(&body)
805                            .with_status_code(err.status)
806                            .with_header(
807                                Header::from_bytes("Content-Type", "application/json").unwrap(),
808                            )
809                            .with_header(
810                                Header::from_bytes(
811                                    "Access-Control-Allow-Origin",
812                                    cors_origin.as_bytes().to_vec(),
813                                )
814                                .unwrap(),
815                            ),
816                    );
817                    let _ = request.respond(response);
818                    mt.record_request(method_str, err.status);
819                    continue;
820                }
821            }
822        }
823
824        // Extract auth token + auth context EARLY so every fast path (upload,
825        // shard SSE, fn streaming, AI streaming) can enforce auth the same
826        // way the router does. Previously these paths ran before auth
827        // extraction and bypassed the plugin/router auth chain entirely.
828        //
829        // Two transports for the same opaque session token:
830        //   1. `Authorization: Bearer <token>` — CLI, mobile, server-to-server
831        //   2. `Cookie: <name>=<token>` — browsers (HttpOnly, XSS can't read)
832        // Bearer wins when both are present (explicit beats ambient).
833        let bearer_token: Option<String> = request
834            .headers()
835            .iter()
836            .find(|h| h.field.as_str() == "Authorization" || h.field.as_str() == "authorization")
837            .and_then(|h| {
838                let val = h.value.as_str();
839                val.strip_prefix("Bearer ").map(|t| t.to_string())
840            });
841        let cookie_token: Option<String> = if bearer_token.is_some() {
842            None
843        } else {
844            request
845                .headers()
846                .iter()
847                .find(|h| h.field.as_str() == "Cookie" || h.field.as_str() == "cookie")
848                .and_then(|h| {
849                    pylon_auth::extract_session_cookie(h.value.as_str(), &cookie_config.name)
850                })
851        };
852        let auth_token: Option<String> = bearer_token.or(cookie_token);
853        let auth_ctx = if admin_token.is_some()
854            && auth_token.is_some()
855            && pylon_auth::constant_time_eq(
856                auth_token.as_deref().unwrap_or("").as_bytes(),
857                admin_token.as_deref().unwrap_or("").as_bytes(),
858            ) {
859            pylon_auth::AuthContext::admin()
860        } else {
861            ss.resolve(auth_token.as_deref())
862        };
863
864        // --- Test-reset endpoint — in-memory + dev mode + localhost only ---
865        //
866        // `pylon test` sets PYLON_IN_MEMORY=1 + PYLON_DEV_MODE=true.
867        // The TS helper `resetDb()` posts here between `test(...)` blocks
868        // to isolate cases. Gates:
869        //   1. dev mode (production refuses outright)
870        //   2. in-memory DB (belt-and-braces against accidental file wipes)
871        //   3. peer IP is loopback (a dev laptop often has localhost:4321
872        //      reachable; without this, a browser visiting a malicious
873        //      site could cross-site-POST a reset via a bare form —
874        //      blind CSRF that doesn't care about the response)
875        //
876        // Positioned AFTER the rate limiter and CSRF check on purpose so
877        // those middlewares apply — the earlier placement skipped both.
878        if url == "/api/__test__/reset" && method == Method::Post {
879            let is_loopback = peer_ip == "127.0.0.1"
880                || peer_ip == "::1"
881                || peer_ip.starts_with("127.")
882                || peer_ip == "localhost";
883            if !is_dev || !rt.is_in_memory() || !is_loopback {
884                let body = json_error(
885                    "RESET_REFUSED",
886                    "reset endpoint is only available in dev mode + in-memory DB + from loopback",
887                );
888                let response = with_security_headers(
889                    Response::from_string(&body)
890                        .with_status_code(403u16)
891                        .with_header(
892                            Header::from_bytes("Content-Type", "application/json").unwrap(),
893                        )
894                        .with_header(
895                            Header::from_bytes(
896                                "Access-Control-Allow-Origin",
897                                cors_origin.as_bytes().to_vec(),
898                            )
899                            .unwrap(),
900                        ),
901                );
902                let _ = request.respond(response);
903                mt.record_request("POST", 403);
904                continue;
905            }
906            let (status, body) = match rt.reset_for_tests() {
907                Ok(()) => (200u16, "{\"reset\":true}".to_string()),
908                Err(e) => (500u16, json_error(&e.code, &e.message)),
909            };
910            let response = with_security_headers(
911                Response::from_string(&body)
912                    .with_status_code(status)
913                    .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
914                    .with_header(
915                        Header::from_bytes(
916                            "Access-Control-Allow-Origin",
917                            cors_origin.as_bytes().to_vec(),
918                        )
919                        .unwrap(),
920                    ),
921            );
922            let _ = request.respond(response);
923            mt.record_request("POST", status);
924            continue;
925        }
926
927        // --- File upload fast path: handle binary body before string conversion ---
928        // Uploads come in two shapes:
929        //   1. Direct binary body with X-Filename / Content-Type headers
930        //   2. multipart/form-data with a file part
931        //
932        // Require an authenticated user. Uploads write to the files backend
933        // (and into the plugin audit log for soft-delete etc.), so
934        // unauthenticated callers cannot use this route.
935        if url == "/api/files/upload" && method == Method::Post {
936            const UPLOAD_MAX: usize = 10 * 1024 * 1024;
937            // Enforce size BEFORE reading the body so a 10 GiB stream can't
938            // buffer into memory. Content-Length pre-check, then bounded read.
939            if let Some(declared) = request.body_length() {
940                if declared > UPLOAD_MAX {
941                    let err = json_error(
942                        "PAYLOAD_TOO_LARGE",
943                        &format!("Content-Length {declared} exceeds upload max of {UPLOAD_MAX}"),
944                    );
945                    let response = with_security_headers(
946                        Response::from_string(&err)
947                            .with_status_code(413u16)
948                            .with_header(
949                                Header::from_bytes("Content-Type", "application/json").unwrap(),
950                            )
951                            .with_header(
952                                Header::from_bytes(
953                                    "Access-Control-Allow-Origin",
954                                    cors_origin.as_bytes().to_vec(),
955                                )
956                                .unwrap(),
957                            ),
958                    );
959                    let _ = request.respond(response);
960                    mt.record_request("POST", 413);
961                    continue;
962                }
963            }
964            if auth_ctx.user_id.is_none() {
965                let err = json_error(
966                    "AUTH_REQUIRED",
967                    "/api/files/upload requires an authenticated session",
968                );
969                let response = with_security_headers(
970                    Response::from_string(&err)
971                        .with_status_code(401u16)
972                        .with_header(
973                            Header::from_bytes("Content-Type", "application/json").unwrap(),
974                        )
975                        .with_header(
976                            Header::from_bytes(
977                                "Access-Control-Allow-Origin",
978                                cors_origin.as_bytes().to_vec(),
979                            )
980                            .unwrap(),
981                        ),
982                );
983                let _ = request.respond(response);
984                mt.record_request("POST", 401);
985                continue;
986            }
987            // Read up to UPLOAD_MAX + 1 bytes. If we read the full +1 we know
988            // the client lied about Content-Length (or used chunked encoding
989            // and overran). Reject in that case instead of continuing with a
990            // truncated file.
991            use std::io::Read;
992            let mut bytes: Vec<u8> = Vec::with_capacity(8192);
993            let mut limited = request.as_reader().take((UPLOAD_MAX as u64) + 1);
994            let _ = limited.read_to_end(&mut bytes);
995
996            const MAX: usize = UPLOAD_MAX;
997            if bytes.len() > MAX {
998                let err = json_error("PAYLOAD_TOO_LARGE", "File exceeds 10 MB limit");
999                let response = with_security_headers(
1000                    Response::from_string(&err)
1001                        .with_status_code(413u16)
1002                        .with_header(
1003                            Header::from_bytes("Content-Type", "application/json").unwrap(),
1004                        )
1005                        .with_header(
1006                            Header::from_bytes(
1007                                "Access-Control-Allow-Origin",
1008                                cors_origin.as_bytes().to_vec(),
1009                            )
1010                            .unwrap(),
1011                        ),
1012                );
1013                let _ = request.respond(response);
1014                mt.record_request("POST", 413);
1015                continue;
1016            }
1017
1018            // Headers.
1019            let content_type = request
1020                .headers()
1021                .iter()
1022                .find(|h| h.field.as_str() == "Content-Type" || h.field.as_str() == "content-type")
1023                .map(|h| h.value.as_str().to_string())
1024                .unwrap_or_else(|| "application/octet-stream".into());
1025            let filename = request
1026                .headers()
1027                .iter()
1028                .find(|h| h.field.as_str() == "X-Filename" || h.field.as_str() == "x-filename")
1029                .map(|h| h.value.as_str().to_string())
1030                .unwrap_or_else(|| "upload".into());
1031
1032            // If multipart, extract the first file part. Otherwise use bytes directly.
1033            let (name, ct, payload) = if content_type.starts_with("multipart/form-data") {
1034                match parse_multipart_first_file(&bytes, &content_type) {
1035                    Some(p) => p,
1036                    None => {
1037                        let err = json_error("INVALID_MULTIPART", "Could not parse multipart body");
1038                        let response = with_security_headers(
1039                            Response::from_string(&err)
1040                                .with_status_code(400u16)
1041                                .with_header(
1042                                    Header::from_bytes("Content-Type", "application/json").unwrap(),
1043                                )
1044                                .with_header(
1045                                    Header::from_bytes(
1046                                        "Access-Control-Allow-Origin",
1047                                        cors_origin.as_bytes().to_vec(),
1048                                    )
1049                                    .unwrap(),
1050                                ),
1051                        );
1052                        let _ = request.respond(response);
1053                        mt.record_request("POST", 400);
1054                        continue;
1055                    }
1056                }
1057            } else {
1058                (filename, content_type, bytes)
1059            };
1060
1061            let storage = pylon_storage::files::LocalFileStorage::new(
1062                &std::env::var("PYLON_FILES_DIR").unwrap_or_else(|_| "uploads".into()),
1063                &std::env::var("PYLON_FILES_URL_PREFIX").unwrap_or_else(|_| "/api/files".into()),
1064            );
1065
1066            let (status, body) =
1067                match pylon_storage::files::FileStorage::store(&storage, &name, &payload, &ct) {
1068                    Ok(stored) => (
1069                        201u16,
1070                        serde_json::to_string(&stored).unwrap_or_else(|_| "{}".into()),
1071                    ),
1072                    Err(e) => (500u16, json_error(&e.code, &e.message)),
1073                };
1074
1075            let response = with_security_headers(
1076                Response::from_string(&body)
1077                    .with_status_code(status)
1078                    .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
1079                    .with_header(
1080                        Header::from_bytes(
1081                            "Access-Control-Allow-Origin",
1082                            cors_origin.as_bytes().to_vec(),
1083                        )
1084                        .unwrap(),
1085                    ),
1086            );
1087            let _ = request.respond(response);
1088            mt.record_request("POST", status);
1089            continue;
1090        }
1091
1092        // Read body before routing (request is consumed by respond).
1093        // Skipped for GET, HEAD, OPTIONS and DELETE, whose bodies are ignored.
1094        //
1095        // Size enforcement runs in TWO layers so a malicious client can't
1096        // stream 10 GiB into memory before we reject it:
1097        //   1. Content-Length header is compared to MAX_BODY_SIZE up front.
1098        //   2. The actual read uses `.take(MAX_BODY_SIZE + 1)` so a lying
1099        //      or chunked stream is capped at MAX + 1 bytes; if we read that
1100        //      many, we reject.
1101        const MAX_BODY_SIZE: usize = 10 * 1024 * 1024;
1102
1103        if let Some(declared) = request.body_length() {
1104            if declared > MAX_BODY_SIZE {
1105                let err_body = json_error(
1106                    "PAYLOAD_TOO_LARGE",
1107                    &format!("Content-Length {declared} exceeds max of {MAX_BODY_SIZE}"),
1108                );
1109                let response = with_security_headers(
1110                    Response::from_string(&err_body)
1111                        .with_status_code(413u16)
1112                        .with_header(
1113                            Header::from_bytes(
1114                                "Access-Control-Allow-Origin",
1115                                cors_origin.as_bytes().to_vec(),
1116                            )
1117                            .unwrap(),
1118                        ),
1119                );
1120                let _ = request.respond(response);
1121                mt.record_request(method.as_str(), 413);
1122                continue;
1123            }
1124        }
1125
1126        let mut body = String::new();
1127        if !matches!(
1128            method,
1129            Method::Get | Method::Head | Method::Options | Method::Delete
1130        ) {
1131            use std::io::Read;
1132            let mut limited = request.as_reader().take((MAX_BODY_SIZE as u64) + 1);
1133            let _ = limited.read_to_string(&mut body);
1134        }
1135
1136        if body.len() > MAX_BODY_SIZE {
1137            let err_body = json_error(
1138                "PAYLOAD_TOO_LARGE",
1139                &format!(
1140                    "Request body exceeds maximum size of {} bytes",
1141                    MAX_BODY_SIZE,
1142                ),
1143            );
1144            let response = with_security_headers(
1145                Response::from_string(&err_body)
1146                    .with_status_code(413u16)
1147                    .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
1148                    .with_header(
1149                        Header::from_bytes(
1150                            "Access-Control-Allow-Origin",
1151                            cors_origin.as_bytes().to_vec(),
1152                        )
1153                        .unwrap(),
1154                    ),
1155            );
1156            let _ = request.respond(response);
1157            mt.record_request(method.as_str(), 413);
1158            continue;
1159        }
1160
1161        // (auth_token + auth_ctx were resolved above, before the fast paths.)
1162
1163        // --- GET /api/shards/:id/connect — SSE snapshot stream ---
1164        if method == Method::Get {
1165            if let Some(rest) = url.strip_prefix("/api/shards/") {
1166                let rest = rest.split('?').next().unwrap_or(rest);
1167                if let Some(shard_id) = rest.strip_suffix("/connect") {
1168                    // Require an authenticated user. Shard SSE streams state
1169                    // snapshots tick-by-tick; an anonymous subscriber can
1170                    // both read that state AND influence via push_input (see
1171                    // the WS handler). Gate at the transport layer.
1172                    if auth_ctx.user_id.is_none() {
1173                        let err = json_error(
1174                            "AUTH_REQUIRED",
1175                            "Shard connect requires an authenticated session",
1176                        );
1177                        let response = with_security_headers(
1178                            Response::from_string(&err)
1179                                .with_status_code(401u16)
1180                                .with_header(
1181                                    Header::from_bytes("Content-Type", "application/json").unwrap(),
1182                                )
1183                                .with_header(
1184                                    Header::from_bytes(
1185                                        "Access-Control-Allow-Origin",
1186                                        cors_origin.as_bytes().to_vec(),
1187                                    )
1188                                    .unwrap(),
1189                                ),
1190                        );
1191                        let _ = request.respond(response);
1192                        mt.record_request("GET", 401);
1193                        continue;
1194                    }
1195                    let shards = match &shards_ref {
1196                        Some(s) => Arc::clone(s),
1197                        None => {
1198                            let err = json_error(
1199                                "SHARDS_NOT_AVAILABLE",
1200                                "Shard system is not configured",
1201                            );
1202                            let response = with_security_headers(
1203                                Response::from_string(&err)
1204                                    .with_status_code(503u16)
1205                                    .with_header(
1206                                        Header::from_bytes("Content-Type", "application/json")
1207                                            .unwrap(),
1208                                    )
1209                                    .with_header(
1210                                        Header::from_bytes(
1211                                            "Access-Control-Allow-Origin",
1212                                            cors_origin.as_bytes().to_vec(),
1213                                        )
1214                                        .unwrap(),
1215                                    ),
1216                            );
1217                            let _ = request.respond(response);
1218                            mt.record_request("GET", 503);
1219                            continue;
1220                        }
1221                    };
1222                    let shard = match shards.get(shard_id) {
1223                        Some(s) => s,
1224                        None => {
1225                            let err = json_error(
1226                                "SHARD_NOT_FOUND",
1227                                &format!("Shard \"{shard_id}\" not found"),
1228                            );
1229                            let response = with_security_headers(
1230                                Response::from_string(&err)
1231                                    .with_status_code(404u16)
1232                                    .with_header(
1233                                        Header::from_bytes("Content-Type", "application/json")
1234                                            .unwrap(),
1235                                    )
1236                                    .with_header(
1237                                        Header::from_bytes(
1238                                            "Access-Control-Allow-Origin",
1239                                            cors_origin.as_bytes().to_vec(),
1240                                        )
1241                                        .unwrap(),
1242                                    ),
1243                            );
1244                            let _ = request.respond(response);
1245                            mt.record_request("GET", 404);
1246                            continue;
1247                        }
1248                    };
1249
1250                    // Subscriber ID from ?sid= query param, else the authed user. The
1251                    // anon_ fallback is defensive only: the 401 gate above requires a user.
1252                    let sub_id = url
1253                        .split("sid=")
1254                        .nth(1)
1255                        .and_then(|s| s.split('&').next())
1256                        .map(|s| s.to_string())
1257                        .or_else(|| auth_ctx.user_id.clone())
1258                        .unwrap_or_else(|| {
1259                            format!(
1260                                "anon_{}",
1261                                std::time::SystemTime::now()
1262                                    .duration_since(std::time::UNIX_EPOCH)
1263                                    .unwrap_or_default()
1264                                    .as_nanos()
1265                            )
1266                        });
1267                    let subscriber_id = pylon_realtime::SubscriberId::new(sub_id);
1268
1269                    let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1270                    let streaming_body = StreamingBody::new(rx);
1271
1272                    let tx_clone = tx.clone();
1273                    let sink: pylon_realtime::SnapshotSink =
1274                        Box::new(move |tick: u64, bytes: &[u8]| {
1275                            // Format as SSE with an id: line carrying the tick
1276                            // number so clients can resume with Last-Event-ID.
1277                            let mut frame = format!("id: {tick}\ndata: ").into_bytes();
1278                            frame.extend_from_slice(bytes);
1279                            frame.extend_from_slice(b"\n\n");
1280                            let _ = tx_clone.send(frame);
1281                        });
1282
1283                    let shard_auth = pylon_realtime::ShardAuth {
1284                        user_id: auth_ctx.user_id.clone(),
1285                        is_admin: auth_ctx.is_admin,
1286                    };
1287                    if let Err(e) = shard.add_subscriber(subscriber_id.clone(), sink, &shard_auth) {
1288                        let (status, code) = match &e {
1289                            pylon_realtime::ShardError::Unauthorized(_) => (403u16, "UNAUTHORIZED"),
1290                            _ => (429u16, "SUBSCRIBE_FAILED"),
1291                        };
1292                        let err = json_error(code, &e.to_string());
1293                        let response = with_security_headers(
1294                            Response::from_string(&err)
1295                                .with_status_code(status)
1296                                .with_header(
1297                                    Header::from_bytes("Content-Type", "application/json").unwrap(),
1298                                )
1299                                .with_header(
1300                                    Header::from_bytes(
1301                                        "Access-Control-Allow-Origin",
1302                                        cors_origin.as_bytes().to_vec(),
1303                                    )
1304                                    .unwrap(),
1305                                ),
1306                        );
1307                        let _ = request.respond(response);
1308                        mt.record_request("GET", status);
1309                        continue;
1310                    }
1311
1312                    // Auto-unsubscribe when the client disconnects: we watch
1313                    // for mpsc channel disconnection in a sentinel thread.
1314                    {
1315                        let shard_cleanup = Arc::clone(&shard);
1316                        let sub_id_cleanup = subscriber_id.clone();
1317                        let tx_liveness = tx.clone();
1318                        std::thread::spawn(move || {
1319                            // Send a heartbeat every 30s; if send fails, the
1320                            // channel is closed (client disconnected).
1321                            loop {
1322                                std::thread::sleep(std::time::Duration::from_secs(30));
1323                                if tx_liveness.send(b": heartbeat\n\n".to_vec()).is_err() {
1324                                    shard_cleanup.remove_subscriber(&sub_id_cleanup);
1325                                    return;
1326                                }
1327                                if !shard_cleanup.is_running() {
1328                                    return;
1329                                }
1330                            }
1331                        });
1332                    }
1333
1334                    let response = with_security_headers(Response::new(
1335                        tiny_http::StatusCode(200),
1336                        vec![
1337                            Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1338                            Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1339                            Header::from_bytes("Connection", "keep-alive").unwrap(),
1340                            Header::from_bytes(
1341                                "Access-Control-Allow-Origin",
1342                                cors_origin.as_bytes().to_vec(),
1343                            )
1344                            .unwrap(),
1345                        ],
1346                        streaming_body,
1347                        None,
1348                        None,
1349                    ));
1350                    let _ = request.respond(response);
1351                    mt.record_request("GET", 200);
1352                    continue;
1353                }
1354            }
1355        }
1356
1357        // --- POST /api/fn/:name with Accept: text/event-stream — streaming functions ---
1358        if method == Method::Post
1359            && url.starts_with("/api/fn/")
1360            && url != "/api/fn/traces"
1361            && request.headers().iter().any(|h| {
1362                (h.field.as_str() == "Accept" || h.field.as_str() == "accept")
1363                    && h.value.as_str().contains("text/event-stream")
1364            })
1365        {
1366            let fn_name = url
1367                .strip_prefix("/api/fn/")
1368                .unwrap_or("")
1369                .split('?')
1370                .next()
1371                .unwrap_or("")
1372                .to_string();
1373
1374            if let Some(fn_ops) = &fn_ops_maybe {
1375                // Mirror the router's gates so the streaming fast path doesn't
1376                // become a way to bypass function auth / rate limits.
1377                // 1. Function must exist (otherwise 404, not a hung SSE).
1378                if pylon_router::FnOps::get_fn(fn_ops.as_ref(), &fn_name).is_none() {
1379                    let err = json_error(
1380                        "FN_NOT_FOUND",
1381                        &format!("Function \"{fn_name}\" is not registered"),
1382                    );
1383                    let response = with_security_headers(
1384                        Response::from_string(&err)
1385                            .with_status_code(404u16)
1386                            .with_header(
1387                                Header::from_bytes("Content-Type", "application/json").unwrap(),
1388                            )
1389                            .with_header(
1390                                Header::from_bytes(
1391                                    "Access-Control-Allow-Origin",
1392                                    cors_origin.as_bytes().to_vec(),
1393                                )
1394                                .unwrap(),
1395                            ),
1396                    );
1397                    let _ = request.respond(response);
1398                    mt.record_request("POST", 404);
1399                    continue;
1400                }
1401                // 2. Per-function rate limit (identity = user_id or "anon").
1402                let identity = auth_ctx.user_id.as_deref().unwrap_or("anon");
1403                if let Err(retry_after) =
1404                    pylon_router::FnOps::check_rate_limit(fn_ops.as_ref(), &fn_name, identity)
1405                {
1406                    let body = format!(
1407                        r#"{{"error":{{"code":"RATE_LIMITED","message":"Function \"{fn_name}\" rate limit exceeded","retry_after_secs":{retry_after}}}}}"#
1408                    );
1409                    let response = with_security_headers(
1410                        Response::from_string(&body)
1411                            .with_status_code(429u16)
1412                            .with_header(
1413                                Header::from_bytes("Content-Type", "application/json").unwrap(),
1414                            )
1415                            .with_header(
1416                                Header::from_bytes(
1417                                    "Access-Control-Allow-Origin",
1418                                    cors_origin.as_bytes().to_vec(),
1419                                )
1420                                .unwrap(),
1421                            ),
1422                    );
1423                    let _ = request.respond(response);
1424                    mt.record_request("POST", 429);
1425                    continue;
1426                }
1427
1428                let args: serde_json::Value =
1429                    serde_json::from_str(&body).unwrap_or(serde_json::json!({}));
1430
1431                let auth = pylon_functions::protocol::AuthInfo {
1432                    user_id: auth_ctx.user_id.clone(),
1433                    is_admin: auth_ctx.is_admin,
1434                    tenant_id: auth_ctx.tenant_id.clone(),
1435                };
1436
1437                let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1438                let streaming_body = StreamingBody::new(rx);
1439
1440                let fn_ops_cl = Arc::clone(fn_ops);
1441                let tx_stream = tx.clone();
1442                std::thread::spawn(move || {
1443                    let tx_cb = tx_stream.clone();
1444                    let on_stream: Box<dyn FnMut(&str) + Send> = Box::new(move |chunk: &str| {
1445                        let sse = format!("data: {}\n\n", chunk);
1446                        let _ = tx_cb.send(sse.into_bytes());
1447                    });
1448
1449                    let result = pylon_router::FnOps::call(
1450                        fn_ops_cl.as_ref(),
1451                        &fn_name,
1452                        args,
1453                        auth,
1454                        Some(on_stream),
1455                        None, // streaming /api/fn/:name never carries HTTP request metadata
1456                    );
1457                    match result {
1458                        Ok((value, _trace)) => {
1459                            let done = format!(
1460                                "event: result\ndata: {}\n\n",
1461                                serde_json::to_string(&value).unwrap_or_else(|_| "null".into())
1462                            );
1463                            let _ = tx_stream.send(done.into_bytes());
1464                        }
1465                        Err(e) => {
1466                            let err = format!(
1467                                "event: error\ndata: {}\n\n",
1468                                serde_json::json!({"code": e.code, "message": e.message})
1469                            );
1470                            let _ = tx_stream.send(err.into_bytes());
1471                        }
1472                    }
1473                });
1474
1475                let response = with_security_headers(Response::new(
1476                    tiny_http::StatusCode(200),
1477                    vec![
1478                        Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1479                        Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1480                        Header::from_bytes("Connection", "keep-alive").unwrap(),
1481                        Header::from_bytes(
1482                            "Access-Control-Allow-Origin",
1483                            cors_origin.as_bytes().to_vec(),
1484                        )
1485                        .unwrap(),
1486                    ],
1487                    streaming_body,
1488                    None,
1489                    None,
1490                ));
1491                let _ = request.respond(response);
1492                mt.record_request("POST", 200);
1493                continue;
1494            }
1495        }
1496
1497        // --- POST /api/ai/stream — SSE streaming AI completion ---
1498        if url == "/api/ai/stream" && method == Method::Post {
1499            // AI endpoints spend real money per call. Require auth so a
1500            // drive-by caller can't burn through the provider budget.
1501            if auth_ctx.user_id.is_none() {
1502                let err = json_error(
1503                    "AUTH_REQUIRED",
1504                    "/api/ai/stream requires an authenticated session",
1505                );
1506                let response = with_security_headers(
1507                    Response::from_string(&err)
1508                        .with_status_code(401u16)
1509                        .with_header(
1510                            Header::from_bytes("Content-Type", "application/json").unwrap(),
1511                        )
1512                        .with_header(
1513                            Header::from_bytes(
1514                                "Access-Control-Allow-Origin",
1515                                cors_origin.as_bytes().to_vec(),
1516                            )
1517                            .unwrap(),
1518                        ),
1519                );
1520                let _ = request.respond(response);
1521                mt.record_request("POST", 401);
1522                continue;
1523            }
1524            let ai_provider = std::env::var("PYLON_AI_PROVIDER").unwrap_or_default();
1525            let ai_key = std::env::var("PYLON_AI_API_KEY").unwrap_or_default();
1526            let ai_model = std::env::var("PYLON_AI_MODEL").unwrap_or_default();
1527            let ai_base = std::env::var("PYLON_AI_BASE_URL").unwrap_or_default();
1528
1529            if ai_key.is_empty() && ai_provider != "custom" {
1530                let err = json_error(
1531                    "AI_NOT_CONFIGURED",
1532                    "Set PYLON_AI_PROVIDER and PYLON_AI_API_KEY",
1533                );
1534                let response = with_security_headers(
1535                    Response::from_string(&err)
1536                        .with_status_code(503u16)
1537                        .with_header(
1538                            Header::from_bytes("Content-Type", "application/json").unwrap(),
1539                        )
1540                        .with_header(
1541                            Header::from_bytes(
1542                                "Access-Control-Allow-Origin",
1543                                cors_origin.as_bytes().to_vec(),
1544                            )
1545                            .unwrap(),
1546                        ),
1547                );
1548                let _ = request.respond(response);
1549                mt.record_request("POST", 503);
1550                continue;
1551            }
1552
1553            let parsed: serde_json::Value = match serde_json::from_str(&body) {
1554                Ok(v) => v,
1555                Err(_) => {
1556                    let err = json_error("INVALID_JSON", "Invalid request body");
1557                    let response = with_security_headers(
1558                        Response::from_string(&err)
1559                            .with_status_code(400u16)
1560                            .with_header(
1561                                Header::from_bytes("Content-Type", "application/json").unwrap(),
1562                            )
1563                            .with_header(
1564                                Header::from_bytes(
1565                                    "Access-Control-Allow-Origin",
1566                                    cors_origin.as_bytes().to_vec(),
1567                                )
1568                                .unwrap(),
1569                            ),
1570                    );
1571                    let _ = request.respond(response);
1572                    mt.record_request("POST", 400);
1573                    continue;
1574                }
1575            };
1576
1577            let messages: Vec<AiMessage> = match parsed.get("messages").and_then(|m| m.as_array()) {
1578                Some(arr) => arr
1579                    .iter()
1580                    .filter_map(|m| {
1581                        let role = m.get("role")?.as_str()?.to_string();
1582                        let content = m.get("content")?.as_str()?.to_string();
1583                        Some(AiMessage { role, content })
1584                    })
1585                    .collect(),
1586                None => {
1587                    let err = json_error("MISSING_FIELD", "\"messages\" array is required");
1588                    let response = with_security_headers(
1589                        Response::from_string(&err)
1590                            .with_status_code(400u16)
1591                            .with_header(
1592                                Header::from_bytes("Content-Type", "application/json").unwrap(),
1593                            )
1594                            .with_header(
1595                                Header::from_bytes(
1596                                    "Access-Control-Allow-Origin",
1597                                    cors_origin.as_bytes().to_vec(),
1598                                )
1599                                .unwrap(),
1600                            ),
1601                    );
1602                    let _ = request.respond(response);
1603                    mt.record_request("POST", 400);
1604                    continue;
1605                }
1606            };
1607
1608            // Override model from request body if provided.
1609            let model = parsed
1610                .get("model")
1611                .and_then(|m| m.as_str())
1612                .map(|s| s.to_string())
1613                .unwrap_or(ai_model);
1614
1615            let proxy = match ai_provider.as_str() {
1616                "anthropic" => AiProxyPlugin::anthropic(&ai_key, &model),
1617                "openai" => AiProxyPlugin::openai(&ai_key, &model),
1618                "custom" => AiProxyPlugin::custom_with_model(&ai_base, &ai_key, &model),
1619                _ => AiProxyPlugin::openai(&ai_key, &model),
1620            };
1621
1622            // Set up a channel-based streaming body so tiny_http streams
1623            // data to the client as chunks arrive from the AI provider.
1624            let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1625            let streaming_body = StreamingBody::new(rx);
1626
1627            // Spawn the provider request on a background thread. Each chunk
1628            // is formatted as an SSE event and pushed through the channel.
1629            std::thread::spawn(move || {
1630                let result = proxy.stream_completion(&messages, &mut |chunk| {
1631                    let sse = format!(
1632                        "data: {}
1633
1634",
1635                        serde_json::json!({
1636                            "choices": [{"index": 0, "delta": {"content": chunk}}]
1637                        })
1638                    );
1639                    let _ = tx.send(sse.into_bytes());
1640                });
1641
1642                // Send a final event indicating completion or error.
1643                match result {
1644                    Ok(_) => {
1645                        let _ = tx.send(
1646                            b"data: [DONE]
1647
1648"
1649                            .to_vec(),
1650                        );
1651                    }
1652                    Err(e) => {
1653                        let err_event = format!(
1654                            "data: {}
1655
1656",
1657                            serde_json::json!({"error": {"message": e, "type": "stream_error"}})
1658                        );
1659                        let _ = tx.send(err_event.into_bytes());
1660                    }
1661                }
1662                // tx is dropped here, which causes StreamingBody::read to return 0 (EOF).
1663            });
1664
1665            let response = with_security_headers(Response::new(
1666                tiny_http::StatusCode(200),
1667                vec![
1668                    Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1669                    Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1670                    Header::from_bytes("Connection", "keep-alive").unwrap(),
1671                    Header::from_bytes(
1672                        "Access-Control-Allow-Origin",
1673                        cors_origin.as_bytes().to_vec(),
1674                    )
1675                    .unwrap(),
1676                ],
1677                streaming_body,
1678                None, // unknown content length = chunked transfer
1679                None,
1680            ));
1681            let _ = request.respond(response);
1682            mt.record_request("POST", 200);
1683            continue;
1684        }
1685
1686        // Studio route (returns HTML, not JSON).
1687        //
1688        // Privileged admin UI. It renders the full schema and lets the
1689        // operator run mutations against the data browser. In production we
1690        // require an admin token; in dev mode we leave it open so
1691        // `pylon dev` remains friction-free for the single-user case.
1692        //
1693        // Serving a WWW-Authenticate Basic realm isn't useful here because
1694        // admin auth is bearer-token based. Callers get a 401 and should
1695        // retry with `Authorization: Bearer <PYLON_ADMIN_TOKEN>`.
1696        let (status, response_body, content_type, is_studio, extra_headers) = if (url == "/studio"
1697            || url == "/studio/")
1698            && method == Method::Get
1699        {
1700            if !is_dev && !auth_ctx.is_admin {
1701                let body = json_error(
1702                    "AUTH_REQUIRED",
1703                    "/studio requires admin auth in production (set PYLON_ADMIN_TOKEN and pass it as Bearer)",
1704                );
1705                let response = with_security_headers(
1706                    Response::from_string(&body)
1707                        .with_status_code(401u16)
1708                        .with_header(
1709                            Header::from_bytes("Content-Type", "application/json").unwrap(),
1710                        )
1711                        .with_header(
1712                            Header::from_bytes(
1713                                "Access-Control-Allow-Origin",
1714                                cors_origin.as_bytes().to_vec(),
1715                            )
1716                            .unwrap(),
1717                        ),
1718                );
1719                let _ = request.respond(response);
1720                mt.record_request("GET", 401);
1721                continue;
1722            }
1723            // Derive the public base URL from the request's Host header +
1724            // X-Forwarded-Proto (Fly / any HTTPS terminator sets this).
1725            // Hardcoding `http://localhost:{port}` here meant the studio
1726            // HTML served from pylon-crm.fly.dev tried to fetch
1727            // http://localhost:4321/api/* from the browser, which CSP
1728            // rightly blocks.
1729            let host = request
1730                .headers()
1731                .iter()
1732                .find(|h| h.field.equiv("Host"))
1733                .map(|h| h.value.as_str().to_string())
1734                .unwrap_or_else(|| format!("localhost:{port}"));
1735            let scheme = request
1736                .headers()
1737                .iter()
1738                .find(|h| h.field.equiv("X-Forwarded-Proto"))
1739                .map(|h| h.value.as_str().to_string())
1740                .unwrap_or_else(|| "http".to_string());
1741            let base = format!("{scheme}://{host}");
1742            let html = pylon_studio_api::generate_studio_html(rt.manifest(), &base);
1743            (
1744                200u16,
1745                html,
1746                "text/html",
1747                true,
1748                Vec::<(String, String)>::new(),
1749            )
1750        } else {
1751            // Run plugin middleware with per-request metadata so rate-limit
1752            // plugins can bucket by peer IP (not just user id) when the
1753            // caller is anonymous.
1754            let meta = pylon_plugin::RequestMeta {
1755                peer_ip: peer_ip.as_str(),
1756            };
1757            if let Err(e) = pr.run_on_request_with_meta(method.as_str(), &url, &auth_ctx, &meta) {
1758                (
1759                    e.status,
1760                    json_error(&e.code, &e.message),
1761                    "application/json",
1762                    false,
1763                    Vec::new(),
1764                )
1765            } else if let Some((s, b)) =
1766                pr.try_handle_route(method.as_str(), &url, &body, &auth_ctx)
1767            {
1768                // Plugin handled the route.
1769                (s, b, "application/json", false, Vec::new())
1770            } else {
1771                let notifier = WsSseNotifier {
1772                    ws: Arc::clone(&wh),
1773                    sse: Arc::clone(&sh),
1774                };
1775                let openapi_gen = RuntimeOpenApiGenerator {
1776                    manifest: rt.manifest(),
1777                };
1778                let file_ops = LocalFileOps::new_default();
1779                let cache_adapter = CacheAdapter(Arc::clone(&ca));
1780                let pubsub_adapter = PubSubAdapter(Arc::clone(&ps));
1781                let email_adapter = EmailAdapter::from_env();
1782                let fn_ops: Option<&dyn pylon_router::FnOps> =
1783                    fn_ops_ref.as_deref().map(|f| f as &dyn pylon_router::FnOps);
1784                let shard_adapter = shards_ref.as_ref().map(|reg| ShardOpsAdapter {
1785                    registry: Arc::clone(reg),
1786                });
1787                let shard_ops: Option<&dyn pylon_router::ShardOps> = shard_adapter
1788                    .as_ref()
1789                    .map(|a| a as &dyn pylon_router::ShardOps);
1790                let plugin_hooks = PluginHooksAdapter(Arc::clone(&pr));
1791                // Snapshot request headers as (name, value) pairs for the
1792                // router to forward into webhook-invoked actions. Header
1793                // names are left as-sent; the router lowercases + merges
1794                // duplicates per RFC 7230 when constructing RequestInfo.
1795                let request_headers: Vec<(String, String)> = request
1796                    .headers()
1797                    .iter()
1798                    .map(|h| (h.field.as_str().to_string(), h.value.as_str().to_string()))
1799                    .collect();
1800                let router_ctx = pylon_router::RouterContext {
1801                    store: rt.as_ref(),
1802                    session_store: &ss,
1803                    magic_codes: &mc,
1804                    oauth_state: &os,
1805                    policy_engine: &pe,
1806                    change_log: &cl,
1807                    notifier: &notifier,
1808                    rooms: rm.as_ref(),
1809                    cache: &cache_adapter,
1810                    pubsub: &pubsub_adapter,
1811                    jobs: jq.as_ref(),
1812                    scheduler: sc.as_ref(),
1813                    workflows: we.as_ref(),
1814                    files: &file_ops,
1815                    openapi: &openapi_gen,
1816                    functions: fn_ops,
1817                    email: &email_adapter,
1818                    shards: shard_ops,
1819                    plugin_hooks: &plugin_hooks,
1820                    auth_ctx: &auth_ctx,
1821                    is_dev,
1822                    request_headers: &request_headers,
1823                    cookie_config: cookie_config.as_ref(),
1824                    response_headers: std::cell::RefCell::new(Vec::new()),
1825                };
1826                let http_method = HttpMethod::from_str(method.as_str());
1827                let (s, b, _ct) = pylon_router::route(
1828                    &router_ctx,
1829                    http_method,
1830                    &url,
1831                    &body,
1832                    auth_token.as_deref(),
1833                );
1834                let extra_headers = router_ctx.take_response_headers();
1835                (s, b, "application/json", false, extra_headers)
1836            }
1837        };
1838
1839        let mut response = Response::from_string(&response_body)
1840            .with_status_code(status)
1841            .with_header(Header::from_bytes("Content-Type", content_type).unwrap())
1842            .with_header(
1843                Header::from_bytes(
1844                    "Access-Control-Allow-Origin",
1845                    cors_origin.as_bytes().to_vec(),
1846                )
1847                .unwrap(),
1848            )
1849            .with_header(
1850                Header::from_bytes(
1851                    "Access-Control-Allow-Methods",
1852                    "GET, POST, PATCH, DELETE, OPTIONS",
1853                )
1854                .unwrap(),
1855            )
1856            .with_header(
1857                Header::from_bytes(
1858                    "Access-Control-Allow-Headers",
1859                    "Content-Type, Authorization",
1860                )
1861                .unwrap(),
1862            );
1863        // Cookie-based auth requires `Access-Control-Allow-Credentials:
1864        // true` on the response, paired with a specific origin. Vary
1865        // ensures intermediaries don't cache one origin's response and
1866        // serve it back to a different origin's browser.
1867        if allow_credentials {
1868            response = response
1869                .with_header(
1870                    Header::from_bytes("Access-Control-Allow-Credentials", "true").unwrap(),
1871                )
1872                .with_header(Header::from_bytes("Vary", "Origin").unwrap());
1873        }
1874
1875        // Apply any extra headers handlers attached via the router context
1876        // (Set-Cookie on login/logout, Location on OAuth GET callback).
1877        // Bytes from these headers come from server-built strings — bad
1878        // bytes here would be a programming bug, not request-driven, so a
1879        // failed Header::from_bytes is silently dropped rather than
1880        // poisoning the response.
1881        for (name, value) in extra_headers {
1882            if let Ok(h) = Header::from_bytes(name.as_bytes(), value.as_bytes().to_vec()) {
1883                response = response.with_header(h);
1884            }
1885        }
1886
1887        // Add Content-Security-Policy for Studio HTML responses.
1888        //
1889        // Studio talks to the same Rust process over HTTP (same origin)
1890        // AND a sibling WebSocket port (port+1, scheme ws:). CSP's
1891        // `default-src` covers `connect-src` by fallback, so any
1892        // directive we set there must include the WS scheme or the
1893        // browser silently blocks the live-sync connection.
1894        //
1895        // `ws:` + `wss:` cover localhost dev + TLS deploys without
1896        // hard-coding ports. Same-origin `'self'` keeps HTTP fetches
1897        // allowed. Inline + eval stay for the Tailwind/Babel CDN scripts
1898        // the current Studio HTML includes.
1899        if is_studio {
1900            response = response.with_header(
1901                Header::from_bytes(
1902                    "Content-Security-Policy",
1903                    "default-src 'self' 'unsafe-inline' 'unsafe-eval' https://cdn.tailwindcss.com https://unpkg.com ws: wss:",
1904                ).unwrap(),
1905            );
1906        }
1907
1908        let response = with_security_headers(response);
1909
1910        let _ = request.respond(response);
1911        mt.record_request(method.as_str(), status);
1912    }
1913
1914    tracing::warn!("Shutting down gracefully...");
1915
1916    // --- Drain phase ---
1917    // Stop accepting new work, let in-flight finish, close subsystems cleanly.
1918    let drain_timeout = std::time::Duration::from_secs(
1919        std::env::var("PYLON_DRAIN_SECS")
1920            .ok()
1921            .and_then(|s| s.parse().ok())
1922            .unwrap_or(10),
1923    );
1924    let start = Instant::now();
1925
1926    // Stop any running shards so their tick loops exit.
1927    if let Some(reg) = &shard_registry {
1928        for id in reg.ids() {
1929            if let Some(shard) = reg.get(&id) {
1930                shard.stop();
1931            }
1932        }
1933    }
1934
1935    // Let the scheduler finish its current cycle.
1936    let _ = &scheduler; // drop Arc at end of scope
1937
1938    // Wait for outstanding workers to idle, up to drain_timeout.
1939    while start.elapsed() < drain_timeout {
1940        let pending_jobs = job_queue.stats().pending;
1941        if pending_jobs == 0 {
1942            break;
1943        }
1944        std::thread::sleep(std::time::Duration::from_millis(100));
1945    }
1946
1947    let elapsed = start.elapsed();
1948    tracing::warn!(
1949        "Drain complete in {:.1}s (timeout {}s)",
1950        elapsed.as_secs_f32(),
1951        drain_timeout.as_secs()
1952    );
1953    Ok(())
1954}
1955
1956// The route() function has been extracted to the `pylon-router` crate.
1957// See `pylon_router::route()` for the platform-agnostic routing logic.
1958// The server now delegates to it via a `RouterContext`.
1959
1960fn json_error(code: &str, message: &str) -> String {
1961    pylon_router::json_error(code, message)
1962}
1963
1964/// Build the session store. Persists by default for file-backed runtimes —
1965/// sessions live in a sibling `<db>.sessions.db` file next to the app DB
1966/// unless `PYLON_SESSION_DB` overrides the path or
1967/// `PYLON_SESSION_IN_MEMORY=1` opts out. In-memory runtimes (tests)
1968/// get an in-memory session store.
1969///
1970/// This used to be opt-in, which silently broke every app after a server
1971/// restart: tokens in browser localStorage resolved to anonymous, pulls
1972/// came back empty under policy, and mutations 400'd with UNAUTHENTICATED.
1973fn build_session_store(app_db_path: Option<&str>) -> SessionStore {
1974    if std::env::var("PYLON_SESSION_IN_MEMORY")
1975        .map(|v| v == "1" || v == "true")
1976        .unwrap_or(false)
1977    {
1978        return SessionStore::new();
1979    }
1980    let explicit = std::env::var("PYLON_SESSION_DB").ok();
1981    let default_path = app_db_path.map(|p| format!("{p}.sessions.db"));
1982    let path = match explicit.or(default_path) {
1983        Some(p) => p,
1984        None => return SessionStore::new(),
1985    };
1986    match crate::session_backend::SqliteSessionBackend::open(&path) {
1987        Ok(backend) => {
1988            tracing::info!("[pylon] Session persistence enabled: {path}");
1989            SessionStore::with_backend(Box::new(backend))
1990        }
1991        Err(e) => {
1992            tracing::warn!(
1993                "[pylon] could not open session DB {path}: {e}. Falling back to in-memory sessions."
1994            );
1995            SessionStore::new()
1996        }
1997    }
1998}
1999
2000/// Parse a `multipart/form-data` body and return the first file part found.
2001///
2002/// Returns `(filename, content_type, bytes)` on success, `None` if the body
2003/// can't be parsed or no file part exists.
2004///
2005/// Handles the common RFC 7578 subset used by browsers and curl:
2006/// - `--boundary` part separators
2007/// - `Content-Disposition: form-data; name=...; filename=...`
2008/// - `Content-Type: ...`
2009/// - blank line, then raw bytes, then `\r\n--boundary` terminator
2010fn parse_multipart_first_file(
2011    body: &[u8],
2012    content_type_header: &str,
2013) -> Option<(String, String, Vec<u8>)> {
2014    // Extract the boundary parameter.
2015    let boundary_param = content_type_header
2016        .split(';')
2017        .find_map(|p| p.trim().strip_prefix("boundary="))?;
2018    let boundary = boundary_param.trim_matches('"');
2019    let delimiter = format!("--{boundary}");
2020    let delimiter_bytes = delimiter.as_bytes();
2021
2022    // Find each part between delimiters.
2023    let mut pos = 0usize;
2024    while pos < body.len() {
2025        // Find the next delimiter.
2026        let next = find_subslice(&body[pos..], delimiter_bytes)?;
2027        let part_start = pos + next + delimiter_bytes.len();
2028        // Skip CRLF or -- (terminator) after the delimiter.
2029        if part_start + 2 > body.len() {
2030            return None;
2031        }
2032        if &body[part_start..part_start + 2] == b"--" {
2033            return None; // end of parts, no file found
2034        }
2035        let header_start = part_start + skip_crlf(&body[part_start..]);
2036
2037        // Find end-of-headers (blank line).
2038        let header_end_offset = find_subslice(&body[header_start..], b"\r\n\r\n")?;
2039        let headers = &body[header_start..header_start + header_end_offset];
2040        let data_start = header_start + header_end_offset + 4;
2041
2042        // Find the next delimiter — that's where this part's data ends.
2043        let next_delim_offset = find_subslice(&body[data_start..], delimiter_bytes)?;
2044        // Strip the trailing CRLF before the delimiter.
2045        let mut data_end = data_start + next_delim_offset;
2046        if data_end >= 2 && &body[data_end - 2..data_end] == b"\r\n" {
2047            data_end -= 2;
2048        }
2049
2050        // Parse headers we care about.
2051        let headers_str = std::str::from_utf8(headers).ok()?;
2052        let mut filename: Option<String> = None;
2053        let mut part_ct = String::from("application/octet-stream");
2054        let mut has_file = false;
2055        for line in headers_str.split("\r\n") {
2056            let lower = line.to_ascii_lowercase();
2057            if let Some(rest) = lower.strip_prefix("content-disposition:") {
2058                if rest.contains("filename=") {
2059                    has_file = true;
2060                    // Extract filename="xxx"
2061                    if let Some(start) = line.find("filename=\"") {
2062                        let from = start + 10;
2063                        if let Some(end_offset) = line[from..].find('"') {
2064                            filename = Some(line[from..from + end_offset].to_string());
2065                        }
2066                    }
2067                }
2068            } else if let Some(rest) = lower.strip_prefix("content-type:") {
2069                part_ct = rest.trim().to_string();
2070            }
2071        }
2072
2073        if has_file {
2074            let name = filename.unwrap_or_else(|| "upload".into());
2075            return Some((name, part_ct, body[data_start..data_end].to_vec()));
2076        }
2077
2078        pos = data_end;
2079    }
2080    None
2081}
2082
/// Return the offset of the first occurrence of `needle` in `haystack`.
///
/// Yields `None` when `needle` is empty, longer than `haystack`, or simply
/// not present.
fn find_subslice(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    let needle_len = needle.len();
    if needle_len == 0 || haystack.len() < needle_len {
        return None;
    }
    // Only offsets that leave room for the whole needle can match.
    let last_start = haystack.len() - needle_len;
    (0..=last_start).find(|&start| haystack[start..].starts_with(needle))
}
2089
/// Length in bytes of the line break at the start of `buf`.
///
/// Returns 2 for a leading `\r\n`, 1 for a leading bare `\n`, and 0 for
/// anything else (including an empty buffer or a lone `\r`).
fn skip_crlf(buf: &[u8]) -> usize {
    match buf {
        [b'\r', b'\n', ..] => 2,
        [b'\n', ..] => 1,
        _ => 0,
    }
}
2099
#[cfg(test)]
mod multipart_tests {
    use super::*;

    /// Request `Content-Type` shared by the multipart fixtures below.
    const FORM_CT: &str = "multipart/form-data; boundary=bnd";

    /// A lone file part yields its filename, declared content type, and the
    /// raw bytes without the CRLF that precedes the closing delimiter.
    #[test]
    fn parses_single_file() {
        let payload: &[u8] = b"--bnd\r\n\
            Content-Disposition: form-data; name=\"file\"; filename=\"hello.txt\"\r\n\
            Content-Type: text/plain\r\n\
            \r\n\
            Hello world\r\n\
            --bnd--\r\n";

        let parsed = parse_multipart_first_file(payload, FORM_CT);
        let (name, content_type, bytes) = parsed.unwrap();

        assert_eq!(name, "hello.txt");
        assert_eq!(content_type, "text/plain");
        assert_eq!(bytes, b"Hello world");
    }

    /// A body holding only a plain form field (no `filename=`) has no file
    /// part to return.
    #[test]
    fn returns_none_without_file_part() {
        let payload: &[u8] = b"--bnd\r\n\
            Content-Disposition: form-data; name=\"field\"\r\n\
            \r\n\
            just text\r\n\
            --bnd--\r\n";

        let parsed = parse_multipart_first_file(payload, FORM_CT);

        assert!(parsed.is_none());
    }

    /// Without a `boundary=` parameter in the content type there is nothing
    /// to split on.
    #[test]
    fn returns_none_when_no_boundary() {
        let parsed = parse_multipart_first_file(b"anything", "application/json");

        assert!(parsed.is_none());
    }
}