Skip to main content

pylon_runtime/
server.rs

1#[allow(unused_imports)]
2use std::io::Read;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::time::Instant;
6
7use pylon_auth::SessionStore;
8use pylon_http::HttpMethod;
9use pylon_plugin::PluginRegistry;
10use pylon_policy::PolicyEngine;
11use pylon_sync::{ChangeKind, ChangeLog};
12use tiny_http::{Header, Method, Response, Server};
13
14use crate::datastore::{
15    CacheAdapter, EmailAdapter, LocalFileOps, PluginHooksAdapter, PubSubAdapter,
16    RuntimeOpenApiGenerator, ShardOpsAdapter, WsSseNotifier,
17};
18use crate::jobs::{JobQueue, JobResult, Worker};
19use crate::metrics::Metrics;
20use crate::pubsub::PubSubBroker;
21use crate::rate_limit::RateLimiter;
22use crate::rooms::RoomManager;
23use crate::scheduler::Scheduler;
24use crate::sse::SseHub;
25use crate::workflows::WorkflowEngine;
26use crate::ws::WsHub;
27use crate::Runtime;
28use pylon_plugin::builtin::ai_proxy::{AiMessage, AiProxyPlugin};
29use pylon_plugin::builtin::cache::CachePlugin;
30
31// ---------------------------------------------------------------------------
32// Streaming body — bridges mpsc::Receiver to std::io::Read for SSE responses
33// ---------------------------------------------------------------------------
34
/// A streaming response body backed by an MPSC channel.
///
/// Implements [`std::io::Read`] by handing out, in order, the byte chunks
/// received on the channel. When used as the body of a
/// `tiny_http::Response`, this lets the server write data as it arrives
/// instead of buffering the whole body up front.
///
/// End of stream (`Ok(0)`) is reported when either:
/// * the channel is closed (every sender has been dropped), or
/// * an empty chunk is received — producers must therefore never send an
///   empty `Vec` unless they intend to terminate the stream.
struct StreamingBody {
    /// Source of body chunks.
    rx: std::sync::mpsc::Receiver<Vec<u8>>,
    /// The most recent chunk, kept only while part of it is still unread
    /// because it was larger than the caller's buffer.
    buf: Vec<u8>,
    /// Offset into `buf` of the first byte not yet handed to the caller.
    pos: usize,
}

impl StreamingBody {
    /// Wrap `rx` in a body with no buffered leftover data.
    fn new(rx: std::sync::mpsc::Receiver<Vec<u8>>) -> Self {
        Self {
            rx,
            buf: Vec::new(),
            pos: 0,
        }
    }
}

impl std::io::Read for StreamingBody {
    /// Copy the next available bytes into `buf`.
    ///
    /// Blocks until a chunk arrives when nothing is buffered. Returns the
    /// number of bytes copied, or `Ok(0)` on end of stream (see the type
    /// docs) or when `buf` is zero-length. Never returns `Err`.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // A zero-length destination can never receive data, so answer right
        // away rather than parking the thread on the channel for a chunk we
        // could not hand out anyway.
        if buf.is_empty() {
            return Ok(0);
        }

        // Drain any leftover data from a previous recv that was larger than
        // the caller's buffer.
        if self.pos < self.buf.len() {
            let remaining = &self.buf[self.pos..];
            let n = remaining.len().min(buf.len());
            buf[..n].copy_from_slice(&remaining[..n]);
            self.pos += n;
            if self.pos >= self.buf.len() {
                // Fully consumed: reset so the next call goes to the channel.
                self.buf.clear();
                self.pos = 0;
            }
            return Ok(n);
        }

        // Block until the next chunk arrives or the sender is dropped.
        match self.rx.recv() {
            // An empty chunk is treated as an explicit end-of-stream marker.
            Ok(data) if data.is_empty() => Ok(0),
            Ok(data) => {
                let n = data.len().min(buf.len());
                buf[..n].copy_from_slice(&data[..n]);
                if n < data.len() {
                    // Keep the tail for subsequent reads.
                    self.buf = data;
                    self.pos = n;
                }
                Ok(n)
            }
            Err(_) => Ok(0), // Channel closed = EOF
        }
    }
}
88
89/// Global shutdown flag. Set via `request_shutdown()` to trigger graceful exit.
90static SHUTDOWN: AtomicBool = AtomicBool::new(false);
91
92/// Request a graceful shutdown of the running server.
93///
94/// This sets the shutdown flag and, if a server handle has been stashed, calls
95/// `unblock()` to wake the request loop. Safe to call from any thread or signal
96/// handler.
97pub fn request_shutdown() {
98    SHUTDOWN.store(true, Ordering::SeqCst);
99    // If a server handle is available, unblock the request loop so it can
100    // observe the flag immediately rather than waiting for the next request.
101    if let Some(srv) = SERVER_HANDLE.get() {
102        srv.unblock();
103    }
104}
105
106/// Global handle to the `tiny_http::Server` so `request_shutdown()` can call
107/// `unblock()` without requiring callers to hold a reference.
108static SERVER_HANDLE: std::sync::OnceLock<Arc<Server>> = std::sync::OnceLock::new();
109
110// ---------------------------------------------------------------------------
111// Security headers
112// ---------------------------------------------------------------------------
113
114/// Resolve the real client IP behind `trust_proxy_hops` reverse
115/// proxies. Returns an owned String; empty when no IP can be
116/// determined (callers fall back to "anon" identity downstream).
117///
118/// `trust_proxy_hops == 0` is the safe default: we ignore XFF
119/// entirely and use the socket address. Set to N when N trusted
120/// proxies sit in front of Pylon — we take the Nth-from-the-right
121/// XFF entry, which is the address the closest trusted proxy
122/// observed. Honoring the leftmost (or just trusting the whole
123/// header) lets any caller spoof their source IP by sending an
124/// `X-Forwarded-For: 1.2.3.4` header themselves.
125fn resolve_client_ip(request: &tiny_http::Request, trust_proxy_hops: usize) -> String {
126    let socket_ip = request
127        .remote_addr()
128        .map(|a| a.ip().to_string())
129        .unwrap_or_default();
130    if trust_proxy_hops == 0 {
131        return socket_ip;
132    }
133    // tiny_http stores field names as AsciiStr; cast back to &str so
134    // we can do the case-insensitive compare RFC 7230 calls for.
135    let xff = request
136        .headers()
137        .iter()
138        .find(|h| {
139            h.field
140                .as_str()
141                .as_str()
142                .eq_ignore_ascii_case("X-Forwarded-For")
143        })
144        .map(|h| h.value.as_str().to_string());
145    let Some(xff) = xff else {
146        return socket_ip;
147    };
148    // XFF is "client, proxy1, proxy2" — the leftmost is whatever the
149    // first hop SAID was the client (untrusted), and each subsequent
150    // entry is what the next hop saw. With N trusted proxies, the
151    // Nth-from-right is the IP our closest trusted proxy verified.
152    let entries: Vec<&str> = xff.split(',').map(str::trim).collect();
153    if entries.len() < trust_proxy_hops {
154        // XFF doesn't have enough hops — operator misconfiguration
155        // or a request that bypassed the expected proxy chain.
156        // Fall back to socket IP rather than trusting whatever's
157        // there.
158        return socket_ip;
159    }
160    let candidate = entries[entries.len() - trust_proxy_hops];
161    // Validate it parses as an IP before using as a bucket key —
162    // garbage-in would let attackers poison the rate-limit map.
163    if candidate.parse::<std::net::IpAddr>().is_ok() {
164        candidate.to_string()
165    } else {
166        socket_ip
167    }
168}
169
170/// Common security headers applied to every response.
171///
172/// `Referrer-Policy` and `Permissions-Policy` are defense-in-depth.
173/// `Strict-Transport-Security` is intentionally NOT set here — Pylon
174/// is typically reached through a TLS-terminating proxy (Fly LB,
175/// CloudFront) that owns the HSTS decision; setting it from the
176/// origin would force every plaintext-loopback test deploy to fight
177/// the browser cache.
178fn security_headers() -> Vec<Header> {
179    vec![
180        Header::from_bytes("X-Content-Type-Options", "nosniff").unwrap(),
181        Header::from_bytes("X-Frame-Options", "DENY").unwrap(),
182        Header::from_bytes("X-XSS-Protection", "1; mode=block").unwrap(),
183        // Don't leak the full URL to cross-origin destinations on
184        // navigation; same-origin still gets the path so internal
185        // analytics keep working.
186        Header::from_bytes("Referrer-Policy", "strict-origin-when-cross-origin").unwrap(),
187        // Deny every powerful browser API by default. Apps that need
188        // camera/mic/geolocation override per-route via their own
189        // Permissions-Policy header.
190        Header::from_bytes(
191            "Permissions-Policy",
192            "accelerometer=(), camera=(), geolocation=(), gyroscope=(), microphone=(), payment=(), usb=()",
193        )
194        .unwrap(),
195    ]
196}
197
198/// Add security headers to a response.
199fn with_security_headers<R: std::io::Read>(response: Response<R>) -> Response<R> {
200    let mut resp = response;
201    for header in security_headers() {
202        resp = resp.with_header(header);
203    }
204    resp
205}
206
207/// Start the dev server on the given port. Blocks until shutdown.
208pub fn start(runtime: Arc<Runtime>, port: u16) -> Result<(), String> {
209    start_with_plugins(runtime, port, None)
210}
211
212/// Start the dev server with optional plugins. Blocks until shutdown.
213pub fn start_with_plugins(
214    runtime: Arc<Runtime>,
215    port: u16,
216    plugins: Option<Arc<PluginRegistry>>,
217) -> Result<(), String> {
218    start_server(runtime, port, plugins, None)
219}
220
221/// Start the dev server with plugins and a shard registry for real-time
222/// simulations (games, MMO zones, etc.). Blocks until shutdown.
223pub fn start_with_shards(
224    runtime: Arc<Runtime>,
225    port: u16,
226    plugins: Option<Arc<PluginRegistry>>,
227    shard_registry: Arc<dyn pylon_realtime::DynShardRegistry>,
228) -> Result<(), String> {
229    start_server(runtime, port, plugins, Some(shard_registry))
230}
231
232fn start_server(
233    runtime: Arc<Runtime>,
234    port: u16,
235    plugins: Option<Arc<PluginRegistry>>,
236    shard_registry: Option<Arc<dyn pylon_realtime::DynShardRegistry>>,
237) -> Result<(), String> {
238    // Run the tracing-exporter hook BEFORE anything else emits spans. The
239    // operator registers it via `pylon_observability::set_tracing_hook`
240    // at process init; here we invoke it exactly once on startup. No-op
241    // if nothing was registered.
242    pylon_observability::run_tracing_hook();
243
244    let addr = format!("0.0.0.0:{port}");
245    let server = Server::http(&addr).map_err(|e| format!("Failed to start server: {e}"))?;
246    let server = Arc::new(server);
247
248    // Stash a handle so `request_shutdown()` can unblock the loop.
249    let _ = SERVER_HANDLE.set(Arc::clone(&server));
250
251    let session_store = Arc::new(build_session_store(runtime.db_path().as_deref()));
252    let magic_codes = Arc::new(pylon_auth::MagicCodeStore::new());
253    // Persist OAuth state if a session DB is configured. Reuse the same path
254    // (sessions and OAuth state are both small auth artifacts that should
255    // outlive a restart). Falls back to in-memory if no DB path is set.
256    let oauth_state = Arc::new(match std::env::var("PYLON_SESSION_DB").ok() {
257        Some(path) => match crate::oauth_backend::SqliteOAuthBackend::open(&path) {
258            Ok(backend) => pylon_auth::OAuthStateStore::with_backend(Box::new(backend)),
259            Err(e) => {
260                tracing::warn!(
261                    "[auth] OAuth state SQLite backend unavailable: {e} — falling back to in-memory"
262                );
263                pylon_auth::OAuthStateStore::new()
264            }
265        },
266        None => pylon_auth::OAuthStateStore::new(),
267    });
268    let policy_engine = Arc::new(PolicyEngine::from_manifest(runtime.manifest()));
269    let change_log = Arc::new(ChangeLog::new());
270
271    // Seed the change log with one synthetic insert per extant row so that
272    // a pull from seq=0 after a restart reconstructs current state. The
273    // change log is in-memory — restarting the process without this would
274    // leave SQLite rows unreachable via /api/sync/pull (clients would
275    // pull nothing and see an empty replica). Seqs here are fresh; clients
276    // whose cursors are ahead of `self.seq` get a 410 and full resync,
277    // which then hits this seeded log and gets every current row back.
278    for entity in runtime.manifest().entities.iter() {
279        match runtime.list(&entity.name) {
280            Ok(rows) => {
281                for row in rows {
282                    if let Some(id) = row.get("id").and_then(|v| v.as_str()) {
283                        change_log.append(&entity.name, id, ChangeKind::Insert, Some(row.clone()));
284                    }
285                }
286            }
287            Err(_) => {
288                // Entity table may not exist yet on first boot — skip.
289            }
290        }
291    }
292    let ws_hub = WsHub::new();
293    let sse_hub = SseHub::new();
294    // Default-register the rate-limit plugin when no custom registry was
295    // supplied. Without this, self-hosted deployments would launch with
296    // auth endpoints (/api/auth/magic/send, /api/auth/magic/verify,
297    // /api/auth/session) wide open to brute force and enumeration.
298    //
299    // Dev: 100k/min so a React app's initial bundle + auth + sync pulls
300    // (each worth ~6-10 requests) doesn't immediately 429 the dev. Prod:
301    // 100/min per IP — tight enough to crush burst attackers, loose
302    // enough for legitimate multi-tab UIs. Callers passing their own
303    // registry are responsible for their own limits.
304    // Probe dev mode NOW — defined for real at line ~300 but plugin
305    // registration below needs it. Same env-var, same logic.
306    let is_dev_early = std::env::var("PYLON_DEV_MODE")
307        .map(|v| v == "1" || v == "true")
308        .unwrap_or(true);
309    let plugin_rl_max: u32 = if is_dev_early { 100_000 } else { 100 };
310    let plugin_reg: Arc<PluginRegistry> = plugins.unwrap_or_else(|| {
311        let mut reg = PluginRegistry::new(runtime.manifest().clone());
312        reg.register(Arc::new(
313            pylon_plugin::builtin::rate_limit::RateLimitPlugin::new(
314                plugin_rl_max,
315                std::time::Duration::from_secs(60),
316            ),
317        ));
318        // Auto-scope any entity that declares a `tenantId` field. This is
319        // how multi-tenant isolation becomes a default posture rather than
320        // an opt-in: drop the field on the entity and the plugin takes it
321        // from there (stamps inserts, rejects cross-tenant writes).
322        reg.register(Arc::new(
323            pylon_plugin::builtin::tenant_scope::TenantScopePlugin::from_manifest(
324                runtime.manifest(),
325            ),
326        ));
327        Arc::new(reg)
328    });
329    let room_mgr = Arc::new(RoomManager::new(120)); // 2 min idle timeout
330    let ws_port = port + 1;
331    let sse_port = port + 2;
332
333    // Record server start time for the health endpoint.
334    let start_time = Instant::now();
335
336    let metrics = Arc::new(Metrics::new());
337
338    // Cache and pub/sub shared instances.
339    let cache = Arc::new(CachePlugin::new(100_000));
340    let pubsub_broker = Arc::new(PubSubBroker::new(100));
341
342    // Job queue, scheduler, and background workers.
343    let job_queue = Arc::new(JobQueue::new(1000));
344
345    // Persistent job store. Colocate with the app DB so `./app.db` gets
346    // `./app.db.jobs.db` automatically — otherwise jobs land in CWD, which
347    // is wherever the server was launched from (confusing and fragile).
348    // In-memory runtimes and the `PYLON_JOBS_IN_MEMORY=1` opt-out both
349    // skip persistence.
350    let jobs_in_memory = std::env::var("PYLON_JOBS_IN_MEMORY")
351        .map(|v| v == "1" || v == "true")
352        .unwrap_or(false);
353    if !jobs_in_memory {
354        let jobs_db_path = std::env::var("PYLON_JOBS_DB").ok().unwrap_or_else(|| {
355            runtime
356                .db_path()
357                .map(|p| format!("{p}.jobs.db"))
358                .unwrap_or_else(|| "pylon.jobs.db".into())
359        });
360        match crate::job_store::JobStore::open(&jobs_db_path) {
361            Ok(store) => {
362                let store = Arc::new(store);
363                let restored = job_queue.restore_from(&store);
364                if restored > 0 {
365                    tracing::info!("[jobs] Restored {restored} pending job(s) from {jobs_db_path}");
366                }
367                job_queue.attach_store(store);
368            }
369            Err(e) => {
370                tracing::warn!(
371                    "[jobs] Could not open job store at {jobs_db_path}: {e} — running without persistence"
372                );
373            }
374        }
375    }
376
377    // Register built-in framework jobs.
378    {
379        let cache_ref = Arc::clone(&cache);
380        job_queue.register(
381            "pylon.cache.cleanup",
382            Arc::new(move |_job| {
383                cache_ref.cleanup_expired();
384                JobResult::Success
385            }),
386        );
387        let rooms_ref = Arc::clone(&room_mgr);
388        job_queue.register(
389            "pylon.rooms.cleanup",
390            Arc::new(move |_job| {
391                rooms_ref.cleanup_idle();
392                JobResult::Success
393            }),
394        );
395    }
396
397    let scheduler = Arc::new(Scheduler::new(Arc::clone(&job_queue)));
398    // Schedule built-in tasks.
399    let _ = scheduler.schedule(
400        "pylon.cache.cleanup",
401        "*/10 * * * *",
402        Arc::new(|_| JobResult::Success),
403    );
404    let _ = scheduler.schedule(
405        "pylon.rooms.cleanup",
406        "*/5 * * * *",
407        Arc::new(|_| JobResult::Success),
408    );
409
410    // Start 2 background workers.
411    let _worker_handles: Vec<_> = (0..2)
412        .map(|i| {
413            let w = Worker::new(Arc::clone(&job_queue), &format!("worker-{i}"));
414            w.start()
415        })
416        .collect();
417
418    // Start the scheduler.
419    let _scheduler_handle = Arc::clone(&scheduler).start();
420
421    // Workflow engine: TS runner URL configurable via env, defaults to local Bun server.
422    let wf_runner_url = std::env::var("PYLON_WORKFLOW_RUNNER_URL")
423        .unwrap_or_else(|_| "http://127.0.0.1:9876/run".to_string());
424    let workflow_engine = Arc::new(WorkflowEngine::new(&wf_runner_url, 10_000));
425
426    // Rate limiter: per-IP outer cap on total requests.
427    //
428    // Defaults:
429    //   - Dev mode: effectively off (100k/min) so a React app's initial
430    //     bundle load + sync pulls + user clicks don't immediately 429.
431    //     100/min blew through during a single login + first sync pull.
432    //   - Prod: 600/min (10 req/sec average). Still tight, but a real app
433    //     should override with PYLON_RATE_LIMIT_MAX anyway.
434    //
435    // Override with PYLON_RATE_LIMIT_MAX + PYLON_RATE_LIMIT_WINDOW.
436    let default_rl_max = if is_dev_early { 100_000 } else { 600 };
437    let rl_max: u32 = std::env::var("PYLON_RATE_LIMIT_MAX")
438        .ok()
439        .and_then(|v| v.parse().ok())
440        .unwrap_or(default_rl_max);
441    let rl_window: u64 = std::env::var("PYLON_RATE_LIMIT_WINDOW")
442        .ok()
443        .and_then(|v| v.parse().ok())
444        .unwrap_or(60);
445    let rate_limiter = Arc::new(RateLimiter::new(rl_max, rl_window));
446
447    // Per-function rate limiter: separate bucket per (caller, function) pair.
448    // Defaults to a stricter cap because functions are heavier than reads.
449    // Override via PYLON_FN_RATE_LIMIT_MAX / PYLON_FN_RATE_LIMIT_WINDOW.
450    let fn_rl_max: u32 = std::env::var("PYLON_FN_RATE_LIMIT_MAX")
451        .ok()
452        .and_then(|v| v.parse().ok())
453        .unwrap_or(30);
454    let fn_rl_window: u64 = std::env::var("PYLON_FN_RATE_LIMIT_WINDOW")
455        .ok()
456        .and_then(|v| v.parse().ok())
457        .unwrap_or(60);
458    let fn_rate_limiter = Arc::new(RateLimiter::new(fn_rl_max, fn_rl_window));
459
460    // TypeScript function runtime: optional Bun process that loads functions/*.ts
461    // If no `functions/` directory exists or Bun isn't installed, this is None
462    // and /api/fn/* routes return 503.
463    // Build a notifier adapter so function mutations (`ctx.db.insert/update/delete`)
464    // emit change events to WS + SSE subscribers on COMMIT. Without this,
465    // functions write to the DB but sync clients never see the update
466    // live — they only catch up on the next refetch.
467    let fn_notifier: Arc<dyn pylon_router::ChangeNotifier> =
468        Arc::new(crate::datastore::WsSseNotifier {
469            ws: Arc::clone(&ws_hub),
470            sse: Arc::clone(&sse_hub),
471        });
472    let fn_ops_maybe = crate::datastore::try_spawn_functions(
473        Arc::clone(&runtime),
474        Arc::clone(&job_queue),
475        Arc::clone(&fn_rate_limiter),
476        Arc::clone(&change_log),
477        fn_notifier,
478    );
479
480    // Dev mode flag. Gates a *lot* of permissive behavior: magic codes
481    // appear in JSON responses, /studio is open without admin auth,
482    // POST /api/auth/session can mint sessions for arbitrary user_ids,
483    // OAuth callback accepts a caller-supplied email, CORS defaults to
484    // `*`, etc. Defaulting to `true` meant a prod deploy that simply
485    // forgot the env var was trivially compromisable — flip to safe-
486    // by-default and let the CLI's `pylon dev` opt in explicitly.
487    let is_dev = std::env::var("PYLON_DEV_MODE")
488        .map(|v| v == "1" || v == "true")
489        .unwrap_or(false);
490
491    // CORS origin. Defaults to `*` in dev for convenience; in prod we refuse
492    // to start with a wildcard because the server sends `Access-Control-
493    // Allow-Credentials: true` elsewhere and also accepts `Authorization:
494    // Bearer <session>`. The combination of `*` + credentials is a spec
495    // violation that some browsers tolerate, and even when they don't it
496    // lets any origin drive bearer-auth APIs.
497    let cors_origin = match std::env::var("PYLON_CORS_ORIGIN") {
498        Ok(v) => v,
499        Err(_) if is_dev => "*".to_string(),
500        Err(_) => {
501            return Err(
502                "PYLON_CORS_ORIGIN must be set in production (non-dev mode). \
503                Set it to your frontend's origin, or set PYLON_DEV_MODE=true \
504                for local development."
505                    .into(),
506            );
507        }
508    };
509    if !is_dev && cors_origin == "*" {
510        return Err("PYLON_CORS_ORIGIN=\"*\" is refused in production mode. \
511            Set it to an explicit origin (https://app.example.com)."
512            .into());
513    }
514    // Browsers forbid combining `Access-Control-Allow-Origin: *` with
515    // `Access-Control-Allow-Credentials: true`. Cookie-based auth needs
516    // credentials, so we only emit the credentials header when the origin
517    // is specific. In dev with `*` we lose cookies-from-cross-origin
518    // (acceptable: dev typically uses same-origin proxying), but we
519    // refuse to send a header combo browsers will reject either way.
520    let allow_credentials = cors_origin != "*";
521    // Validate the origin once so per-request header construction can never
522    // panic on bad bytes. Previously every `Header::from_bytes(...).unwrap()`
523    // was a potential request-triggered DoS via env misconfiguration.
524    if Header::from_bytes(
525        "Access-Control-Allow-Origin",
526        cors_origin.as_bytes().to_vec(),
527    )
528    .is_err()
529    {
530        return Err(format!(
531            "PYLON_CORS_ORIGIN={cors_origin:?} contains bytes that are not a valid HTTP header value"
532        ));
533    }
534
535    // Admin token: read once at startup, not per-request.
536    let admin_token: Option<String> = std::env::var("PYLON_ADMIN_TOKEN").ok();
537
538    // Trusted proxy hops for resolving the real client IP behind a
539    // reverse proxy (Fly LB, nginx, CloudFront, etc.). Default 0 =
540    // ignore X-Forwarded-For and use the socket peer (safe-by-default;
541    // an unconfigured prod deploy can't be tricked into trusting
542    // attacker-supplied XFF). Set to N when there are exactly N
543    // trusted proxies in front of Pylon — the resolver takes the
544    // Nth-from-the-right address in XFF, which is the IP the closest
545    // trusted proxy actually saw the request from. Without this, every
546    // unauth caller behind the proxy shares one rate-limit bucket.
547    let trust_proxy_hops: usize = std::env::var("PYLON_TRUST_PROXY_HOPS")
548        .ok()
549        .and_then(|v| v.parse().ok())
550        .unwrap_or(0);
551
552    // Session cookie config — built once. Cookie name defaults to
553    // `${app_name}_session` so multiple Pylon apps on the same parent
554    // domain don't clobber each other's cookies. Browsers receive an
555    // HttpOnly+Secure+SameSite=Lax cookie by default; the same opaque
556    // session token continues to work via `Authorization: Bearer …`
557    // for CLI / mobile / server-to-server callers.
558    let cookie_config = Arc::new({
559        let app_name = runtime.manifest().name.as_str();
560        pylon_auth::CookieConfig::from_env(&pylon_auth::CookieConfig::default_name_for(app_name))
561    });
562
563    // CSRF protection. Enforced inline at the HTTP layer because the plugin
564    // trait's `on_request` hook doesn't see request headers. For
565    // state-changing methods (POST/PATCH/PUT/DELETE) we check Origin, then
566    // Referer, against the allowlist.
567    //
568    // Allowlist resolution:
569    //   - PYLON_CSRF_ORIGINS (comma-separated) if set
570    //   - otherwise PYLON_CORS_ORIGIN (already validated above)
571    //   - in dev, fall back to allow-any to avoid breaking local tooling.
572    let csrf_origins: Vec<String> = match std::env::var("PYLON_CSRF_ORIGINS") {
573        Ok(v) => v
574            .split(',')
575            .map(|s| s.trim().to_string())
576            .filter(|s| !s.is_empty())
577            .collect(),
578        Err(_) => {
579            if is_dev {
580                vec!["*".to_string()]
581            } else if cors_origin != "*" {
582                vec![cors_origin.clone()]
583            } else {
584                // Non-dev + wildcard was already rejected, but guard anyway.
585                vec![]
586            }
587        }
588    };
589    let csrf = Arc::new(pylon_plugin::builtin::csrf::CsrfPlugin::new(csrf_origins));
590
591    // Start WebSocket server on port+1.
592    //
593    // The snapshot fetcher gives the WS reader a way to ship the current
594    // CRDT snapshot to a client the instant it subscribes — without it
595    // the new tab would have to wait for the next write before catching
596    // up to the converged state. Encodes into the same length-prefixed
597    // wire frame as the broadcast path so the client decoder is shared.
598    //
599    // Authz is enforced HERE, not at the WS layer: the closure runs the
600    // row through `check_entity_read` against the caller's auth ctx and
601    // returns None on deny. The caller (handle_crdt_control) treats
602    // None as "don't subscribe" — so a denied client can't sit on a
603    // subscription waiting for a future write to leak state.
604    {
605        let hub = Arc::clone(&ws_hub);
606        let sessions = Arc::clone(&session_store);
607        let runtime_for_fetcher = Arc::clone(&runtime);
608        let pe_for_fetcher = Arc::clone(&policy_engine);
609        let fetcher: crate::ws::SnapshotFetcher = Arc::new(move |auth_ctx, entity, row_id| {
610            use pylon_http::DataStore;
611            // Fetch the row first so the policy engine can evaluate
612            // row-level predicates (`data.authorId == auth.userId`
613            // etc). Missing row → deny silently; the client just
614            // never gets a frame and can't probe existence.
615            let row = match runtime_for_fetcher.get_by_id(entity, row_id) {
616                Ok(Some(v)) => v,
617                _ => return None,
618            };
619            if !matches!(
620                pe_for_fetcher.check_entity_read(entity, auth_ctx, Some(&row)),
621                pylon_policy::PolicyResult::Allowed
622            ) {
623                return None;
624            }
625            let snap = match runtime_for_fetcher.crdt_snapshot(entity, row_id) {
626                Ok(Some(bytes)) => bytes,
627                _ => return None,
628            };
629            pylon_router::encode_crdt_frame(
630                pylon_router::CRDT_FRAME_SNAPSHOT,
631                entity,
632                row_id,
633                &snap,
634            )
635            .ok()
636        });
637        std::thread::spawn(move || {
638            crate::ws::start_ws_server(hub, sessions, ws_port, Some(fetcher));
639        });
640    }
641
642    // Start SSE server on port+2.
643    {
644        let hub = Arc::clone(&sse_hub);
645        std::thread::spawn(move || {
646            crate::sse::start_sse_server(hub, sse_port);
647        });
648    }
649
650    // Start shard WebSocket server on port+3 when a registry is provided.
651    let shard_ws_port = port + 3;
652    if let Some(reg) = shard_registry.clone() {
653        let sessions = Arc::clone(&session_store);
654        std::thread::spawn(move || {
655            crate::shard_ws::start_shard_ws_server(reg, sessions, shard_ws_port);
656        });
657    }
658
659    tracing::warn!("pylon dev server listening on http://localhost:{port}");
660    tracing::info!("  WebSocket: ws://localhost:{ws_port}");
661    tracing::info!("  Studio: http://localhost:{port}/studio");
662    tracing::info!("  API:    http://localhost:{port}/api/entities/<entity>");
663    tracing::info!("  Auth:   http://localhost:{port}/api/auth/session");
664
665    // Use recv() in a loop instead of incoming_requests() so we can share
666    // the Arc<Server> with the shutdown path (incoming_requests borrows &self
667    // which prevents moving the Arc into another thread).
668    loop {
669        if SHUTDOWN.load(Ordering::Relaxed) {
670            break;
671        }
672
673        let mut request = match server.recv() {
674            Ok(rq) => rq,
675            Err(_) => {
676                // recv() returns Err when unblocked or the socket is closed.
677                break;
678            }
679        };
680
681        if SHUTDOWN.load(Ordering::Relaxed) {
682            break;
683        }
684
685        let rt = Arc::clone(&runtime);
686        let ss = Arc::clone(&session_store);
687        let pe = Arc::clone(&policy_engine);
688        let cl = Arc::clone(&change_log);
689        let wh = Arc::clone(&ws_hub);
690        let sh = Arc::clone(&sse_hub);
691        let mc = Arc::clone(&magic_codes);
692        let pr = Arc::clone(&plugin_reg);
693        let rm = Arc::clone(&room_mgr);
694        let mt = Arc::clone(&metrics);
695        let os = Arc::clone(&oauth_state);
696        let ca = Arc::clone(&cache);
697        let ps = Arc::clone(&pubsub_broker);
698        let jq = Arc::clone(&job_queue);
699        let sc = Arc::clone(&scheduler);
700        let we = Arc::clone(&workflow_engine);
701        let fn_ops_ref = fn_ops_maybe.clone();
702        let shards_ref = shard_registry.clone();
703        let cors_origin = cors_origin.clone();
704        let cookie_config = Arc::clone(&cookie_config);
705        let allow_credentials = allow_credentials;
706        let is_dev = is_dev;
707
708        let method = request.method().clone();
709        let url = request.url().to_string();
710
711        // --- Health check: fast path before auth or body parsing ---
712        if url == "/health" && method == Method::Get {
713            let uptime = start_time.elapsed().as_secs();
714            let body = serde_json::json!({
715                "status": "ok",
716                "version": "0.1.0",
717                "uptime_secs": uptime,
718            })
719            .to_string();
720
721            let response = with_security_headers(
722                Response::from_string(&body)
723                    .with_status_code(200u16)
724                    .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
725                    .with_header(
726                        Header::from_bytes(
727                            "Access-Control-Allow-Origin",
728                            cors_origin.as_bytes().to_vec(),
729                        )
730                        .unwrap(),
731                    ),
732            );
733            let _ = request.respond(response);
734            continue;
735        }
736
737        // --- Metrics endpoint: fast path before rate-limit / body parsing.
738        // Gate behind admin auth in non-dev to prevent leakage of function
739        // names, request volumes, and error rates to the public internet.
740        // Dev mode stays open so local Prometheus scrapers just work.
741        if url == "/metrics" && method == Method::Get {
742            if !is_dev {
743                let admin_bytes = admin_token.as_deref().unwrap_or("").as_bytes();
744                let auth_ok = !admin_bytes.is_empty()
745                    && request.headers().iter().any(|h| {
746                        let name = h.field.as_str().as_str();
747                        name.eq_ignore_ascii_case("Authorization")
748                            && h.value
749                                .as_str()
750                                .strip_prefix("Bearer ")
751                                .map(|t| pylon_auth::constant_time_eq(t.as_bytes(), admin_bytes))
752                                .unwrap_or(false)
753                    });
754                if !auth_ok {
755                    let body = json_error(
756                        "UNAUTHORIZED",
757                        "/metrics requires admin bearer token in non-dev mode",
758                    );
759                    let response = with_security_headers(
760                        Response::from_string(&body)
761                            .with_status_code(401u16)
762                            .with_header(
763                                Header::from_bytes("Content-Type", "application/json").unwrap(),
764                            ),
765                    );
766                    let _ = request.respond(response);
767                    continue;
768                }
769            }
770            let prefers_prometheus = request.headers().iter().any(|h| {
771                (h.field.as_str() == "Accept" || h.field.as_str() == "accept")
772                    && (h.value.as_str().contains("text/plain")
773                        || h.value.as_str().contains("application/openmetrics-text"))
774            });
775            let (body, content_type) = if prefers_prometheus {
776                (mt.prometheus(), "text/plain; version=0.0.4")
777            } else {
778                (mt.snapshot().to_string(), "application/json")
779            };
780            let response = with_security_headers(
781                Response::from_string(&body)
782                    .with_status_code(200u16)
783                    .with_header(Header::from_bytes("Content-Type", content_type).unwrap())
784                    .with_header(
785                        Header::from_bytes(
786                            "Access-Control-Allow-Origin",
787                            cors_origin.as_bytes().to_vec(),
788                        )
789                        .unwrap(),
790                    ),
791            );
792            let _ = request.respond(response);
793            mt.record_request("GET", 200);
794            continue;
795        }
796
797        // --- Rate limiting: check per-IP request count ---
798        // peer_ip honors PYLON_TRUST_PROXY_HOPS so a deploy behind a
799        // load balancer (Fly, nginx, CloudFront) gets per-client
800        // limiting instead of putting every request through one
801        // bucket keyed by the proxy's IP.
802        let peer_ip = resolve_client_ip(&request, trust_proxy_hops);
803
804        // OPTIONS preflights are browser infrastructure, not user intent.
805        // Rate-limiting them makes a normal page effectively halve its
806        // budget (preflight + real request per call) and returns a 429
807        // that the browser can't interpret as a valid CORS response —
808        // the user-visible symptom is "Failed to fetch" on login. Skip.
809        let is_preflight = matches!(method, Method::Options);
810        if !is_preflight {
811            if let Err(retry_after) = rate_limiter.check(&peer_ip) {
812                let err_body = json_error(
813                    "RATE_LIMITED",
814                    &format!("Too many requests. Retry after {retry_after} seconds."),
815                );
816                let response = with_security_headers(
817                    Response::from_string(&err_body)
818                        .with_status_code(429u16)
819                        .with_header(
820                            Header::from_bytes("Content-Type", "application/json").unwrap(),
821                        )
822                        .with_header(
823                            Header::from_bytes(
824                                "Access-Control-Allow-Origin",
825                                cors_origin.as_bytes().to_vec(),
826                            )
827                            .unwrap(),
828                        )
829                        .with_header(
830                            Header::from_bytes(
831                                "Access-Control-Allow-Methods",
832                                "GET, POST, PATCH, DELETE, OPTIONS",
833                            )
834                            .unwrap(),
835                        )
836                        .with_header(
837                            Header::from_bytes(
838                                "Access-Control-Allow-Headers",
839                                "Content-Type, Authorization",
840                            )
841                            .unwrap(),
842                        )
843                        .with_header(
844                            Header::from_bytes(
845                                "Retry-After",
846                                retry_after.to_string().as_bytes().to_vec(),
847                            )
848                            .unwrap(),
849                        ),
850                );
851                let _ = request.respond(response);
852                mt.record_request(method.as_str(), 429);
853                continue;
854            }
855        } // end: if !is_preflight
856
857        // --- CSRF check on state-changing requests ---
858        //
859        // Browsers forbid cross-origin POST/PATCH/PUT/DELETE unless CORS
860        // allows it, but an attacker controlling another origin can still
861        // ship credentials-bearing requests if the server is permissive.
862        // The CSRF plugin validates Origin (then Referer) against an explicit
863        // allowlist — this is the check that was missing because the Plugin
864        // trait's `on_request` hook has no access to headers.
865        //
866        // The Authorization header carries bearer tokens, so CSRF mostly
867        // matters for cookie-based sessions — but we enforce globally: a
868        // request that misses Origin/Referer on a state-changing method is
869        // rejected, which is the safer default.
870        {
871            let method_str = method.as_str();
872            let is_bearer = request.headers().iter().any(|h| {
873                (h.field.as_str() == "Authorization" || h.field.as_str() == "authorization")
874                    && h.value.as_str().starts_with("Bearer ")
875            });
876            // Bearer-authenticated requests are not CSRF-vulnerable in the
877            // classic sense — browsers don't auto-attach bearer tokens. Skip
878            // the check for them so server-to-server API callers keep working
879            // without needing Origin headers.
880            if !is_bearer && !matches!(method, Method::Get | Method::Head | Method::Options) {
881                let origin = request
882                    .headers()
883                    .iter()
884                    .find(|h| h.field.as_str() == "Origin" || h.field.as_str() == "origin")
885                    .map(|h| h.value.as_str().to_string());
886                let referer = request
887                    .headers()
888                    .iter()
889                    .find(|h| h.field.as_str() == "Referer" || h.field.as_str() == "referer")
890                    .map(|h| h.value.as_str().to_string());
891                if let Err(err) = csrf.check(method_str, origin.as_deref(), referer.as_deref()) {
892                    let body = json_error(&err.code, &err.message);
893                    let response = with_security_headers(
894                        Response::from_string(&body)
895                            .with_status_code(err.status)
896                            .with_header(
897                                Header::from_bytes("Content-Type", "application/json").unwrap(),
898                            )
899                            .with_header(
900                                Header::from_bytes(
901                                    "Access-Control-Allow-Origin",
902                                    cors_origin.as_bytes().to_vec(),
903                                )
904                                .unwrap(),
905                            ),
906                    );
907                    let _ = request.respond(response);
908                    mt.record_request(method_str, err.status);
909                    continue;
910                }
911            }
912        }
913
914        // Extract auth token + auth context EARLY so every fast path (upload,
915        // shard SSE, fn streaming, AI streaming) can enforce auth the same
916        // way the router does. Previously these paths ran before auth
917        // extraction and bypassed the plugin/router auth chain entirely.
918        //
919        // Two transports for the same opaque session token:
920        //   1. `Authorization: Bearer <token>` — CLI, mobile, server-to-server
921        //   2. `Cookie: <name>=<token>` — browsers (HttpOnly, XSS can't read)
922        // Bearer wins when both are present (explicit beats ambient).
923        let bearer_token: Option<String> = request
924            .headers()
925            .iter()
926            .find(|h| h.field.as_str() == "Authorization" || h.field.as_str() == "authorization")
927            .and_then(|h| {
928                let val = h.value.as_str();
929                val.strip_prefix("Bearer ").map(|t| t.to_string())
930            });
931        let cookie_token: Option<String> = if bearer_token.is_some() {
932            None
933        } else {
934            request
935                .headers()
936                .iter()
937                .find(|h| h.field.as_str() == "Cookie" || h.field.as_str() == "cookie")
938                .and_then(|h| {
939                    pylon_auth::extract_session_cookie(h.value.as_str(), &cookie_config.name)
940                })
941        };
942        let auth_token: Option<String> = bearer_token.or(cookie_token);
943        let auth_ctx = if admin_token.is_some()
944            && auth_token.is_some()
945            && pylon_auth::constant_time_eq(
946                auth_token.as_deref().unwrap_or("").as_bytes(),
947                admin_token.as_deref().unwrap_or("").as_bytes(),
948            ) {
949            pylon_auth::AuthContext::admin()
950        } else {
951            ss.resolve(auth_token.as_deref())
952        };
953
954        // --- Test-reset endpoint — in-memory + dev mode + localhost only ---
955        //
956        // `pylon test` sets PYLON_IN_MEMORY=1 + PYLON_DEV_MODE=true.
957        // The TS helper `resetDb()` posts here between `test(...)` blocks
958        // to isolate cases. Gates:
959        //   1. dev mode (production refuses outright)
960        //   2. in-memory DB (belt-and-braces against accidental file wipes)
961        //   3. peer IP is loopback (a dev laptop often has localhost:4321
962        //      reachable; without this, a browser visiting a malicious
963        //      site could cross-site-POST a reset via a bare form —
964        //      blind CSRF that doesn't care about the response)
965        //
966        // Positioned AFTER the rate limiter and CSRF check on purpose so
967        // those middlewares apply — the earlier placement skipped both.
968        if url == "/api/__test__/reset" && method == Method::Post {
969            let is_loopback = peer_ip == "127.0.0.1"
970                || peer_ip == "::1"
971                || peer_ip.starts_with("127.")
972                || peer_ip == "localhost";
973            if !is_dev || !rt.is_in_memory() || !is_loopback {
974                let body = json_error(
975                    "RESET_REFUSED",
976                    "reset endpoint is only available in dev mode + in-memory DB + from loopback",
977                );
978                let response = with_security_headers(
979                    Response::from_string(&body)
980                        .with_status_code(403u16)
981                        .with_header(
982                            Header::from_bytes("Content-Type", "application/json").unwrap(),
983                        )
984                        .with_header(
985                            Header::from_bytes(
986                                "Access-Control-Allow-Origin",
987                                cors_origin.as_bytes().to_vec(),
988                            )
989                            .unwrap(),
990                        ),
991                );
992                let _ = request.respond(response);
993                mt.record_request("POST", 403);
994                continue;
995            }
996            let (status, body) = match rt.reset_for_tests() {
997                Ok(()) => (200u16, "{\"reset\":true}".to_string()),
998                Err(e) => (500u16, json_error(&e.code, &e.message)),
999            };
1000            let response = with_security_headers(
1001                Response::from_string(&body)
1002                    .with_status_code(status)
1003                    .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
1004                    .with_header(
1005                        Header::from_bytes(
1006                            "Access-Control-Allow-Origin",
1007                            cors_origin.as_bytes().to_vec(),
1008                        )
1009                        .unwrap(),
1010                    ),
1011            );
1012            let _ = request.respond(response);
1013            mt.record_request("POST", status);
1014            continue;
1015        }
1016
1017        // --- File upload fast path: handle binary body before string conversion ---
1018        // Uploads come in two shapes:
1019        //   1. Direct binary body with X-Filename / Content-Type headers
1020        //   2. multipart/form-data with a file part
1021        //
1022        // Require an authenticated user. Uploads write to the files backend
1023        // (and into the plugin audit log for soft-delete etc.), so
1024        // unauthenticated callers cannot use this route.
1025        if url == "/api/files/upload" && method == Method::Post {
1026            const UPLOAD_MAX: usize = 10 * 1024 * 1024;
1027            // Enforce size BEFORE reading the body so a 10 GiB stream can't
1028            // buffer into memory. Content-Length pre-check, then bounded read.
1029            if let Some(declared) = request.body_length() {
1030                if declared > UPLOAD_MAX {
1031                    let err = json_error(
1032                        "PAYLOAD_TOO_LARGE",
1033                        &format!("Content-Length {declared} exceeds upload max of {UPLOAD_MAX}"),
1034                    );
1035                    let response = with_security_headers(
1036                        Response::from_string(&err)
1037                            .with_status_code(413u16)
1038                            .with_header(
1039                                Header::from_bytes("Content-Type", "application/json").unwrap(),
1040                            )
1041                            .with_header(
1042                                Header::from_bytes(
1043                                    "Access-Control-Allow-Origin",
1044                                    cors_origin.as_bytes().to_vec(),
1045                                )
1046                                .unwrap(),
1047                            ),
1048                    );
1049                    let _ = request.respond(response);
1050                    mt.record_request("POST", 413);
1051                    continue;
1052                }
1053            }
1054            if auth_ctx.user_id.is_none() {
1055                let err = json_error(
1056                    "AUTH_REQUIRED",
1057                    "/api/files/upload requires an authenticated session",
1058                );
1059                let response = with_security_headers(
1060                    Response::from_string(&err)
1061                        .with_status_code(401u16)
1062                        .with_header(
1063                            Header::from_bytes("Content-Type", "application/json").unwrap(),
1064                        )
1065                        .with_header(
1066                            Header::from_bytes(
1067                                "Access-Control-Allow-Origin",
1068                                cors_origin.as_bytes().to_vec(),
1069                            )
1070                            .unwrap(),
1071                        ),
1072                );
1073                let _ = request.respond(response);
1074                mt.record_request("POST", 401);
1075                continue;
1076            }
1077            // Read up to UPLOAD_MAX + 1 bytes. If we read the full +1 we know
1078            // the client lied about Content-Length (or used chunked encoding
1079            // and overran). Reject in that case instead of continuing with a
1080            // truncated file.
1081            use std::io::Read;
1082            let mut bytes: Vec<u8> = Vec::with_capacity(8192);
1083            let mut limited = request.as_reader().take((UPLOAD_MAX as u64) + 1);
1084            let _ = limited.read_to_end(&mut bytes);
1085
1086            const MAX: usize = UPLOAD_MAX;
1087            if bytes.len() > MAX {
1088                let err = json_error("PAYLOAD_TOO_LARGE", "File exceeds 10 MB limit");
1089                let response = with_security_headers(
1090                    Response::from_string(&err)
1091                        .with_status_code(413u16)
1092                        .with_header(
1093                            Header::from_bytes("Content-Type", "application/json").unwrap(),
1094                        )
1095                        .with_header(
1096                            Header::from_bytes(
1097                                "Access-Control-Allow-Origin",
1098                                cors_origin.as_bytes().to_vec(),
1099                            )
1100                            .unwrap(),
1101                        ),
1102                );
1103                let _ = request.respond(response);
1104                mt.record_request("POST", 413);
1105                continue;
1106            }
1107
1108            // Headers.
1109            let content_type = request
1110                .headers()
1111                .iter()
1112                .find(|h| h.field.as_str() == "Content-Type" || h.field.as_str() == "content-type")
1113                .map(|h| h.value.as_str().to_string())
1114                .unwrap_or_else(|| "application/octet-stream".into());
1115            let filename = request
1116                .headers()
1117                .iter()
1118                .find(|h| h.field.as_str() == "X-Filename" || h.field.as_str() == "x-filename")
1119                .map(|h| h.value.as_str().to_string())
1120                .unwrap_or_else(|| "upload".into());
1121
1122            // If multipart, extract the first file part. Otherwise use bytes directly.
1123            let (name, ct, payload) = if content_type.starts_with("multipart/form-data") {
1124                match parse_multipart_first_file(&bytes, &content_type) {
1125                    Some(p) => p,
1126                    None => {
1127                        let err = json_error("INVALID_MULTIPART", "Could not parse multipart body");
1128                        let response = with_security_headers(
1129                            Response::from_string(&err)
1130                                .with_status_code(400u16)
1131                                .with_header(
1132                                    Header::from_bytes("Content-Type", "application/json").unwrap(),
1133                                )
1134                                .with_header(
1135                                    Header::from_bytes(
1136                                        "Access-Control-Allow-Origin",
1137                                        cors_origin.as_bytes().to_vec(),
1138                                    )
1139                                    .unwrap(),
1140                                ),
1141                        );
1142                        let _ = request.respond(response);
1143                        mt.record_request("POST", 400);
1144                        continue;
1145                    }
1146                }
1147            } else {
1148                (filename, content_type, bytes)
1149            };
1150
1151            let storage = pylon_storage::files::LocalFileStorage::new(
1152                &std::env::var("PYLON_FILES_DIR").unwrap_or_else(|_| "uploads".into()),
1153                &std::env::var("PYLON_FILES_URL_PREFIX").unwrap_or_else(|_| "/api/files".into()),
1154            );
1155
1156            let (status, body) =
1157                match pylon_storage::files::FileStorage::store(&storage, &name, &payload, &ct) {
1158                    Ok(stored) => (
1159                        201u16,
1160                        serde_json::to_string(&stored).unwrap_or_else(|_| "{}".into()),
1161                    ),
1162                    Err(e) => (500u16, json_error(&e.code, &e.message)),
1163                };
1164
1165            let response = with_security_headers(
1166                Response::from_string(&body)
1167                    .with_status_code(status)
1168                    .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
1169                    .with_header(
1170                        Header::from_bytes(
1171                            "Access-Control-Allow-Origin",
1172                            cors_origin.as_bytes().to_vec(),
1173                        )
1174                        .unwrap(),
1175                    ),
1176            );
1177            let _ = request.respond(response);
1178            mt.record_request("POST", status);
1179            continue;
1180        }
1181
1182        // Read body before routing (request is consumed by respond).
1183        // Skip for methods that cannot have a body.
1184        //
1185        // Size enforcement runs in TWO layers so a malicious client can't
1186        // stream 10 GiB into memory before we reject it:
1187        //   1. Content-Length header is compared to MAX_BODY_SIZE up front.
1188        //   2. The actual read uses `.take(MAX_BODY_SIZE + 1)` so a lying
1189        //      or chunked stream is capped at MAX + 1 bytes; if we read that
1190        //      many, we reject.
1191        const MAX_BODY_SIZE: usize = 10 * 1024 * 1024;
1192
1193        if let Some(declared) = request.body_length() {
1194            if declared > MAX_BODY_SIZE {
1195                let err_body = json_error(
1196                    "PAYLOAD_TOO_LARGE",
1197                    &format!("Content-Length {declared} exceeds max of {MAX_BODY_SIZE}"),
1198                );
1199                let response = with_security_headers(
1200                    Response::from_string(&err_body)
1201                        .with_status_code(413u16)
1202                        .with_header(
1203                            Header::from_bytes(
1204                                "Access-Control-Allow-Origin",
1205                                cors_origin.as_bytes().to_vec(),
1206                            )
1207                            .unwrap(),
1208                        ),
1209                );
1210                let _ = request.respond(response);
1211                mt.record_request(method.as_str(), 413);
1212                continue;
1213            }
1214        }
1215
1216        let mut body = String::new();
1217        if !matches!(
1218            method,
1219            Method::Get | Method::Head | Method::Options | Method::Delete
1220        ) {
1221            use std::io::Read;
1222            let mut limited = request.as_reader().take((MAX_BODY_SIZE as u64) + 1);
1223            let _ = limited.read_to_string(&mut body);
1224        }
1225
1226        if body.len() > MAX_BODY_SIZE {
1227            let err_body = json_error(
1228                "PAYLOAD_TOO_LARGE",
1229                &format!(
1230                    "Request body exceeds maximum size of {} bytes",
1231                    MAX_BODY_SIZE,
1232                ),
1233            );
1234            let response = with_security_headers(
1235                Response::from_string(&err_body)
1236                    .with_status_code(413u16)
1237                    .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
1238                    .with_header(
1239                        Header::from_bytes(
1240                            "Access-Control-Allow-Origin",
1241                            cors_origin.as_bytes().to_vec(),
1242                        )
1243                        .unwrap(),
1244                    ),
1245            );
1246            let _ = request.respond(response);
1247            mt.record_request(method.as_str(), 413);
1248            continue;
1249        }
1250
1251        // (auth_token + auth_ctx were resolved above, before the fast paths.)
1252
1253        // --- GET /api/shards/:id/connect — SSE snapshot stream ---
1254        if method == Method::Get {
1255            if let Some(rest) = url.strip_prefix("/api/shards/") {
1256                let rest = rest.split('?').next().unwrap_or(rest);
1257                if let Some(shard_id) = rest.strip_suffix("/connect") {
1258                    // Require an authenticated user. Shard SSE streams state
1259                    // snapshots tick-by-tick; an anonymous subscriber can
1260                    // both read that state AND influence via push_input (see
1261                    // the WS handler). Gate at the transport layer.
1262                    if auth_ctx.user_id.is_none() {
1263                        let err = json_error(
1264                            "AUTH_REQUIRED",
1265                            "Shard connect requires an authenticated session",
1266                        );
1267                        let response = with_security_headers(
1268                            Response::from_string(&err)
1269                                .with_status_code(401u16)
1270                                .with_header(
1271                                    Header::from_bytes("Content-Type", "application/json").unwrap(),
1272                                )
1273                                .with_header(
1274                                    Header::from_bytes(
1275                                        "Access-Control-Allow-Origin",
1276                                        cors_origin.as_bytes().to_vec(),
1277                                    )
1278                                    .unwrap(),
1279                                ),
1280                        );
1281                        let _ = request.respond(response);
1282                        mt.record_request("GET", 401);
1283                        continue;
1284                    }
1285                    let shards = match &shards_ref {
1286                        Some(s) => Arc::clone(s),
1287                        None => {
1288                            let err = json_error(
1289                                "SHARDS_NOT_AVAILABLE",
1290                                "Shard system is not configured",
1291                            );
1292                            let response = with_security_headers(
1293                                Response::from_string(&err)
1294                                    .with_status_code(503u16)
1295                                    .with_header(
1296                                        Header::from_bytes("Content-Type", "application/json")
1297                                            .unwrap(),
1298                                    )
1299                                    .with_header(
1300                                        Header::from_bytes(
1301                                            "Access-Control-Allow-Origin",
1302                                            cors_origin.as_bytes().to_vec(),
1303                                        )
1304                                        .unwrap(),
1305                                    ),
1306                            );
1307                            let _ = request.respond(response);
1308                            mt.record_request("GET", 503);
1309                            continue;
1310                        }
1311                    };
1312                    let shard = match shards.get(shard_id) {
1313                        Some(s) => s,
1314                        None => {
1315                            let err = json_error(
1316                                "SHARD_NOT_FOUND",
1317                                &format!("Shard \"{shard_id}\" not found"),
1318                            );
1319                            let response = with_security_headers(
1320                                Response::from_string(&err)
1321                                    .with_status_code(404u16)
1322                                    .with_header(
1323                                        Header::from_bytes("Content-Type", "application/json")
1324                                            .unwrap(),
1325                                    )
1326                                    .with_header(
1327                                        Header::from_bytes(
1328                                            "Access-Control-Allow-Origin",
1329                                            cors_origin.as_bytes().to_vec(),
1330                                        )
1331                                        .unwrap(),
1332                                    ),
1333                            );
1334                            let _ = request.respond(response);
1335                            mt.record_request("GET", 404);
1336                            continue;
1337                        }
1338                    };
1339
1340                    // Subscriber ID from ?sid= query param, else the authed user,
1341                    // else a generated anonymous ID.
1342                    let sub_id = url
1343                        .split("sid=")
1344                        .nth(1)
1345                        .and_then(|s| s.split('&').next())
1346                        .map(|s| s.to_string())
1347                        .or_else(|| auth_ctx.user_id.clone())
1348                        .unwrap_or_else(|| {
1349                            format!(
1350                                "anon_{}",
1351                                std::time::SystemTime::now()
1352                                    .duration_since(std::time::UNIX_EPOCH)
1353                                    .unwrap_or_default()
1354                                    .as_nanos()
1355                            )
1356                        });
1357                    let subscriber_id = pylon_realtime::SubscriberId::new(sub_id);
1358
1359                    let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1360                    let streaming_body = StreamingBody::new(rx);
1361
1362                    let tx_clone = tx.clone();
1363                    let sink: pylon_realtime::SnapshotSink =
1364                        Box::new(move |tick: u64, bytes: &[u8]| {
1365                            // Format as SSE with an id: line carrying the tick
1366                            // number so clients can resume with Last-Event-ID.
1367                            let mut frame = format!("id: {tick}\ndata: ").into_bytes();
1368                            frame.extend_from_slice(bytes);
1369                            frame.extend_from_slice(b"\n\n");
1370                            let _ = tx_clone.send(frame);
1371                        });
1372
1373                    let shard_auth = pylon_realtime::ShardAuth {
1374                        user_id: auth_ctx.user_id.clone(),
1375                        is_admin: auth_ctx.is_admin,
1376                    };
1377                    if let Err(e) = shard.add_subscriber(subscriber_id.clone(), sink, &shard_auth) {
1378                        let (status, code) = match &e {
1379                            pylon_realtime::ShardError::Unauthorized(_) => (403u16, "UNAUTHORIZED"),
1380                            _ => (429u16, "SUBSCRIBE_FAILED"),
1381                        };
1382                        let err = json_error(code, &e.to_string());
1383                        let response = with_security_headers(
1384                            Response::from_string(&err)
1385                                .with_status_code(status)
1386                                .with_header(
1387                                    Header::from_bytes("Content-Type", "application/json").unwrap(),
1388                                )
1389                                .with_header(
1390                                    Header::from_bytes(
1391                                        "Access-Control-Allow-Origin",
1392                                        cors_origin.as_bytes().to_vec(),
1393                                    )
1394                                    .unwrap(),
1395                                ),
1396                        );
1397                        let _ = request.respond(response);
1398                        mt.record_request("GET", status);
1399                        continue;
1400                    }
1401
1402                    // Auto-unsubscribe when the client disconnects: we watch
1403                    // for mpsc channel disconnection in a sentinel thread.
1404                    {
1405                        let shard_cleanup = Arc::clone(&shard);
1406                        let sub_id_cleanup = subscriber_id.clone();
1407                        let tx_liveness = tx.clone();
1408                        std::thread::spawn(move || {
1409                            // Send a heartbeat every 30s; if send fails, the
1410                            // channel is closed (client disconnected).
1411                            loop {
1412                                std::thread::sleep(std::time::Duration::from_secs(30));
1413                                if tx_liveness.send(b": heartbeat\n\n".to_vec()).is_err() {
1414                                    shard_cleanup.remove_subscriber(&sub_id_cleanup);
1415                                    return;
1416                                }
1417                                if !shard_cleanup.is_running() {
1418                                    return;
1419                                }
1420                            }
1421                        });
1422                    }
1423
1424                    let response = with_security_headers(Response::new(
1425                        tiny_http::StatusCode(200),
1426                        vec![
1427                            Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1428                            Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1429                            Header::from_bytes("Connection", "keep-alive").unwrap(),
1430                            Header::from_bytes(
1431                                "Access-Control-Allow-Origin",
1432                                cors_origin.as_bytes().to_vec(),
1433                            )
1434                            .unwrap(),
1435                        ],
1436                        streaming_body,
1437                        None,
1438                        None,
1439                    ));
1440                    let _ = request.respond(response);
1441                    mt.record_request("GET", 200);
1442                    continue;
1443                }
1444            }
1445        }
1446
1447        // --- POST /api/fn/:name with Accept: text/event-stream — streaming functions ---
1448        if method == Method::Post
1449            && url.starts_with("/api/fn/")
1450            && url != "/api/fn/traces"
1451            && request.headers().iter().any(|h| {
1452                (h.field.as_str() == "Accept" || h.field.as_str() == "accept")
1453                    && h.value.as_str().contains("text/event-stream")
1454            })
1455        {
1456            let fn_name = url
1457                .strip_prefix("/api/fn/")
1458                .unwrap_or("")
1459                .split('?')
1460                .next()
1461                .unwrap_or("")
1462                .to_string();
1463
1464            if let Some(fn_ops) = &fn_ops_maybe {
1465                // Mirror the router's gates so the streaming fast path doesn't
1466                // become a way to bypass function auth / rate limits.
1467                // 1. Function must exist (otherwise 404, not a hung SSE).
1468                if pylon_router::FnOps::get_fn(fn_ops.as_ref(), &fn_name).is_none() {
1469                    let err = json_error(
1470                        "FN_NOT_FOUND",
1471                        &format!("Function \"{fn_name}\" is not registered"),
1472                    );
1473                    let response = with_security_headers(
1474                        Response::from_string(&err)
1475                            .with_status_code(404u16)
1476                            .with_header(
1477                                Header::from_bytes("Content-Type", "application/json").unwrap(),
1478                            )
1479                            .with_header(
1480                                Header::from_bytes(
1481                                    "Access-Control-Allow-Origin",
1482                                    cors_origin.as_bytes().to_vec(),
1483                                )
1484                                .unwrap(),
1485                            ),
1486                    );
1487                    let _ = request.respond(response);
1488                    mt.record_request("POST", 404);
1489                    continue;
1490                }
1491                // 2. Per-function rate limit (identity = user_id or "anon").
1492                let identity = auth_ctx.user_id.as_deref().unwrap_or("anon");
1493                if let Err(retry_after) =
1494                    pylon_router::FnOps::check_rate_limit(fn_ops.as_ref(), &fn_name, identity)
1495                {
1496                    let body = format!(
1497                        r#"{{"error":{{"code":"RATE_LIMITED","message":"Function \"{fn_name}\" rate limit exceeded","retry_after_secs":{retry_after}}}}}"#
1498                    );
1499                    let response = with_security_headers(
1500                        Response::from_string(&body)
1501                            .with_status_code(429u16)
1502                            .with_header(
1503                                Header::from_bytes("Content-Type", "application/json").unwrap(),
1504                            )
1505                            .with_header(
1506                                Header::from_bytes(
1507                                    "Access-Control-Allow-Origin",
1508                                    cors_origin.as_bytes().to_vec(),
1509                                )
1510                                .unwrap(),
1511                            ),
1512                    );
1513                    let _ = request.respond(response);
1514                    mt.record_request("POST", 429);
1515                    continue;
1516                }
1517
1518                let args: serde_json::Value =
1519                    serde_json::from_str(&body).unwrap_or(serde_json::json!({}));
1520
1521                let auth = pylon_functions::protocol::AuthInfo {
1522                    user_id: auth_ctx.user_id.clone(),
1523                    is_admin: auth_ctx.is_admin,
1524                    tenant_id: auth_ctx.tenant_id.clone(),
1525                };
1526
1527                let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1528                let streaming_body = StreamingBody::new(rx);
1529
1530                let fn_ops_cl = Arc::clone(fn_ops);
1531                let tx_stream = tx.clone();
1532                std::thread::spawn(move || {
1533                    let tx_cb = tx_stream.clone();
1534                    let on_stream: Box<dyn FnMut(&str) + Send> = Box::new(move |chunk: &str| {
1535                        let sse = format!("data: {}\n\n", chunk);
1536                        let _ = tx_cb.send(sse.into_bytes());
1537                    });
1538
1539                    let result = pylon_router::FnOps::call(
1540                        fn_ops_cl.as_ref(),
1541                        &fn_name,
1542                        args,
1543                        auth,
1544                        Some(on_stream),
1545                        None, // streaming /api/fn/:name never carries HTTP request metadata
1546                    );
1547                    match result {
1548                        Ok((value, _trace)) => {
1549                            let done = format!(
1550                                "event: result\ndata: {}\n\n",
1551                                serde_json::to_string(&value).unwrap_or_else(|_| "null".into())
1552                            );
1553                            let _ = tx_stream.send(done.into_bytes());
1554                        }
1555                        Err(e) => {
1556                            let err = format!(
1557                                "event: error\ndata: {}\n\n",
1558                                serde_json::json!({"code": e.code, "message": e.message})
1559                            );
1560                            let _ = tx_stream.send(err.into_bytes());
1561                        }
1562                    }
1563                });
1564
1565                let response = with_security_headers(Response::new(
1566                    tiny_http::StatusCode(200),
1567                    vec![
1568                        Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1569                        Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1570                        Header::from_bytes("Connection", "keep-alive").unwrap(),
1571                        Header::from_bytes(
1572                            "Access-Control-Allow-Origin",
1573                            cors_origin.as_bytes().to_vec(),
1574                        )
1575                        .unwrap(),
1576                    ],
1577                    streaming_body,
1578                    None,
1579                    None,
1580                ));
1581                let _ = request.respond(response);
1582                mt.record_request("POST", 200);
1583                continue;
1584            }
1585        }
1586
1587        // --- POST /api/ai/stream — SSE streaming AI completion ---
1588        if url == "/api/ai/stream" && method == Method::Post {
1589            // AI endpoints spend real money per call. Require auth so a
1590            // drive-by caller can't burn through the provider budget.
1591            if auth_ctx.user_id.is_none() {
1592                let err = json_error(
1593                    "AUTH_REQUIRED",
1594                    "/api/ai/stream requires an authenticated session",
1595                );
1596                let response = with_security_headers(
1597                    Response::from_string(&err)
1598                        .with_status_code(401u16)
1599                        .with_header(
1600                            Header::from_bytes("Content-Type", "application/json").unwrap(),
1601                        )
1602                        .with_header(
1603                            Header::from_bytes(
1604                                "Access-Control-Allow-Origin",
1605                                cors_origin.as_bytes().to_vec(),
1606                            )
1607                            .unwrap(),
1608                        ),
1609                );
1610                let _ = request.respond(response);
1611                mt.record_request("POST", 401);
1612                continue;
1613            }
1614            let ai_provider = std::env::var("PYLON_AI_PROVIDER").unwrap_or_default();
1615            let ai_key = std::env::var("PYLON_AI_API_KEY").unwrap_or_default();
1616            let ai_model = std::env::var("PYLON_AI_MODEL").unwrap_or_default();
1617            let ai_base = std::env::var("PYLON_AI_BASE_URL").unwrap_or_default();
1618
1619            if ai_key.is_empty() && ai_provider != "custom" {
1620                let err = json_error(
1621                    "AI_NOT_CONFIGURED",
1622                    "Set PYLON_AI_PROVIDER and PYLON_AI_API_KEY",
1623                );
1624                let response = with_security_headers(
1625                    Response::from_string(&err)
1626                        .with_status_code(503u16)
1627                        .with_header(
1628                            Header::from_bytes("Content-Type", "application/json").unwrap(),
1629                        )
1630                        .with_header(
1631                            Header::from_bytes(
1632                                "Access-Control-Allow-Origin",
1633                                cors_origin.as_bytes().to_vec(),
1634                            )
1635                            .unwrap(),
1636                        ),
1637                );
1638                let _ = request.respond(response);
1639                mt.record_request("POST", 503);
1640                continue;
1641            }
1642
1643            let parsed: serde_json::Value = match serde_json::from_str(&body) {
1644                Ok(v) => v,
1645                Err(_) => {
1646                    let err = json_error("INVALID_JSON", "Invalid request body");
1647                    let response = with_security_headers(
1648                        Response::from_string(&err)
1649                            .with_status_code(400u16)
1650                            .with_header(
1651                                Header::from_bytes("Content-Type", "application/json").unwrap(),
1652                            )
1653                            .with_header(
1654                                Header::from_bytes(
1655                                    "Access-Control-Allow-Origin",
1656                                    cors_origin.as_bytes().to_vec(),
1657                                )
1658                                .unwrap(),
1659                            ),
1660                    );
1661                    let _ = request.respond(response);
1662                    mt.record_request("POST", 400);
1663                    continue;
1664                }
1665            };
1666
1667            let messages: Vec<AiMessage> = match parsed.get("messages").and_then(|m| m.as_array()) {
1668                Some(arr) => arr
1669                    .iter()
1670                    .filter_map(|m| {
1671                        let role = m.get("role")?.as_str()?.to_string();
1672                        let content = m.get("content")?.as_str()?.to_string();
1673                        Some(AiMessage { role, content })
1674                    })
1675                    .collect(),
1676                None => {
1677                    let err = json_error("MISSING_FIELD", "\"messages\" array is required");
1678                    let response = with_security_headers(
1679                        Response::from_string(&err)
1680                            .with_status_code(400u16)
1681                            .with_header(
1682                                Header::from_bytes("Content-Type", "application/json").unwrap(),
1683                            )
1684                            .with_header(
1685                                Header::from_bytes(
1686                                    "Access-Control-Allow-Origin",
1687                                    cors_origin.as_bytes().to_vec(),
1688                                )
1689                                .unwrap(),
1690                            ),
1691                    );
1692                    let _ = request.respond(response);
1693                    mt.record_request("POST", 400);
1694                    continue;
1695                }
1696            };
1697
1698            // Override model from request body if provided.
1699            let model = parsed
1700                .get("model")
1701                .and_then(|m| m.as_str())
1702                .map(|s| s.to_string())
1703                .unwrap_or(ai_model);
1704
1705            let proxy = match ai_provider.as_str() {
1706                "anthropic" => AiProxyPlugin::anthropic(&ai_key, &model),
1707                "openai" => AiProxyPlugin::openai(&ai_key, &model),
1708                "custom" => AiProxyPlugin::custom_with_model(&ai_base, &ai_key, &model),
1709                _ => AiProxyPlugin::openai(&ai_key, &model),
1710            };
1711
1712            // Set up a channel-based streaming body so tiny_http streams
1713            // data to the client as chunks arrive from the AI provider.
1714            let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1715            let streaming_body = StreamingBody::new(rx);
1716
1717            // Spawn the provider request on a background thread. Each chunk
1718            // is formatted as an SSE event and pushed through the channel.
1719            std::thread::spawn(move || {
1720                let result = proxy.stream_completion(&messages, &mut |chunk| {
1721                    let sse = format!(
1722                        "data: {}
1723
1724",
1725                        serde_json::json!({
1726                            "choices": [{"index": 0, "delta": {"content": chunk}}]
1727                        })
1728                    );
1729                    let _ = tx.send(sse.into_bytes());
1730                });
1731
1732                // Send a final event indicating completion or error.
1733                match result {
1734                    Ok(_) => {
1735                        let _ = tx.send(
1736                            b"data: [DONE]
1737
1738"
1739                            .to_vec(),
1740                        );
1741                    }
1742                    Err(e) => {
1743                        let err_event = format!(
1744                            "data: {}
1745
1746",
1747                            serde_json::json!({"error": {"message": e, "type": "stream_error"}})
1748                        );
1749                        let _ = tx.send(err_event.into_bytes());
1750                    }
1751                }
1752                // tx is dropped here, which causes StreamingBody::read to return 0 (EOF).
1753            });
1754
1755            let response = with_security_headers(Response::new(
1756                tiny_http::StatusCode(200),
1757                vec![
1758                    Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1759                    Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1760                    Header::from_bytes("Connection", "keep-alive").unwrap(),
1761                    Header::from_bytes(
1762                        "Access-Control-Allow-Origin",
1763                        cors_origin.as_bytes().to_vec(),
1764                    )
1765                    .unwrap(),
1766                ],
1767                streaming_body,
1768                None, // unknown content length = chunked transfer
1769                None,
1770            ));
1771            let _ = request.respond(response);
1772            mt.record_request("POST", 200);
1773            continue;
1774        }
1775
1776        // Studio route (returns HTML, not JSON).
1777        //
1778        // Privileged admin UI. It renders the full schema and lets the
1779        // operator run mutations against the data browser. In production we
1780        // require an admin token; in dev mode we leave it open so
1781        // `pylon dev` remains friction-free for the single-user case.
1782        //
1783        // Serving a WWW-Authenticate Basic realm isn't useful here because
1784        // admin auth is bearer-token based. Callers get a 401 and should
1785        // retry with `Authorization: Bearer <PYLON_ADMIN_TOKEN>`.
1786        let (status, response_body, content_type, is_studio, extra_headers) = if (url == "/studio"
1787            || url == "/studio/")
1788            && method == Method::Get
1789        {
1790            if !is_dev && !auth_ctx.is_admin {
1791                let body = json_error(
1792                    "AUTH_REQUIRED",
1793                    "/studio requires admin auth in production (set PYLON_ADMIN_TOKEN and pass it as Bearer)",
1794                );
1795                let response = with_security_headers(
1796                    Response::from_string(&body)
1797                        .with_status_code(401u16)
1798                        .with_header(
1799                            Header::from_bytes("Content-Type", "application/json").unwrap(),
1800                        )
1801                        .with_header(
1802                            Header::from_bytes(
1803                                "Access-Control-Allow-Origin",
1804                                cors_origin.as_bytes().to_vec(),
1805                            )
1806                            .unwrap(),
1807                        ),
1808                );
1809                let _ = request.respond(response);
1810                mt.record_request("GET", 401);
1811                continue;
1812            }
1813            // Derive the public base URL from the request's Host header +
1814            // X-Forwarded-Proto (Fly / any HTTPS terminator sets this).
1815            // Hardcoding `http://localhost:{port}` here meant the studio
1816            // HTML served from pylon-crm.fly.dev tried to fetch
1817            // http://localhost:4321/api/* from the browser, which CSP
1818            // rightly blocks.
1819            let host = request
1820                .headers()
1821                .iter()
1822                .find(|h| h.field.equiv("Host"))
1823                .map(|h| h.value.as_str().to_string())
1824                .unwrap_or_else(|| format!("localhost:{port}"));
1825            let scheme = request
1826                .headers()
1827                .iter()
1828                .find(|h| h.field.equiv("X-Forwarded-Proto"))
1829                .map(|h| h.value.as_str().to_string())
1830                .unwrap_or_else(|| "http".to_string());
1831            let base = format!("{scheme}://{host}");
1832            let html = pylon_studio_api::generate_studio_html(rt.manifest(), &base);
1833            (
1834                200u16,
1835                html,
1836                "text/html",
1837                true,
1838                Vec::<(String, String)>::new(),
1839            )
1840        } else {
1841            // Run plugin middleware with per-request metadata so rate-limit
1842            // plugins can bucket by peer IP (not just user id) when the
1843            // caller is anonymous.
1844            let meta = pylon_plugin::RequestMeta {
1845                peer_ip: peer_ip.as_str(),
1846            };
1847            if let Err(e) = pr.run_on_request_with_meta(method.as_str(), &url, &auth_ctx, &meta) {
1848                (
1849                    e.status,
1850                    json_error(&e.code, &e.message),
1851                    "application/json",
1852                    false,
1853                    Vec::new(),
1854                )
1855            } else if let Some((s, b)) =
1856                pr.try_handle_route(method.as_str(), &url, &body, &auth_ctx)
1857            {
1858                // Plugin handled the route.
1859                (s, b, "application/json", false, Vec::new())
1860            } else {
1861                let notifier = WsSseNotifier {
1862                    ws: Arc::clone(&wh),
1863                    sse: Arc::clone(&sh),
1864                };
1865                let openapi_gen = RuntimeOpenApiGenerator {
1866                    manifest: rt.manifest(),
1867                };
1868                let file_ops = LocalFileOps::new_default();
1869                let cache_adapter = CacheAdapter(Arc::clone(&ca));
1870                let pubsub_adapter = PubSubAdapter(Arc::clone(&ps));
1871                let email_adapter = EmailAdapter::from_env();
1872                let fn_ops: Option<&dyn pylon_router::FnOps> =
1873                    fn_ops_ref.as_deref().map(|f| f as &dyn pylon_router::FnOps);
1874                let shard_adapter = shards_ref.as_ref().map(|reg| ShardOpsAdapter {
1875                    registry: Arc::clone(reg),
1876                });
1877                let shard_ops: Option<&dyn pylon_router::ShardOps> = shard_adapter
1878                    .as_ref()
1879                    .map(|a| a as &dyn pylon_router::ShardOps);
1880                let plugin_hooks = PluginHooksAdapter(Arc::clone(&pr));
1881                // Snapshot request headers as (name, value) pairs for the
1882                // router to forward into webhook-invoked actions. Header
1883                // names are left as-sent; the router lowercases + merges
1884                // duplicates per RFC 7230 when constructing RequestInfo.
1885                let request_headers: Vec<(String, String)> = request
1886                    .headers()
1887                    .iter()
1888                    .map(|h| (h.field.as_str().to_string(), h.value.as_str().to_string()))
1889                    .collect();
1890                let router_ctx = pylon_router::RouterContext {
1891                    store: rt.as_ref(),
1892                    session_store: &ss,
1893                    magic_codes: &mc,
1894                    oauth_state: &os,
1895                    policy_engine: &pe,
1896                    change_log: &cl,
1897                    notifier: &notifier,
1898                    rooms: rm.as_ref(),
1899                    cache: &cache_adapter,
1900                    pubsub: &pubsub_adapter,
1901                    jobs: jq.as_ref(),
1902                    scheduler: sc.as_ref(),
1903                    workflows: we.as_ref(),
1904                    files: &file_ops,
1905                    openapi: &openapi_gen,
1906                    functions: fn_ops,
1907                    email: &email_adapter,
1908                    shards: shard_ops,
1909                    plugin_hooks: &plugin_hooks,
1910                    auth_ctx: &auth_ctx,
1911                    is_dev,
1912                    request_headers: &request_headers,
1913                    peer_ip: peer_ip.as_str(),
1914                    cookie_config: cookie_config.as_ref(),
1915                    response_headers: std::cell::RefCell::new(Vec::new()),
1916                };
1917                let http_method = HttpMethod::from_str(method.as_str());
1918                let (s, b, _ct) = pylon_router::route(
1919                    &router_ctx,
1920                    http_method,
1921                    &url,
1922                    &body,
1923                    auth_token.as_deref(),
1924                );
1925                let extra_headers = router_ctx.take_response_headers();
1926                (s, b, "application/json", false, extra_headers)
1927            }
1928        };
1929
1930        let mut response = Response::from_string(&response_body)
1931            .with_status_code(status)
1932            .with_header(Header::from_bytes("Content-Type", content_type).unwrap())
1933            .with_header(
1934                Header::from_bytes(
1935                    "Access-Control-Allow-Origin",
1936                    cors_origin.as_bytes().to_vec(),
1937                )
1938                .unwrap(),
1939            )
1940            .with_header(
1941                Header::from_bytes(
1942                    "Access-Control-Allow-Methods",
1943                    "GET, POST, PATCH, DELETE, OPTIONS",
1944                )
1945                .unwrap(),
1946            )
1947            .with_header(
1948                Header::from_bytes(
1949                    "Access-Control-Allow-Headers",
1950                    "Content-Type, Authorization",
1951                )
1952                .unwrap(),
1953            );
1954        // Cookie-based auth requires `Access-Control-Allow-Credentials:
1955        // true` on the response, paired with a specific origin. Vary
1956        // ensures intermediaries don't cache one origin's response and
1957        // serve it back to a different origin's browser.
1958        if allow_credentials {
1959            response = response
1960                .with_header(
1961                    Header::from_bytes("Access-Control-Allow-Credentials", "true").unwrap(),
1962                )
1963                .with_header(Header::from_bytes("Vary", "Origin").unwrap());
1964        }
1965
1966        // Apply any extra headers handlers attached via the router context
1967        // (Set-Cookie on login/logout, Location on OAuth GET callback).
1968        // Bytes from these headers come from server-built strings — bad
1969        // bytes here would be a programming bug, not request-driven, so a
1970        // failed Header::from_bytes is silently dropped rather than
1971        // poisoning the response.
1972        for (name, value) in extra_headers {
1973            if let Ok(h) = Header::from_bytes(name.as_bytes(), value.as_bytes().to_vec()) {
1974                response = response.with_header(h);
1975            }
1976        }
1977
1978        // Add Content-Security-Policy for Studio HTML responses.
1979        //
1980        // Studio talks to the same Rust process over HTTP (same origin)
1981        // AND a sibling WebSocket port (port+1, scheme ws:). CSP's
1982        // `default-src` covers `connect-src` by fallback, so any
1983        // directive we set there must include the WS scheme or the
1984        // browser silently blocks the live-sync connection.
1985        //
1986        // `ws:` + `wss:` cover localhost dev + TLS deploys without
1987        // hard-coding ports. Same-origin `'self'` keeps HTTP fetches
1988        // allowed. Inline + eval stay for the Tailwind/Babel CDN scripts
1989        // the current Studio HTML includes.
1990        if is_studio {
1991            response = response.with_header(
1992                Header::from_bytes(
1993                    "Content-Security-Policy",
1994                    "default-src 'self' 'unsafe-inline' 'unsafe-eval' https://cdn.tailwindcss.com https://unpkg.com ws: wss:",
1995                ).unwrap(),
1996            );
1997        }
1998
1999        let response = with_security_headers(response);
2000
2001        let _ = request.respond(response);
2002        mt.record_request(method.as_str(), status);
2003    }
2004
2005    tracing::warn!("Shutting down gracefully...");
2006
2007    // --- Drain phase ---
2008    // Stop accepting new work, let in-flight finish, close subsystems cleanly.
2009    let drain_timeout = std::time::Duration::from_secs(
2010        std::env::var("PYLON_DRAIN_SECS")
2011            .ok()
2012            .and_then(|s| s.parse().ok())
2013            .unwrap_or(10),
2014    );
2015    let start = Instant::now();
2016
2017    // Stop any running shards so their tick loops exit.
2018    if let Some(reg) = &shard_registry {
2019        for id in reg.ids() {
2020            if let Some(shard) = reg.get(&id) {
2021                shard.stop();
2022            }
2023        }
2024    }
2025
2026    // Let the scheduler finish its current cycle.
2027    let _ = &scheduler; // drop Arc at end of scope
2028
2029    // Wait for outstanding workers to idle, up to drain_timeout.
2030    while start.elapsed() < drain_timeout {
2031        let pending_jobs = job_queue.stats().pending;
2032        if pending_jobs == 0 {
2033            break;
2034        }
2035        std::thread::sleep(std::time::Duration::from_millis(100));
2036    }
2037
2038    let elapsed = start.elapsed();
2039    tracing::warn!(
2040        "Drain complete in {:.1}s (timeout {}s)",
2041        elapsed.as_secs_f32(),
2042        drain_timeout.as_secs()
2043    );
2044    Ok(())
2045}
2046
2047// The route() function has been extracted to the `pylon-router` crate.
2048// See `pylon_router::route()` for the platform-agnostic routing logic.
2049// The server now delegates to it via a `RouterContext`.
2050
2051fn json_error(code: &str, message: &str) -> String {
2052    pylon_router::json_error(code, message)
2053}
2054
2055/// Build the session store. Persists by default for file-backed runtimes —
2056/// sessions live in a sibling `<db>.sessions.db` file next to the app DB
2057/// unless `PYLON_SESSION_DB` overrides the path or
2058/// `PYLON_SESSION_IN_MEMORY=1` opts out. In-memory runtimes (tests)
2059/// get an in-memory session store.
2060///
2061/// This used to be opt-in, which silently broke every app after a server
2062/// restart: tokens in browser localStorage resolved to anonymous, pulls
2063/// came back empty under policy, and mutations 400'd with UNAUTHENTICATED.
2064fn build_session_store(app_db_path: Option<&str>) -> SessionStore {
2065    if std::env::var("PYLON_SESSION_IN_MEMORY")
2066        .map(|v| v == "1" || v == "true")
2067        .unwrap_or(false)
2068    {
2069        return SessionStore::new();
2070    }
2071    let explicit = std::env::var("PYLON_SESSION_DB").ok();
2072    let default_path = app_db_path.map(|p| format!("{p}.sessions.db"));
2073    let path = match explicit.or(default_path) {
2074        Some(p) => p,
2075        None => return SessionStore::new(),
2076    };
2077    match crate::session_backend::SqliteSessionBackend::open(&path) {
2078        Ok(backend) => {
2079            tracing::info!("[pylon] Session persistence enabled: {path}");
2080            SessionStore::with_backend(Box::new(backend))
2081        }
2082        Err(e) => {
2083            tracing::warn!(
2084                "[pylon] could not open session DB {path}: {e}. Falling back to in-memory sessions."
2085            );
2086            SessionStore::new()
2087        }
2088    }
2089}
2090
2091/// Parse a `multipart/form-data` body and return the first file part found.
2092///
2093/// Returns `(filename, content_type, bytes)` on success, `None` if the body
2094/// can't be parsed or no file part exists.
2095///
2096/// Handles the common RFC 7578 subset used by browsers and curl:
2097/// - `--boundary` part separators
2098/// - `Content-Disposition: form-data; name=...; filename=...`
2099/// - `Content-Type: ...`
2100/// - blank line, then raw bytes, then `\r\n--boundary` terminator
2101fn parse_multipart_first_file(
2102    body: &[u8],
2103    content_type_header: &str,
2104) -> Option<(String, String, Vec<u8>)> {
2105    // Extract the boundary parameter.
2106    let boundary_param = content_type_header
2107        .split(';')
2108        .find_map(|p| p.trim().strip_prefix("boundary="))?;
2109    let boundary = boundary_param.trim_matches('"');
2110    let delimiter = format!("--{boundary}");
2111    let delimiter_bytes = delimiter.as_bytes();
2112
2113    // Find each part between delimiters.
2114    let mut pos = 0usize;
2115    while pos < body.len() {
2116        // Find the next delimiter.
2117        let next = find_subslice(&body[pos..], delimiter_bytes)?;
2118        let part_start = pos + next + delimiter_bytes.len();
2119        // Skip CRLF or -- (terminator) after the delimiter.
2120        if part_start + 2 > body.len() {
2121            return None;
2122        }
2123        if &body[part_start..part_start + 2] == b"--" {
2124            return None; // end of parts, no file found
2125        }
2126        let header_start = part_start + skip_crlf(&body[part_start..]);
2127
2128        // Find end-of-headers (blank line).
2129        let header_end_offset = find_subslice(&body[header_start..], b"\r\n\r\n")?;
2130        let headers = &body[header_start..header_start + header_end_offset];
2131        let data_start = header_start + header_end_offset + 4;
2132
2133        // Find the next delimiter — that's where this part's data ends.
2134        let next_delim_offset = find_subslice(&body[data_start..], delimiter_bytes)?;
2135        // Strip the trailing CRLF before the delimiter.
2136        let mut data_end = data_start + next_delim_offset;
2137        if data_end >= 2 && &body[data_end - 2..data_end] == b"\r\n" {
2138            data_end -= 2;
2139        }
2140
2141        // Parse headers we care about.
2142        let headers_str = std::str::from_utf8(headers).ok()?;
2143        let mut filename: Option<String> = None;
2144        let mut part_ct = String::from("application/octet-stream");
2145        let mut has_file = false;
2146        for line in headers_str.split("\r\n") {
2147            let lower = line.to_ascii_lowercase();
2148            if let Some(rest) = lower.strip_prefix("content-disposition:") {
2149                if rest.contains("filename=") {
2150                    has_file = true;
2151                    // Extract filename="xxx"
2152                    if let Some(start) = line.find("filename=\"") {
2153                        let from = start + 10;
2154                        if let Some(end_offset) = line[from..].find('"') {
2155                            filename = Some(line[from..from + end_offset].to_string());
2156                        }
2157                    }
2158                }
2159            } else if let Some(rest) = lower.strip_prefix("content-type:") {
2160                part_ct = rest.trim().to_string();
2161            }
2162        }
2163
2164        if has_file {
2165            let name = filename.unwrap_or_else(|| "upload".into());
2166            return Some((name, part_ct, body[data_start..data_end].to_vec()));
2167        }
2168
2169        pos = data_end;
2170    }
2171    None
2172}
2173
/// Return the byte offset of the first occurrence of `needle` inside
/// `haystack`.
///
/// Yields `None` when `needle` is empty, longer than `haystack`, or simply
/// not present.
fn find_subslice(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    if needle.is_empty() {
        return None;
    }
    // Highest offset at which a full-length match could still start;
    // `checked_sub` bails out when the needle cannot fit at all.
    let last_start = haystack.len().checked_sub(needle.len())?;
    (0..=last_start).find(|&offset| haystack[offset..].starts_with(needle))
}
2180
/// Number of leading line-terminator bytes at the start of `buf`.
///
/// Returns 2 for a leading `\r\n`, 1 for a bare leading `\n`, and 0 for
/// anything else (including an empty buffer or a lone `\r`).
fn skip_crlf(buf: &[u8]) -> usize {
    match buf {
        [b'\r', b'\n', ..] => 2,
        [b'\n', ..] => 1,
        _ => 0,
    }
}
2190
#[cfg(test)]
mod multipart_tests {
    use super::*;

    /// `Content-Type` header shared by the multipart fixtures below; each
    /// fixture body uses `bnd` as its boundary.
    const FORM_CT: &str = "multipart/form-data; boundary=bnd";

    /// A lone file part yields its filename, its declared content type, and
    /// the payload without the CRLF that precedes the closing delimiter.
    #[test]
    fn parses_single_file() {
        let payload: &[u8] = b"--bnd\r\n\
            Content-Disposition: form-data; name=\"file\"; filename=\"hello.txt\"\r\n\
            Content-Type: text/plain\r\n\
            \r\n\
            Hello world\r\n\
            --bnd--\r\n";

        let parsed = parse_multipart_first_file(payload, FORM_CT);

        assert_eq!(
            parsed,
            Some((
                "hello.txt".to_string(),
                "text/plain".to_string(),
                b"Hello world".to_vec(),
            ))
        );
    }

    /// A body holding only a plain form field (no `filename=`) has no file
    /// part to return.
    #[test]
    fn returns_none_without_file_part() {
        let payload: &[u8] = b"--bnd\r\n\
            Content-Disposition: form-data; name=\"field\"\r\n\
            \r\n\
            just text\r\n\
            --bnd--\r\n";

        assert_eq!(parse_multipart_first_file(payload, FORM_CT), None);
    }

    /// Without a `boundary=` parameter in the header there is nothing to
    /// split on, so parsing gives up immediately.
    #[test]
    fn returns_none_when_no_boundary() {
        let parsed = parse_multipart_first_file(b"anything", "application/json");

        assert_eq!(parsed, None);
    }
}