Skip to main content

pylon_runtime/
server.rs

1#[allow(unused_imports)]
2use std::io::Read;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::time::Instant;
6
7use pylon_auth::SessionStore;
8use pylon_http::HttpMethod;
9use pylon_plugin::PluginRegistry;
10use pylon_policy::PolicyEngine;
11use pylon_sync::{ChangeKind, ChangeLog};
12use tiny_http::{Header, Method, Response, Server};
13
14use crate::datastore::{
15    CacheAdapter, EmailAdapter, LocalFileOps, PluginHooksAdapter, PubSubAdapter,
16    RuntimeOpenApiGenerator, ShardOpsAdapter, WsSseNotifier,
17};
18use crate::jobs::{JobQueue, JobResult, Worker};
19use crate::metrics::Metrics;
20use crate::pubsub::PubSubBroker;
21use crate::rate_limit::RateLimiter;
22use crate::rooms::RoomManager;
23use crate::scheduler::Scheduler;
24use crate::sse::SseHub;
25use crate::workflows::WorkflowEngine;
26use crate::ws::WsHub;
27use crate::Runtime;
28use pylon_plugin::builtin::ai_proxy::{AiMessage, AiProxyPlugin};
29use pylon_plugin::builtin::cache::CachePlugin;
30
31// ---------------------------------------------------------------------------
32// Streaming body — bridges mpsc::Receiver to std::io::Read for SSE responses
33// ---------------------------------------------------------------------------
34
/// A streaming response body backed by an MPSC channel.
///
/// When used as the body of a `tiny_http::Response`, it causes the server to
/// write data as it arrives through the channel. The stream reports EOF
/// (`read` returns `Ok(0)`) when the sender is dropped or when an empty chunk
/// is received.
struct StreamingBody {
    /// Source of body chunks; every received `Vec<u8>` is one chunk.
    rx: std::sync::mpsc::Receiver<Vec<u8>>,
    /// Chunk that did not fit into the caller's buffer on an earlier read.
    buf: Vec<u8>,
    /// Offset of the first byte of `buf` not yet handed to the caller.
    pos: usize,
}

impl StreamingBody {
    /// Wrap `rx` in a reader with no buffered leftover data.
    fn new(rx: std::sync::mpsc::Receiver<Vec<u8>>) -> Self {
        Self {
            rx,
            buf: Vec::new(),
            pos: 0,
        }
    }
}

impl std::io::Read for StreamingBody {
    /// Copy the next available bytes into `buf`.
    ///
    /// Leftover bytes from a previously received chunk are served first.
    /// Otherwise this blocks until the next chunk arrives. Returns `Ok(0)`
    /// when `buf` is empty, when an empty chunk is received, or when the
    /// sending side of the channel has been dropped. Never returns `Err`.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // A zero-length read can only ever yield zero bytes; answer right
        // away instead of blocking on the channel for a chunk we could not
        // hand over anyway.
        if buf.is_empty() {
            return Ok(0);
        }

        // Drain any leftover data from a previous recv that was larger than
        // the caller's buffer.
        if self.pos < self.buf.len() {
            let pending = &self.buf[self.pos..];
            let n = pending.len().min(buf.len());
            buf[..n].copy_from_slice(&pending[..n]);
            self.pos += n;
            if self.pos >= self.buf.len() {
                // Leftover fully consumed; reset so the next call hits recv.
                self.buf.clear();
                self.pos = 0;
            }
            return Ok(n);
        }

        // Block until the next chunk arrives or the sender is dropped.
        match self.rx.recv() {
            // Channel closed = EOF.
            Err(_) => Ok(0),
            // An empty chunk yields zero bytes, which readers treat as EOF.
            Ok(chunk) if chunk.is_empty() => Ok(0),
            Ok(chunk) => {
                let n = chunk.len().min(buf.len());
                buf[..n].copy_from_slice(&chunk[..n]);
                if n < chunk.len() {
                    // Keep the whole chunk and remember how much was served.
                    self.buf = chunk;
                    self.pos = n;
                }
                Ok(n)
            }
        }
    }
}
88
/// Global shutdown flag. Set via `request_shutdown()` to trigger graceful exit.
///
/// Starts out `false`. The request loop in `start_server` checks it before
/// and after each `recv()` and leaves the loop once it is `true`. Nothing in
/// this part of the file resets it back to `false`.
static SHUTDOWN: AtomicBool = AtomicBool::new(false);
91
92/// Request a graceful shutdown of the running server.
93///
94/// This sets the shutdown flag and, if a server handle has been stashed, calls
95/// `unblock()` to wake the request loop. Safe to call from any thread or signal
96/// handler.
97pub fn request_shutdown() {
98    SHUTDOWN.store(true, Ordering::SeqCst);
99    // If a server handle is available, unblock the request loop so it can
100    // observe the flag immediately rather than waiting for the next request.
101    if let Some(srv) = SERVER_HANDLE.get() {
102        srv.unblock();
103    }
104}
105
/// Global handle to the `tiny_http::Server` so `request_shutdown()` can call
/// `unblock()` without requiring callers to hold a reference.
///
/// Filled in by `start_server` right after the listener is created. Because
/// this is a `OnceLock`, only the first stored handle is kept; `start_server`
/// ignores the result of `set`, so a later server started in the same process
/// would not replace it.
static SERVER_HANDLE: std::sync::OnceLock<Arc<Server>> = std::sync::OnceLock::new();
109
110// ---------------------------------------------------------------------------
111// Security headers
112// ---------------------------------------------------------------------------
113
114/// Common security headers applied to every response.
115fn security_headers() -> Vec<Header> {
116    vec![
117        Header::from_bytes("X-Content-Type-Options", "nosniff").unwrap(),
118        Header::from_bytes("X-Frame-Options", "DENY").unwrap(),
119        Header::from_bytes("X-XSS-Protection", "1; mode=block").unwrap(),
120    ]
121}
122
123/// Add security headers to a response.
124fn with_security_headers<R: std::io::Read>(response: Response<R>) -> Response<R> {
125    let mut resp = response;
126    for header in security_headers() {
127        resp = resp.with_header(header);
128    }
129    resp
130}
131
132/// Start the dev server on the given port. Blocks until shutdown.
133pub fn start(runtime: Arc<Runtime>, port: u16) -> Result<(), String> {
134    start_with_plugins(runtime, port, None)
135}
136
137/// Start the dev server with optional plugins. Blocks until shutdown.
138pub fn start_with_plugins(
139    runtime: Arc<Runtime>,
140    port: u16,
141    plugins: Option<Arc<PluginRegistry>>,
142) -> Result<(), String> {
143    start_server(runtime, port, plugins, None)
144}
145
146/// Start the dev server with plugins and a shard registry for real-time
147/// simulations (games, MMO zones, etc.). Blocks until shutdown.
148pub fn start_with_shards(
149    runtime: Arc<Runtime>,
150    port: u16,
151    plugins: Option<Arc<PluginRegistry>>,
152    shard_registry: Arc<dyn pylon_realtime::DynShardRegistry>,
153) -> Result<(), String> {
154    start_server(runtime, port, plugins, Some(shard_registry))
155}
156
157fn start_server(
158    runtime: Arc<Runtime>,
159    port: u16,
160    plugins: Option<Arc<PluginRegistry>>,
161    shard_registry: Option<Arc<dyn pylon_realtime::DynShardRegistry>>,
162) -> Result<(), String> {
163    // Run the tracing-exporter hook BEFORE anything else emits spans. The
164    // operator registers it via `pylon_observability::set_tracing_hook`
165    // at process init; here we invoke it exactly once on startup. No-op
166    // if nothing was registered.
167    pylon_observability::run_tracing_hook();
168
169    let addr = format!("0.0.0.0:{port}");
170    let server = Server::http(&addr).map_err(|e| format!("Failed to start server: {e}"))?;
171    let server = Arc::new(server);
172
173    // Stash a handle so `request_shutdown()` can unblock the loop.
174    let _ = SERVER_HANDLE.set(Arc::clone(&server));
175
176    let session_store = Arc::new(build_session_store(runtime.db_path().as_deref()));
177    let magic_codes = Arc::new(pylon_auth::MagicCodeStore::new());
178    // Persist OAuth state if a session DB is configured. Reuse the same path
179    // (sessions and OAuth state are both small auth artifacts that should
180    // outlive a restart). Falls back to in-memory if no DB path is set.
181    let oauth_state = Arc::new(match std::env::var("PYLON_SESSION_DB").ok() {
182        Some(path) => match crate::oauth_backend::SqliteOAuthBackend::open(&path) {
183            Ok(backend) => pylon_auth::OAuthStateStore::with_backend(Box::new(backend)),
184            Err(e) => {
185                tracing::warn!(
186                    "[auth] OAuth state SQLite backend unavailable: {e} — falling back to in-memory"
187                );
188                pylon_auth::OAuthStateStore::new()
189            }
190        },
191        None => pylon_auth::OAuthStateStore::new(),
192    });
193    let policy_engine = Arc::new(PolicyEngine::from_manifest(runtime.manifest()));
194    let change_log = Arc::new(ChangeLog::new());
195
196    // Seed the change log with one synthetic insert per extant row so that
197    // a pull from seq=0 after a restart reconstructs current state. The
198    // change log is in-memory — restarting the process without this would
199    // leave SQLite rows unreachable via /api/sync/pull (clients would
200    // pull nothing and see an empty replica). Seqs here are fresh; clients
201    // whose cursors are ahead of `self.seq` get a 410 and full resync,
202    // which then hits this seeded log and gets every current row back.
203    for entity in runtime.manifest().entities.iter() {
204        match runtime.list(&entity.name) {
205            Ok(rows) => {
206                for row in rows {
207                    if let Some(id) = row.get("id").and_then(|v| v.as_str()) {
208                        change_log.append(&entity.name, id, ChangeKind::Insert, Some(row.clone()));
209                    }
210                }
211            }
212            Err(_) => {
213                // Entity table may not exist yet on first boot — skip.
214            }
215        }
216    }
217    let ws_hub = WsHub::new();
218    let sse_hub = SseHub::new();
219    // Default-register the rate-limit plugin when no custom registry was
220    // supplied. Without this, self-hosted deployments would launch with
221    // auth endpoints (/api/auth/magic/send, /api/auth/magic/verify,
222    // /api/auth/session) wide open to brute force and enumeration.
223    //
224    // Dev: 100k/min so a React app's initial bundle + auth + sync pulls
225    // (each worth ~6-10 requests) doesn't immediately 429 the dev. Prod:
226    // 100/min per IP — tight enough to crush burst attackers, loose
227    // enough for legitimate multi-tab UIs. Callers passing their own
228    // registry are responsible for their own limits.
229    // Probe dev mode NOW — defined for real at line ~300 but plugin
230    // registration below needs it. Same env-var, same logic.
231    let is_dev_early = std::env::var("PYLON_DEV_MODE")
232        .map(|v| v == "1" || v == "true")
233        .unwrap_or(true);
234    let plugin_rl_max: u32 = if is_dev_early { 100_000 } else { 100 };
235    let plugin_reg: Arc<PluginRegistry> = plugins.unwrap_or_else(|| {
236        let mut reg = PluginRegistry::new(runtime.manifest().clone());
237        reg.register(Arc::new(
238            pylon_plugin::builtin::rate_limit::RateLimitPlugin::new(
239                plugin_rl_max,
240                std::time::Duration::from_secs(60),
241            ),
242        ));
243        // Auto-scope any entity that declares a `tenantId` field. This is
244        // how multi-tenant isolation becomes a default posture rather than
245        // an opt-in: drop the field on the entity and the plugin takes it
246        // from there (stamps inserts, rejects cross-tenant writes).
247        reg.register(Arc::new(
248            pylon_plugin::builtin::tenant_scope::TenantScopePlugin::from_manifest(
249                runtime.manifest(),
250            ),
251        ));
252        Arc::new(reg)
253    });
254    let room_mgr = Arc::new(RoomManager::new(120)); // 2 min idle timeout
255    let ws_port = port + 1;
256    let sse_port = port + 2;
257
258    // Record server start time for the health endpoint.
259    let start_time = Instant::now();
260
261    let metrics = Arc::new(Metrics::new());
262
263    // Cache and pub/sub shared instances.
264    let cache = Arc::new(CachePlugin::new(100_000));
265    let pubsub_broker = Arc::new(PubSubBroker::new(100));
266
267    // Job queue, scheduler, and background workers.
268    let job_queue = Arc::new(JobQueue::new(1000));
269
270    // Persistent job store. Colocate with the app DB so `./app.db` gets
271    // `./app.db.jobs.db` automatically — otherwise jobs land in CWD, which
272    // is wherever the server was launched from (confusing and fragile).
273    // In-memory runtimes and the `PYLON_JOBS_IN_MEMORY=1` opt-out both
274    // skip persistence.
275    let jobs_in_memory = std::env::var("PYLON_JOBS_IN_MEMORY")
276        .map(|v| v == "1" || v == "true")
277        .unwrap_or(false);
278    if !jobs_in_memory {
279        let jobs_db_path = std::env::var("PYLON_JOBS_DB").ok().unwrap_or_else(|| {
280            runtime
281                .db_path()
282                .map(|p| format!("{p}.jobs.db"))
283                .unwrap_or_else(|| "pylon.jobs.db".into())
284        });
285        match crate::job_store::JobStore::open(&jobs_db_path) {
286            Ok(store) => {
287                let store = Arc::new(store);
288                let restored = job_queue.restore_from(&store);
289                if restored > 0 {
290                    tracing::info!("[jobs] Restored {restored} pending job(s) from {jobs_db_path}");
291                }
292                job_queue.attach_store(store);
293            }
294            Err(e) => {
295                tracing::warn!(
296                    "[jobs] Could not open job store at {jobs_db_path}: {e} — running without persistence"
297                );
298            }
299        }
300    }
301
302    // Register built-in framework jobs.
303    {
304        let cache_ref = Arc::clone(&cache);
305        job_queue.register(
306            "pylon.cache.cleanup",
307            Arc::new(move |_job| {
308                cache_ref.cleanup_expired();
309                JobResult::Success
310            }),
311        );
312        let rooms_ref = Arc::clone(&room_mgr);
313        job_queue.register(
314            "pylon.rooms.cleanup",
315            Arc::new(move |_job| {
316                rooms_ref.cleanup_idle();
317                JobResult::Success
318            }),
319        );
320    }
321
322    let scheduler = Arc::new(Scheduler::new(Arc::clone(&job_queue)));
323    // Schedule built-in tasks.
324    let _ = scheduler.schedule(
325        "pylon.cache.cleanup",
326        "*/10 * * * *",
327        Arc::new(|_| JobResult::Success),
328    );
329    let _ = scheduler.schedule(
330        "pylon.rooms.cleanup",
331        "*/5 * * * *",
332        Arc::new(|_| JobResult::Success),
333    );
334
335    // Start 2 background workers.
336    let _worker_handles: Vec<_> = (0..2)
337        .map(|i| {
338            let w = Worker::new(Arc::clone(&job_queue), &format!("worker-{i}"));
339            w.start()
340        })
341        .collect();
342
343    // Start the scheduler.
344    let _scheduler_handle = Arc::clone(&scheduler).start();
345
346    // Workflow engine: TS runner URL configurable via env, defaults to local Bun server.
347    let wf_runner_url = std::env::var("PYLON_WORKFLOW_RUNNER_URL")
348        .unwrap_or_else(|_| "http://127.0.0.1:9876/run".to_string());
349    let workflow_engine = Arc::new(WorkflowEngine::new(&wf_runner_url, 10_000));
350
351    // Rate limiter: per-IP outer cap on total requests.
352    //
353    // Defaults:
354    //   - Dev mode: effectively off (100k/min) so a React app's initial
355    //     bundle load + sync pulls + user clicks don't immediately 429.
356    //     100/min blew through during a single login + first sync pull.
357    //   - Prod: 600/min (10 req/sec average). Still tight, but a real app
358    //     should override with PYLON_RATE_LIMIT_MAX anyway.
359    //
360    // Override with PYLON_RATE_LIMIT_MAX + PYLON_RATE_LIMIT_WINDOW.
361    let default_rl_max = if is_dev_early { 100_000 } else { 600 };
362    let rl_max: u32 = std::env::var("PYLON_RATE_LIMIT_MAX")
363        .ok()
364        .and_then(|v| v.parse().ok())
365        .unwrap_or(default_rl_max);
366    let rl_window: u64 = std::env::var("PYLON_RATE_LIMIT_WINDOW")
367        .ok()
368        .and_then(|v| v.parse().ok())
369        .unwrap_or(60);
370    let rate_limiter = Arc::new(RateLimiter::new(rl_max, rl_window));
371
372    // Per-function rate limiter: separate bucket per (caller, function) pair.
373    // Defaults to a stricter cap because functions are heavier than reads.
374    // Override via PYLON_FN_RATE_LIMIT_MAX / PYLON_FN_RATE_LIMIT_WINDOW.
375    let fn_rl_max: u32 = std::env::var("PYLON_FN_RATE_LIMIT_MAX")
376        .ok()
377        .and_then(|v| v.parse().ok())
378        .unwrap_or(30);
379    let fn_rl_window: u64 = std::env::var("PYLON_FN_RATE_LIMIT_WINDOW")
380        .ok()
381        .and_then(|v| v.parse().ok())
382        .unwrap_or(60);
383    let fn_rate_limiter = Arc::new(RateLimiter::new(fn_rl_max, fn_rl_window));
384
385    // TypeScript function runtime: optional Bun process that loads functions/*.ts
386    // If no `functions/` directory exists or Bun isn't installed, this is None
387    // and /api/fn/* routes return 503.
388    // Build a notifier adapter so function mutations (`ctx.db.insert/update/delete`)
389    // emit change events to WS + SSE subscribers on COMMIT. Without this,
390    // functions write to the DB but sync clients never see the update
391    // live — they only catch up on the next refetch.
392    let fn_notifier: Arc<dyn pylon_router::ChangeNotifier> =
393        Arc::new(crate::datastore::WsSseNotifier {
394            ws: Arc::clone(&ws_hub),
395            sse: Arc::clone(&sse_hub),
396        });
397    let fn_ops_maybe = crate::datastore::try_spawn_functions(
398        Arc::clone(&runtime),
399        Arc::clone(&job_queue),
400        Arc::clone(&fn_rate_limiter),
401        Arc::clone(&change_log),
402        fn_notifier,
403    );
404
405    // Dev mode flag: when false, sensitive data (e.g. magic codes) is omitted from responses.
406    let is_dev = std::env::var("PYLON_DEV_MODE")
407        .map(|v| v == "1" || v == "true")
408        .unwrap_or(true);
409
410    // CORS origin. Defaults to `*` in dev for convenience; in prod we refuse
411    // to start with a wildcard because the server sends `Access-Control-
412    // Allow-Credentials: true` elsewhere and also accepts `Authorization:
413    // Bearer <session>`. The combination of `*` + credentials is a spec
414    // violation that some browsers tolerate, and even when they don't it
415    // lets any origin drive bearer-auth APIs.
416    let cors_origin = match std::env::var("PYLON_CORS_ORIGIN") {
417        Ok(v) => v,
418        Err(_) if is_dev => "*".to_string(),
419        Err(_) => {
420            return Err(
421                "PYLON_CORS_ORIGIN must be set in production (non-dev mode). \
422                Set it to your frontend's origin, or set PYLON_DEV_MODE=true \
423                for local development."
424                    .into(),
425            );
426        }
427    };
428    if !is_dev && cors_origin == "*" {
429        return Err("PYLON_CORS_ORIGIN=\"*\" is refused in production mode. \
430            Set it to an explicit origin (https://app.example.com)."
431            .into());
432    }
433    // Validate the origin once so per-request header construction can never
434    // panic on bad bytes. Previously every `Header::from_bytes(...).unwrap()`
435    // was a potential request-triggered DoS via env misconfiguration.
436    if Header::from_bytes(
437        "Access-Control-Allow-Origin",
438        cors_origin.as_bytes().to_vec(),
439    )
440    .is_err()
441    {
442        return Err(format!(
443            "PYLON_CORS_ORIGIN={cors_origin:?} contains bytes that are not a valid HTTP header value"
444        ));
445    }
446
447    // Admin token: read once at startup, not per-request.
448    let admin_token: Option<String> = std::env::var("PYLON_ADMIN_TOKEN").ok();
449
450    // CSRF protection. Enforced inline at the HTTP layer because the plugin
451    // trait's `on_request` hook doesn't see request headers. For
452    // state-changing methods (POST/PATCH/PUT/DELETE) we check Origin, then
453    // Referer, against the allowlist.
454    //
455    // Allowlist resolution:
456    //   - PYLON_CSRF_ORIGINS (comma-separated) if set
457    //   - otherwise PYLON_CORS_ORIGIN (already validated above)
458    //   - in dev, fall back to allow-any to avoid breaking local tooling.
459    let csrf_origins: Vec<String> = match std::env::var("PYLON_CSRF_ORIGINS") {
460        Ok(v) => v
461            .split(',')
462            .map(|s| s.trim().to_string())
463            .filter(|s| !s.is_empty())
464            .collect(),
465        Err(_) => {
466            if is_dev {
467                vec!["*".to_string()]
468            } else if cors_origin != "*" {
469                vec![cors_origin.clone()]
470            } else {
471                // Non-dev + wildcard was already rejected, but guard anyway.
472                vec![]
473            }
474        }
475    };
476    let csrf = Arc::new(pylon_plugin::builtin::csrf::CsrfPlugin::new(csrf_origins));
477
478    // Start WebSocket server on port+1.
479    //
480    // The snapshot fetcher gives the WS reader a way to ship the current
481    // CRDT snapshot to a client the instant it subscribes — without it
482    // the new tab would have to wait for the next write before catching
483    // up to the converged state. Encodes into the same length-prefixed
484    // wire frame as the broadcast path so the client decoder is shared.
485    //
486    // Authz is enforced HERE, not at the WS layer: the closure runs the
487    // row through `check_entity_read` against the caller's auth ctx and
488    // returns None on deny. The caller (handle_crdt_control) treats
489    // None as "don't subscribe" — so a denied client can't sit on a
490    // subscription waiting for a future write to leak state.
491    {
492        let hub = Arc::clone(&ws_hub);
493        let sessions = Arc::clone(&session_store);
494        let runtime_for_fetcher = Arc::clone(&runtime);
495        let pe_for_fetcher = Arc::clone(&policy_engine);
496        let fetcher: crate::ws::SnapshotFetcher = Arc::new(move |auth_ctx, entity, row_id| {
497            use pylon_http::DataStore;
498            // Fetch the row first so the policy engine can evaluate
499            // row-level predicates (`data.authorId == auth.userId`
500            // etc). Missing row → deny silently; the client just
501            // never gets a frame and can't probe existence.
502            let row = match runtime_for_fetcher.get_by_id(entity, row_id) {
503                Ok(Some(v)) => v,
504                _ => return None,
505            };
506            if !matches!(
507                pe_for_fetcher.check_entity_read(entity, auth_ctx, Some(&row)),
508                pylon_policy::PolicyResult::Allowed
509            ) {
510                return None;
511            }
512            let snap = match runtime_for_fetcher.crdt_snapshot(entity, row_id) {
513                Ok(Some(bytes)) => bytes,
514                _ => return None,
515            };
516            pylon_router::encode_crdt_frame(
517                pylon_router::CRDT_FRAME_SNAPSHOT,
518                entity,
519                row_id,
520                &snap,
521            )
522            .ok()
523        });
524        std::thread::spawn(move || {
525            crate::ws::start_ws_server(hub, sessions, ws_port, Some(fetcher));
526        });
527    }
528
529    // Start SSE server on port+2.
530    {
531        let hub = Arc::clone(&sse_hub);
532        std::thread::spawn(move || {
533            crate::sse::start_sse_server(hub, sse_port);
534        });
535    }
536
537    // Start shard WebSocket server on port+3 when a registry is provided.
538    let shard_ws_port = port + 3;
539    if let Some(reg) = shard_registry.clone() {
540        let sessions = Arc::clone(&session_store);
541        std::thread::spawn(move || {
542            crate::shard_ws::start_shard_ws_server(reg, sessions, shard_ws_port);
543        });
544    }
545
546    tracing::warn!("pylon dev server listening on http://localhost:{port}");
547    tracing::info!("  WebSocket: ws://localhost:{ws_port}");
548    tracing::info!("  Studio: http://localhost:{port}/studio");
549    tracing::info!("  API:    http://localhost:{port}/api/entities/<entity>");
550    tracing::info!("  Auth:   http://localhost:{port}/api/auth/session");
551
552    // Use recv() in a loop instead of incoming_requests() so we can share
553    // the Arc<Server> with the shutdown path (incoming_requests borrows &self
554    // which prevents moving the Arc into another thread).
555    loop {
556        if SHUTDOWN.load(Ordering::Relaxed) {
557            break;
558        }
559
560        let mut request = match server.recv() {
561            Ok(rq) => rq,
562            Err(_) => {
563                // recv() returns Err when unblocked or the socket is closed.
564                break;
565            }
566        };
567
568        if SHUTDOWN.load(Ordering::Relaxed) {
569            break;
570        }
571
572        let rt = Arc::clone(&runtime);
573        let ss = Arc::clone(&session_store);
574        let pe = Arc::clone(&policy_engine);
575        let cl = Arc::clone(&change_log);
576        let wh = Arc::clone(&ws_hub);
577        let sh = Arc::clone(&sse_hub);
578        let mc = Arc::clone(&magic_codes);
579        let pr = Arc::clone(&plugin_reg);
580        let rm = Arc::clone(&room_mgr);
581        let mt = Arc::clone(&metrics);
582        let os = Arc::clone(&oauth_state);
583        let ca = Arc::clone(&cache);
584        let ps = Arc::clone(&pubsub_broker);
585        let jq = Arc::clone(&job_queue);
586        let sc = Arc::clone(&scheduler);
587        let we = Arc::clone(&workflow_engine);
588        let fn_ops_ref = fn_ops_maybe.clone();
589        let shards_ref = shard_registry.clone();
590        let cors_origin = cors_origin.clone();
591        let is_dev = is_dev;
592
593        let method = request.method().clone();
594        let url = request.url().to_string();
595
596        // --- Health check: fast path before auth or body parsing ---
597        if url == "/health" && method == Method::Get {
598            let uptime = start_time.elapsed().as_secs();
599            let body = serde_json::json!({
600                "status": "ok",
601                "version": "0.1.0",
602                "uptime_secs": uptime,
603            })
604            .to_string();
605
606            let response = with_security_headers(
607                Response::from_string(&body)
608                    .with_status_code(200u16)
609                    .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
610                    .with_header(
611                        Header::from_bytes(
612                            "Access-Control-Allow-Origin",
613                            cors_origin.as_bytes().to_vec(),
614                        )
615                        .unwrap(),
616                    ),
617            );
618            let _ = request.respond(response);
619            continue;
620        }
621
622        // --- Metrics endpoint: fast path before rate-limit / body parsing.
623        // Gate behind admin auth in non-dev to prevent leakage of function
624        // names, request volumes, and error rates to the public internet.
625        // Dev mode stays open so local Prometheus scrapers just work.
626        if url == "/metrics" && method == Method::Get {
627            if !is_dev {
628                let admin_bytes = admin_token.as_deref().unwrap_or("").as_bytes();
629                let auth_ok = !admin_bytes.is_empty()
630                    && request.headers().iter().any(|h| {
631                        let name = h.field.as_str().as_str();
632                        name.eq_ignore_ascii_case("Authorization")
633                            && h.value
634                                .as_str()
635                                .strip_prefix("Bearer ")
636                                .map(|t| pylon_auth::constant_time_eq(t.as_bytes(), admin_bytes))
637                                .unwrap_or(false)
638                    });
639                if !auth_ok {
640                    let body = json_error(
641                        "UNAUTHORIZED",
642                        "/metrics requires admin bearer token in non-dev mode",
643                    );
644                    let response = with_security_headers(
645                        Response::from_string(&body)
646                            .with_status_code(401u16)
647                            .with_header(
648                                Header::from_bytes("Content-Type", "application/json").unwrap(),
649                            ),
650                    );
651                    let _ = request.respond(response);
652                    continue;
653                }
654            }
655            let prefers_prometheus = request.headers().iter().any(|h| {
656                (h.field.as_str() == "Accept" || h.field.as_str() == "accept")
657                    && (h.value.as_str().contains("text/plain")
658                        || h.value.as_str().contains("application/openmetrics-text"))
659            });
660            let (body, content_type) = if prefers_prometheus {
661                (mt.prometheus(), "text/plain; version=0.0.4")
662            } else {
663                (mt.snapshot().to_string(), "application/json")
664            };
665            let response = with_security_headers(
666                Response::from_string(&body)
667                    .with_status_code(200u16)
668                    .with_header(Header::from_bytes("Content-Type", content_type).unwrap())
669                    .with_header(
670                        Header::from_bytes(
671                            "Access-Control-Allow-Origin",
672                            cors_origin.as_bytes().to_vec(),
673                        )
674                        .unwrap(),
675                    ),
676            );
677            let _ = request.respond(response);
678            mt.record_request("GET", 200);
679            continue;
680        }
681
682        // --- Rate limiting: check per-IP request count ---
683        let peer_ip = request
684            .remote_addr()
685            .map(|a| a.ip().to_string())
686            .unwrap_or_default();
687
688        // OPTIONS preflights are browser infrastructure, not user intent.
689        // Rate-limiting them makes a normal page effectively halve its
690        // budget (preflight + real request per call) and returns a 429
691        // that the browser can't interpret as a valid CORS response —
692        // the user-visible symptom is "Failed to fetch" on login. Skip.
693        let is_preflight = matches!(method, Method::Options);
694        if !is_preflight {
695            if let Err(retry_after) = rate_limiter.check(&peer_ip) {
696                let err_body = json_error(
697                    "RATE_LIMITED",
698                    &format!("Too many requests. Retry after {retry_after} seconds."),
699                );
700                let response = with_security_headers(
701                    Response::from_string(&err_body)
702                        .with_status_code(429u16)
703                        .with_header(
704                            Header::from_bytes("Content-Type", "application/json").unwrap(),
705                        )
706                        .with_header(
707                            Header::from_bytes(
708                                "Access-Control-Allow-Origin",
709                                cors_origin.as_bytes().to_vec(),
710                            )
711                            .unwrap(),
712                        )
713                        .with_header(
714                            Header::from_bytes(
715                                "Access-Control-Allow-Methods",
716                                "GET, POST, PATCH, DELETE, OPTIONS",
717                            )
718                            .unwrap(),
719                        )
720                        .with_header(
721                            Header::from_bytes(
722                                "Access-Control-Allow-Headers",
723                                "Content-Type, Authorization",
724                            )
725                            .unwrap(),
726                        )
727                        .with_header(
728                            Header::from_bytes(
729                                "Retry-After",
730                                retry_after.to_string().as_bytes().to_vec(),
731                            )
732                            .unwrap(),
733                        ),
734                );
735                let _ = request.respond(response);
736                mt.record_request(method.as_str(), 429);
737                continue;
738            }
739        } // end: if !is_preflight
740
741        // --- CSRF check on state-changing requests ---
742        //
743        // Browsers forbid cross-origin POST/PATCH/PUT/DELETE unless CORS
744        // allows it, but an attacker controlling another origin can still
745        // ship credentials-bearing requests if the server is permissive.
746        // The CSRF plugin validates Origin (then Referer) against an explicit
747        // allowlist — this is the check that was missing because the Plugin
748        // trait's `on_request` hook has no access to headers.
749        //
750        // The Authorization header carries bearer tokens, so CSRF mostly
751        // matters for cookie-based sessions — but we enforce globally: a
752        // request that misses Origin/Referer on a state-changing method is
753        // rejected, which is the safer default.
754        {
755            let method_str = method.as_str();
756            let is_bearer = request.headers().iter().any(|h| {
757                (h.field.as_str() == "Authorization" || h.field.as_str() == "authorization")
758                    && h.value.as_str().starts_with("Bearer ")
759            });
760            // Bearer-authenticated requests are not CSRF-vulnerable in the
761            // classic sense — browsers don't auto-attach bearer tokens. Skip
762            // the check for them so server-to-server API callers keep working
763            // without needing Origin headers.
764            if !is_bearer && !matches!(method, Method::Get | Method::Head | Method::Options) {
765                let origin = request
766                    .headers()
767                    .iter()
768                    .find(|h| h.field.as_str() == "Origin" || h.field.as_str() == "origin")
769                    .map(|h| h.value.as_str().to_string());
770                let referer = request
771                    .headers()
772                    .iter()
773                    .find(|h| h.field.as_str() == "Referer" || h.field.as_str() == "referer")
774                    .map(|h| h.value.as_str().to_string());
775                if let Err(err) = csrf.check(method_str, origin.as_deref(), referer.as_deref()) {
776                    let body = json_error(&err.code, &err.message);
777                    let response = with_security_headers(
778                        Response::from_string(&body)
779                            .with_status_code(err.status)
780                            .with_header(
781                                Header::from_bytes("Content-Type", "application/json").unwrap(),
782                            )
783                            .with_header(
784                                Header::from_bytes(
785                                    "Access-Control-Allow-Origin",
786                                    cors_origin.as_bytes().to_vec(),
787                                )
788                                .unwrap(),
789                            ),
790                    );
791                    let _ = request.respond(response);
792                    mt.record_request(method_str, err.status);
793                    continue;
794                }
795            }
796        }
797
798        // Extract auth token + auth context EARLY so every fast path (upload,
799        // shard SSE, fn streaming, AI streaming) can enforce auth the same
800        // way the router does. Previously these paths ran before auth
801        // extraction and bypassed the plugin/router auth chain entirely.
802        let auth_token: Option<String> = request
803            .headers()
804            .iter()
805            .find(|h| h.field.as_str() == "Authorization" || h.field.as_str() == "authorization")
806            .and_then(|h| {
807                let val = h.value.as_str();
808                val.strip_prefix("Bearer ").map(|t| t.to_string())
809            });
810        let auth_ctx = if admin_token.is_some()
811            && auth_token.is_some()
812            && pylon_auth::constant_time_eq(
813                auth_token.as_deref().unwrap_or("").as_bytes(),
814                admin_token.as_deref().unwrap_or("").as_bytes(),
815            ) {
816            pylon_auth::AuthContext::admin()
817        } else {
818            ss.resolve(auth_token.as_deref())
819        };
820
821        // --- Test-reset endpoint — in-memory + dev mode + localhost only ---
822        //
823        // `pylon test` sets PYLON_IN_MEMORY=1 + PYLON_DEV_MODE=true.
824        // The TS helper `resetDb()` posts here between `test(...)` blocks
825        // to isolate cases. Gates:
826        //   1. dev mode (production refuses outright)
827        //   2. in-memory DB (belt-and-braces against accidental file wipes)
828        //   3. peer IP is loopback (a dev laptop often has localhost:4321
829        //      reachable; without this, a browser visiting a malicious
830        //      site could cross-site-POST a reset via a bare form —
831        //      blind CSRF that doesn't care about the response)
832        //
833        // Positioned AFTER the rate limiter and CSRF check on purpose so
834        // those middlewares apply — the earlier placement skipped both.
835        if url == "/api/__test__/reset" && method == Method::Post {
836            let is_loopback = peer_ip == "127.0.0.1"
837                || peer_ip == "::1"
838                || peer_ip.starts_with("127.")
839                || peer_ip == "localhost";
840            if !is_dev || !rt.is_in_memory() || !is_loopback {
841                let body = json_error(
842                    "RESET_REFUSED",
843                    "reset endpoint is only available in dev mode + in-memory DB + from loopback",
844                );
845                let response = with_security_headers(
846                    Response::from_string(&body)
847                        .with_status_code(403u16)
848                        .with_header(
849                            Header::from_bytes("Content-Type", "application/json").unwrap(),
850                        )
851                        .with_header(
852                            Header::from_bytes(
853                                "Access-Control-Allow-Origin",
854                                cors_origin.as_bytes().to_vec(),
855                            )
856                            .unwrap(),
857                        ),
858                );
859                let _ = request.respond(response);
860                mt.record_request("POST", 403);
861                continue;
862            }
863            let (status, body) = match rt.reset_for_tests() {
864                Ok(()) => (200u16, "{\"reset\":true}".to_string()),
865                Err(e) => (500u16, json_error(&e.code, &e.message)),
866            };
867            let response = with_security_headers(
868                Response::from_string(&body)
869                    .with_status_code(status)
870                    .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
871                    .with_header(
872                        Header::from_bytes(
873                            "Access-Control-Allow-Origin",
874                            cors_origin.as_bytes().to_vec(),
875                        )
876                        .unwrap(),
877                    ),
878            );
879            let _ = request.respond(response);
880            mt.record_request("POST", status);
881            continue;
882        }
883
884        // --- File upload fast path: handle binary body before string conversion ---
885        // Uploads come in two shapes:
886        //   1. Direct binary body with X-Filename / Content-Type headers
887        //   2. multipart/form-data with a file part
888        //
889        // Require an authenticated user. Uploads write to the files backend
890        // (and into the plugin audit log for soft-delete etc.), so
891        // unauthenticated callers cannot use this route.
892        if url == "/api/files/upload" && method == Method::Post {
893            const UPLOAD_MAX: usize = 10 * 1024 * 1024;
894            // Enforce size BEFORE reading the body so a 10 GiB stream can't
895            // buffer into memory. Content-Length pre-check, then bounded read.
896            if let Some(declared) = request.body_length() {
897                if declared > UPLOAD_MAX {
898                    let err = json_error(
899                        "PAYLOAD_TOO_LARGE",
900                        &format!("Content-Length {declared} exceeds upload max of {UPLOAD_MAX}"),
901                    );
902                    let response = with_security_headers(
903                        Response::from_string(&err)
904                            .with_status_code(413u16)
905                            .with_header(
906                                Header::from_bytes("Content-Type", "application/json").unwrap(),
907                            )
908                            .with_header(
909                                Header::from_bytes(
910                                    "Access-Control-Allow-Origin",
911                                    cors_origin.as_bytes().to_vec(),
912                                )
913                                .unwrap(),
914                            ),
915                    );
916                    let _ = request.respond(response);
917                    mt.record_request("POST", 413);
918                    continue;
919                }
920            }
921            if auth_ctx.user_id.is_none() {
922                let err = json_error(
923                    "AUTH_REQUIRED",
924                    "/api/files/upload requires an authenticated session",
925                );
926                let response = with_security_headers(
927                    Response::from_string(&err)
928                        .with_status_code(401u16)
929                        .with_header(
930                            Header::from_bytes("Content-Type", "application/json").unwrap(),
931                        )
932                        .with_header(
933                            Header::from_bytes(
934                                "Access-Control-Allow-Origin",
935                                cors_origin.as_bytes().to_vec(),
936                            )
937                            .unwrap(),
938                        ),
939                );
940                let _ = request.respond(response);
941                mt.record_request("POST", 401);
942                continue;
943            }
944            // Read up to UPLOAD_MAX + 1 bytes. If we read the full +1 we know
945            // the client lied about Content-Length (or used chunked encoding
946            // and overran). Reject in that case instead of continuing with a
947            // truncated file.
948            use std::io::Read;
949            let mut bytes: Vec<u8> = Vec::with_capacity(8192);
950            let mut limited = request.as_reader().take((UPLOAD_MAX as u64) + 1);
951            let _ = limited.read_to_end(&mut bytes);
952
953            const MAX: usize = UPLOAD_MAX;
954            if bytes.len() > MAX {
955                let err = json_error("PAYLOAD_TOO_LARGE", "File exceeds 10 MB limit");
956                let response = with_security_headers(
957                    Response::from_string(&err)
958                        .with_status_code(413u16)
959                        .with_header(
960                            Header::from_bytes("Content-Type", "application/json").unwrap(),
961                        )
962                        .with_header(
963                            Header::from_bytes(
964                                "Access-Control-Allow-Origin",
965                                cors_origin.as_bytes().to_vec(),
966                            )
967                            .unwrap(),
968                        ),
969                );
970                let _ = request.respond(response);
971                mt.record_request("POST", 413);
972                continue;
973            }
974
975            // Headers.
976            let content_type = request
977                .headers()
978                .iter()
979                .find(|h| h.field.as_str() == "Content-Type" || h.field.as_str() == "content-type")
980                .map(|h| h.value.as_str().to_string())
981                .unwrap_or_else(|| "application/octet-stream".into());
982            let filename = request
983                .headers()
984                .iter()
985                .find(|h| h.field.as_str() == "X-Filename" || h.field.as_str() == "x-filename")
986                .map(|h| h.value.as_str().to_string())
987                .unwrap_or_else(|| "upload".into());
988
989            // If multipart, extract the first file part. Otherwise use bytes directly.
990            let (name, ct, payload) = if content_type.starts_with("multipart/form-data") {
991                match parse_multipart_first_file(&bytes, &content_type) {
992                    Some(p) => p,
993                    None => {
994                        let err = json_error("INVALID_MULTIPART", "Could not parse multipart body");
995                        let response = with_security_headers(
996                            Response::from_string(&err)
997                                .with_status_code(400u16)
998                                .with_header(
999                                    Header::from_bytes("Content-Type", "application/json").unwrap(),
1000                                )
1001                                .with_header(
1002                                    Header::from_bytes(
1003                                        "Access-Control-Allow-Origin",
1004                                        cors_origin.as_bytes().to_vec(),
1005                                    )
1006                                    .unwrap(),
1007                                ),
1008                        );
1009                        let _ = request.respond(response);
1010                        mt.record_request("POST", 400);
1011                        continue;
1012                    }
1013                }
1014            } else {
1015                (filename, content_type, bytes)
1016            };
1017
1018            let storage = pylon_storage::files::LocalFileStorage::new(
1019                &std::env::var("PYLON_FILES_DIR").unwrap_or_else(|_| "uploads".into()),
1020                &std::env::var("PYLON_FILES_URL_PREFIX").unwrap_or_else(|_| "/api/files".into()),
1021            );
1022
1023            let (status, body) =
1024                match pylon_storage::files::FileStorage::store(&storage, &name, &payload, &ct) {
1025                    Ok(stored) => (
1026                        201u16,
1027                        serde_json::to_string(&stored).unwrap_or_else(|_| "{}".into()),
1028                    ),
1029                    Err(e) => (500u16, json_error(&e.code, &e.message)),
1030                };
1031
1032            let response = with_security_headers(
1033                Response::from_string(&body)
1034                    .with_status_code(status)
1035                    .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
1036                    .with_header(
1037                        Header::from_bytes(
1038                            "Access-Control-Allow-Origin",
1039                            cors_origin.as_bytes().to_vec(),
1040                        )
1041                        .unwrap(),
1042                    ),
1043            );
1044            let _ = request.respond(response);
1045            mt.record_request("POST", status);
1046            continue;
1047        }
1048
1049        // Read body before routing (request is consumed by respond).
1050        // Skip for methods that cannot have a body.
1051        //
1052        // Size enforcement runs in TWO layers so a malicious client can't
1053        // stream 10 GiB into memory before we reject it:
1054        //   1. Content-Length header is compared to MAX_BODY_SIZE up front.
1055        //   2. The actual read uses `.take(MAX_BODY_SIZE + 1)` so a lying
1056        //      or chunked stream is capped at MAX + 1 bytes; if we read that
1057        //      many, we reject.
1058        const MAX_BODY_SIZE: usize = 10 * 1024 * 1024;
1059
1060        if let Some(declared) = request.body_length() {
1061            if declared > MAX_BODY_SIZE {
1062                let err_body = json_error(
1063                    "PAYLOAD_TOO_LARGE",
1064                    &format!("Content-Length {declared} exceeds max of {MAX_BODY_SIZE}"),
1065                );
1066                let response = with_security_headers(
1067                    Response::from_string(&err_body)
1068                        .with_status_code(413u16)
1069                        .with_header(
1070                            Header::from_bytes(
1071                                "Access-Control-Allow-Origin",
1072                                cors_origin.as_bytes().to_vec(),
1073                            )
1074                            .unwrap(),
1075                        ),
1076                );
1077                let _ = request.respond(response);
1078                mt.record_request(method.as_str(), 413);
1079                continue;
1080            }
1081        }
1082
1083        let mut body = String::new();
1084        if !matches!(
1085            method,
1086            Method::Get | Method::Head | Method::Options | Method::Delete
1087        ) {
1088            use std::io::Read;
1089            let mut limited = request.as_reader().take((MAX_BODY_SIZE as u64) + 1);
1090            let _ = limited.read_to_string(&mut body);
1091        }
1092
1093        if body.len() > MAX_BODY_SIZE {
1094            let err_body = json_error(
1095                "PAYLOAD_TOO_LARGE",
1096                &format!(
1097                    "Request body exceeds maximum size of {} bytes",
1098                    MAX_BODY_SIZE,
1099                ),
1100            );
1101            let response = with_security_headers(
1102                Response::from_string(&err_body)
1103                    .with_status_code(413u16)
1104                    .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
1105                    .with_header(
1106                        Header::from_bytes(
1107                            "Access-Control-Allow-Origin",
1108                            cors_origin.as_bytes().to_vec(),
1109                        )
1110                        .unwrap(),
1111                    ),
1112            );
1113            let _ = request.respond(response);
1114            mt.record_request(method.as_str(), 413);
1115            continue;
1116        }
1117
1118        // (auth_token + auth_ctx were resolved above, before the fast paths.)
1119
1120        // --- GET /api/shards/:id/connect — SSE snapshot stream ---
1121        if method == Method::Get {
1122            if let Some(rest) = url.strip_prefix("/api/shards/") {
1123                let rest = rest.split('?').next().unwrap_or(rest);
1124                if let Some(shard_id) = rest.strip_suffix("/connect") {
1125                    // Require an authenticated user. Shard SSE streams state
1126                    // snapshots tick-by-tick; an anonymous subscriber can
1127                    // both read that state AND influence via push_input (see
1128                    // the WS handler). Gate at the transport layer.
1129                    if auth_ctx.user_id.is_none() {
1130                        let err = json_error(
1131                            "AUTH_REQUIRED",
1132                            "Shard connect requires an authenticated session",
1133                        );
1134                        let response = with_security_headers(
1135                            Response::from_string(&err)
1136                                .with_status_code(401u16)
1137                                .with_header(
1138                                    Header::from_bytes("Content-Type", "application/json").unwrap(),
1139                                )
1140                                .with_header(
1141                                    Header::from_bytes(
1142                                        "Access-Control-Allow-Origin",
1143                                        cors_origin.as_bytes().to_vec(),
1144                                    )
1145                                    .unwrap(),
1146                                ),
1147                        );
1148                        let _ = request.respond(response);
1149                        mt.record_request("GET", 401);
1150                        continue;
1151                    }
1152                    let shards = match &shards_ref {
1153                        Some(s) => Arc::clone(s),
1154                        None => {
1155                            let err = json_error(
1156                                "SHARDS_NOT_AVAILABLE",
1157                                "Shard system is not configured",
1158                            );
1159                            let response = with_security_headers(
1160                                Response::from_string(&err)
1161                                    .with_status_code(503u16)
1162                                    .with_header(
1163                                        Header::from_bytes("Content-Type", "application/json")
1164                                            .unwrap(),
1165                                    )
1166                                    .with_header(
1167                                        Header::from_bytes(
1168                                            "Access-Control-Allow-Origin",
1169                                            cors_origin.as_bytes().to_vec(),
1170                                        )
1171                                        .unwrap(),
1172                                    ),
1173                            );
1174                            let _ = request.respond(response);
1175                            mt.record_request("GET", 503);
1176                            continue;
1177                        }
1178                    };
1179                    let shard = match shards.get(shard_id) {
1180                        Some(s) => s,
1181                        None => {
1182                            let err = json_error(
1183                                "SHARD_NOT_FOUND",
1184                                &format!("Shard \"{shard_id}\" not found"),
1185                            );
1186                            let response = with_security_headers(
1187                                Response::from_string(&err)
1188                                    .with_status_code(404u16)
1189                                    .with_header(
1190                                        Header::from_bytes("Content-Type", "application/json")
1191                                            .unwrap(),
1192                                    )
1193                                    .with_header(
1194                                        Header::from_bytes(
1195                                            "Access-Control-Allow-Origin",
1196                                            cors_origin.as_bytes().to_vec(),
1197                                        )
1198                                        .unwrap(),
1199                                    ),
1200                            );
1201                            let _ = request.respond(response);
1202                            mt.record_request("GET", 404);
1203                            continue;
1204                        }
1205                    };
1206
1207                    // Subscriber ID from ?sid= query param, else the authed user,
1208                    // else a generated anonymous ID.
1209                    let sub_id = url
1210                        .split("sid=")
1211                        .nth(1)
1212                        .and_then(|s| s.split('&').next())
1213                        .map(|s| s.to_string())
1214                        .or_else(|| auth_ctx.user_id.clone())
1215                        .unwrap_or_else(|| {
1216                            format!(
1217                                "anon_{}",
1218                                std::time::SystemTime::now()
1219                                    .duration_since(std::time::UNIX_EPOCH)
1220                                    .unwrap_or_default()
1221                                    .as_nanos()
1222                            )
1223                        });
1224                    let subscriber_id = pylon_realtime::SubscriberId::new(sub_id);
1225
1226                    let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1227                    let streaming_body = StreamingBody::new(rx);
1228
1229                    let tx_clone = tx.clone();
1230                    let sink: pylon_realtime::SnapshotSink =
1231                        Box::new(move |tick: u64, bytes: &[u8]| {
1232                            // Format as SSE with an id: line carrying the tick
1233                            // number so clients can resume with Last-Event-ID.
1234                            let mut frame = format!("id: {tick}\ndata: ").into_bytes();
1235                            frame.extend_from_slice(bytes);
1236                            frame.extend_from_slice(b"\n\n");
1237                            let _ = tx_clone.send(frame);
1238                        });
1239
1240                    let shard_auth = pylon_realtime::ShardAuth {
1241                        user_id: auth_ctx.user_id.clone(),
1242                        is_admin: auth_ctx.is_admin,
1243                    };
1244                    if let Err(e) = shard.add_subscriber(subscriber_id.clone(), sink, &shard_auth) {
1245                        let (status, code) = match &e {
1246                            pylon_realtime::ShardError::Unauthorized(_) => (403u16, "UNAUTHORIZED"),
1247                            _ => (429u16, "SUBSCRIBE_FAILED"),
1248                        };
1249                        let err = json_error(code, &e.to_string());
1250                        let response = with_security_headers(
1251                            Response::from_string(&err)
1252                                .with_status_code(status)
1253                                .with_header(
1254                                    Header::from_bytes("Content-Type", "application/json").unwrap(),
1255                                )
1256                                .with_header(
1257                                    Header::from_bytes(
1258                                        "Access-Control-Allow-Origin",
1259                                        cors_origin.as_bytes().to_vec(),
1260                                    )
1261                                    .unwrap(),
1262                                ),
1263                        );
1264                        let _ = request.respond(response);
1265                        mt.record_request("GET", status);
1266                        continue;
1267                    }
1268
1269                    // Auto-unsubscribe when the client disconnects: we watch
1270                    // for mpsc channel disconnection in a sentinel thread.
1271                    {
1272                        let shard_cleanup = Arc::clone(&shard);
1273                        let sub_id_cleanup = subscriber_id.clone();
1274                        let tx_liveness = tx.clone();
1275                        std::thread::spawn(move || {
1276                            // Send a heartbeat every 30s; if send fails, the
1277                            // channel is closed (client disconnected).
1278                            loop {
1279                                std::thread::sleep(std::time::Duration::from_secs(30));
1280                                if tx_liveness.send(b": heartbeat\n\n".to_vec()).is_err() {
1281                                    shard_cleanup.remove_subscriber(&sub_id_cleanup);
1282                                    return;
1283                                }
1284                                if !shard_cleanup.is_running() {
1285                                    return;
1286                                }
1287                            }
1288                        });
1289                    }
1290
1291                    let response = with_security_headers(Response::new(
1292                        tiny_http::StatusCode(200),
1293                        vec![
1294                            Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1295                            Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1296                            Header::from_bytes("Connection", "keep-alive").unwrap(),
1297                            Header::from_bytes(
1298                                "Access-Control-Allow-Origin",
1299                                cors_origin.as_bytes().to_vec(),
1300                            )
1301                            .unwrap(),
1302                        ],
1303                        streaming_body,
1304                        None,
1305                        None,
1306                    ));
1307                    let _ = request.respond(response);
1308                    mt.record_request("GET", 200);
1309                    continue;
1310                }
1311            }
1312        }
1313
1314        // --- POST /api/fn/:name with Accept: text/event-stream — streaming functions ---
1315        if method == Method::Post
1316            && url.starts_with("/api/fn/")
1317            && url != "/api/fn/traces"
1318            && request.headers().iter().any(|h| {
1319                (h.field.as_str() == "Accept" || h.field.as_str() == "accept")
1320                    && h.value.as_str().contains("text/event-stream")
1321            })
1322        {
1323            let fn_name = url
1324                .strip_prefix("/api/fn/")
1325                .unwrap_or("")
1326                .split('?')
1327                .next()
1328                .unwrap_or("")
1329                .to_string();
1330
1331            if let Some(fn_ops) = &fn_ops_maybe {
1332                // Mirror the router's gates so the streaming fast path doesn't
1333                // become a way to bypass function auth / rate limits.
1334                // 1. Function must exist (otherwise 404, not a hung SSE).
1335                if pylon_router::FnOps::get_fn(fn_ops.as_ref(), &fn_name).is_none() {
1336                    let err = json_error(
1337                        "FN_NOT_FOUND",
1338                        &format!("Function \"{fn_name}\" is not registered"),
1339                    );
1340                    let response = with_security_headers(
1341                        Response::from_string(&err)
1342                            .with_status_code(404u16)
1343                            .with_header(
1344                                Header::from_bytes("Content-Type", "application/json").unwrap(),
1345                            )
1346                            .with_header(
1347                                Header::from_bytes(
1348                                    "Access-Control-Allow-Origin",
1349                                    cors_origin.as_bytes().to_vec(),
1350                                )
1351                                .unwrap(),
1352                            ),
1353                    );
1354                    let _ = request.respond(response);
1355                    mt.record_request("POST", 404);
1356                    continue;
1357                }
1358                // 2. Per-function rate limit (identity = user_id or "anon").
1359                let identity = auth_ctx.user_id.as_deref().unwrap_or("anon");
1360                if let Err(retry_after) =
1361                    pylon_router::FnOps::check_rate_limit(fn_ops.as_ref(), &fn_name, identity)
1362                {
1363                    let body = format!(
1364                        r#"{{"error":{{"code":"RATE_LIMITED","message":"Function \"{fn_name}\" rate limit exceeded","retry_after_secs":{retry_after}}}}}"#
1365                    );
1366                    let response = with_security_headers(
1367                        Response::from_string(&body)
1368                            .with_status_code(429u16)
1369                            .with_header(
1370                                Header::from_bytes("Content-Type", "application/json").unwrap(),
1371                            )
1372                            .with_header(
1373                                Header::from_bytes(
1374                                    "Access-Control-Allow-Origin",
1375                                    cors_origin.as_bytes().to_vec(),
1376                                )
1377                                .unwrap(),
1378                            ),
1379                    );
1380                    let _ = request.respond(response);
1381                    mt.record_request("POST", 429);
1382                    continue;
1383                }
1384
1385                let args: serde_json::Value =
1386                    serde_json::from_str(&body).unwrap_or(serde_json::json!({}));
1387
1388                let auth = pylon_functions::protocol::AuthInfo {
1389                    user_id: auth_ctx.user_id.clone(),
1390                    is_admin: auth_ctx.is_admin,
1391                    tenant_id: auth_ctx.tenant_id.clone(),
1392                };
1393
1394                let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1395                let streaming_body = StreamingBody::new(rx);
1396
1397                let fn_ops_cl = Arc::clone(fn_ops);
1398                let tx_stream = tx.clone();
1399                std::thread::spawn(move || {
1400                    let tx_cb = tx_stream.clone();
1401                    let on_stream: Box<dyn FnMut(&str) + Send> = Box::new(move |chunk: &str| {
1402                        let sse = format!("data: {}\n\n", chunk);
1403                        let _ = tx_cb.send(sse.into_bytes());
1404                    });
1405
1406                    let result = pylon_router::FnOps::call(
1407                        fn_ops_cl.as_ref(),
1408                        &fn_name,
1409                        args,
1410                        auth,
1411                        Some(on_stream),
1412                        None, // streaming /api/fn/:name never carries HTTP request metadata
1413                    );
1414                    match result {
1415                        Ok((value, _trace)) => {
1416                            let done = format!(
1417                                "event: result\ndata: {}\n\n",
1418                                serde_json::to_string(&value).unwrap_or_else(|_| "null".into())
1419                            );
1420                            let _ = tx_stream.send(done.into_bytes());
1421                        }
1422                        Err(e) => {
1423                            let err = format!(
1424                                "event: error\ndata: {}\n\n",
1425                                serde_json::json!({"code": e.code, "message": e.message})
1426                            );
1427                            let _ = tx_stream.send(err.into_bytes());
1428                        }
1429                    }
1430                });
1431
1432                let response = with_security_headers(Response::new(
1433                    tiny_http::StatusCode(200),
1434                    vec![
1435                        Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1436                        Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1437                        Header::from_bytes("Connection", "keep-alive").unwrap(),
1438                        Header::from_bytes(
1439                            "Access-Control-Allow-Origin",
1440                            cors_origin.as_bytes().to_vec(),
1441                        )
1442                        .unwrap(),
1443                    ],
1444                    streaming_body,
1445                    None,
1446                    None,
1447                ));
1448                let _ = request.respond(response);
1449                mt.record_request("POST", 200);
1450                continue;
1451            }
1452        }
1453
1454        // --- POST /api/ai/stream — SSE streaming AI completion ---
1455        if url == "/api/ai/stream" && method == Method::Post {
1456            // AI endpoints spend real money per call. Require auth so a
1457            // drive-by caller can't burn through the provider budget.
1458            if auth_ctx.user_id.is_none() {
1459                let err = json_error(
1460                    "AUTH_REQUIRED",
1461                    "/api/ai/stream requires an authenticated session",
1462                );
1463                let response = with_security_headers(
1464                    Response::from_string(&err)
1465                        .with_status_code(401u16)
1466                        .with_header(
1467                            Header::from_bytes("Content-Type", "application/json").unwrap(),
1468                        )
1469                        .with_header(
1470                            Header::from_bytes(
1471                                "Access-Control-Allow-Origin",
1472                                cors_origin.as_bytes().to_vec(),
1473                            )
1474                            .unwrap(),
1475                        ),
1476                );
1477                let _ = request.respond(response);
1478                mt.record_request("POST", 401);
1479                continue;
1480            }
1481            let ai_provider = std::env::var("PYLON_AI_PROVIDER").unwrap_or_default();
1482            let ai_key = std::env::var("PYLON_AI_API_KEY").unwrap_or_default();
1483            let ai_model = std::env::var("PYLON_AI_MODEL").unwrap_or_default();
1484            let ai_base = std::env::var("PYLON_AI_BASE_URL").unwrap_or_default();
1485
1486            if ai_key.is_empty() && ai_provider != "custom" {
1487                let err = json_error(
1488                    "AI_NOT_CONFIGURED",
1489                    "Set PYLON_AI_PROVIDER and PYLON_AI_API_KEY",
1490                );
1491                let response = with_security_headers(
1492                    Response::from_string(&err)
1493                        .with_status_code(503u16)
1494                        .with_header(
1495                            Header::from_bytes("Content-Type", "application/json").unwrap(),
1496                        )
1497                        .with_header(
1498                            Header::from_bytes(
1499                                "Access-Control-Allow-Origin",
1500                                cors_origin.as_bytes().to_vec(),
1501                            )
1502                            .unwrap(),
1503                        ),
1504                );
1505                let _ = request.respond(response);
1506                mt.record_request("POST", 503);
1507                continue;
1508            }
1509
1510            let parsed: serde_json::Value = match serde_json::from_str(&body) {
1511                Ok(v) => v,
1512                Err(_) => {
1513                    let err = json_error("INVALID_JSON", "Invalid request body");
1514                    let response = with_security_headers(
1515                        Response::from_string(&err)
1516                            .with_status_code(400u16)
1517                            .with_header(
1518                                Header::from_bytes("Content-Type", "application/json").unwrap(),
1519                            )
1520                            .with_header(
1521                                Header::from_bytes(
1522                                    "Access-Control-Allow-Origin",
1523                                    cors_origin.as_bytes().to_vec(),
1524                                )
1525                                .unwrap(),
1526                            ),
1527                    );
1528                    let _ = request.respond(response);
1529                    mt.record_request("POST", 400);
1530                    continue;
1531                }
1532            };
1533
1534            let messages: Vec<AiMessage> = match parsed.get("messages").and_then(|m| m.as_array()) {
1535                Some(arr) => arr
1536                    .iter()
1537                    .filter_map(|m| {
1538                        let role = m.get("role")?.as_str()?.to_string();
1539                        let content = m.get("content")?.as_str()?.to_string();
1540                        Some(AiMessage { role, content })
1541                    })
1542                    .collect(),
1543                None => {
1544                    let err = json_error("MISSING_FIELD", "\"messages\" array is required");
1545                    let response = with_security_headers(
1546                        Response::from_string(&err)
1547                            .with_status_code(400u16)
1548                            .with_header(
1549                                Header::from_bytes("Content-Type", "application/json").unwrap(),
1550                            )
1551                            .with_header(
1552                                Header::from_bytes(
1553                                    "Access-Control-Allow-Origin",
1554                                    cors_origin.as_bytes().to_vec(),
1555                                )
1556                                .unwrap(),
1557                            ),
1558                    );
1559                    let _ = request.respond(response);
1560                    mt.record_request("POST", 400);
1561                    continue;
1562                }
1563            };
1564
1565            // Override model from request body if provided.
1566            let model = parsed
1567                .get("model")
1568                .and_then(|m| m.as_str())
1569                .map(|s| s.to_string())
1570                .unwrap_or(ai_model);
1571
1572            let proxy = match ai_provider.as_str() {
1573                "anthropic" => AiProxyPlugin::anthropic(&ai_key, &model),
1574                "openai" => AiProxyPlugin::openai(&ai_key, &model),
1575                "custom" => AiProxyPlugin::custom_with_model(&ai_base, &ai_key, &model),
1576                _ => AiProxyPlugin::openai(&ai_key, &model),
1577            };
1578
1579            // Set up a channel-based streaming body so tiny_http streams
1580            // data to the client as chunks arrive from the AI provider.
1581            let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1582            let streaming_body = StreamingBody::new(rx);
1583
1584            // Spawn the provider request on a background thread. Each chunk
1585            // is formatted as an SSE event and pushed through the channel.
1586            std::thread::spawn(move || {
1587                let result = proxy.stream_completion(&messages, &mut |chunk| {
1588                    let sse = format!(
1589                        "data: {}
1590
1591",
1592                        serde_json::json!({
1593                            "choices": [{"index": 0, "delta": {"content": chunk}}]
1594                        })
1595                    );
1596                    let _ = tx.send(sse.into_bytes());
1597                });
1598
1599                // Send a final event indicating completion or error.
1600                match result {
1601                    Ok(_) => {
1602                        let _ = tx.send(
1603                            b"data: [DONE]
1604
1605"
1606                            .to_vec(),
1607                        );
1608                    }
1609                    Err(e) => {
1610                        let err_event = format!(
1611                            "data: {}
1612
1613",
1614                            serde_json::json!({"error": {"message": e, "type": "stream_error"}})
1615                        );
1616                        let _ = tx.send(err_event.into_bytes());
1617                    }
1618                }
1619                // tx is dropped here, which causes StreamingBody::read to return 0 (EOF).
1620            });
1621
1622            let response = with_security_headers(Response::new(
1623                tiny_http::StatusCode(200),
1624                vec![
1625                    Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1626                    Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1627                    Header::from_bytes("Connection", "keep-alive").unwrap(),
1628                    Header::from_bytes(
1629                        "Access-Control-Allow-Origin",
1630                        cors_origin.as_bytes().to_vec(),
1631                    )
1632                    .unwrap(),
1633                ],
1634                streaming_body,
1635                None, // unknown content length = chunked transfer
1636                None,
1637            ));
1638            let _ = request.respond(response);
1639            mt.record_request("POST", 200);
1640            continue;
1641        }
1642
1643        // Studio route (returns HTML, not JSON).
1644        //
1645        // Privileged admin UI. It renders the full schema and lets the
1646        // operator run mutations against the data browser. In production we
1647        // require an admin token; in dev mode we leave it open so
1648        // `pylon dev` remains friction-free for the single-user case.
1649        //
1650        // Serving a WWW-Authenticate Basic realm isn't useful here because
1651        // admin auth is bearer-token based. Callers get a 401 and should
1652        // retry with `Authorization: Bearer <PYLON_ADMIN_TOKEN>`.
1653        let (status, response_body, content_type, is_studio) = if (url == "/studio"
1654            || url == "/studio/")
1655            && method == Method::Get
1656        {
1657            if !is_dev && !auth_ctx.is_admin {
1658                let body = json_error(
1659                    "AUTH_REQUIRED",
1660                    "/studio requires admin auth in production (set PYLON_ADMIN_TOKEN and pass it as Bearer)",
1661                );
1662                let response = with_security_headers(
1663                    Response::from_string(&body)
1664                        .with_status_code(401u16)
1665                        .with_header(
1666                            Header::from_bytes("Content-Type", "application/json").unwrap(),
1667                        )
1668                        .with_header(
1669                            Header::from_bytes(
1670                                "Access-Control-Allow-Origin",
1671                                cors_origin.as_bytes().to_vec(),
1672                            )
1673                            .unwrap(),
1674                        ),
1675                );
1676                let _ = request.respond(response);
1677                mt.record_request("GET", 401);
1678                continue;
1679            }
1680            // Derive the public base URL from the request's Host header +
1681            // X-Forwarded-Proto (Fly / any HTTPS terminator sets this).
1682            // Hardcoding `http://localhost:{port}` here meant the studio
1683            // HTML served from pylon-crm.fly.dev tried to fetch
1684            // http://localhost:4321/api/* from the browser, which CSP
1685            // rightly blocks.
1686            let host = request
1687                .headers()
1688                .iter()
1689                .find(|h| h.field.equiv("Host"))
1690                .map(|h| h.value.as_str().to_string())
1691                .unwrap_or_else(|| format!("localhost:{port}"));
1692            let scheme = request
1693                .headers()
1694                .iter()
1695                .find(|h| h.field.equiv("X-Forwarded-Proto"))
1696                .map(|h| h.value.as_str().to_string())
1697                .unwrap_or_else(|| "http".to_string());
1698            let base = format!("{scheme}://{host}");
1699            let html = pylon_studio_api::generate_studio_html(rt.manifest(), &base);
1700            (200u16, html, "text/html", true)
1701        } else {
1702            // Run plugin middleware with per-request metadata so rate-limit
1703            // plugins can bucket by peer IP (not just user id) when the
1704            // caller is anonymous.
1705            let meta = pylon_plugin::RequestMeta {
1706                peer_ip: peer_ip.as_str(),
1707            };
1708            if let Err(e) = pr.run_on_request_with_meta(method.as_str(), &url, &auth_ctx, &meta) {
1709                (
1710                    e.status,
1711                    json_error(&e.code, &e.message),
1712                    "application/json",
1713                    false,
1714                )
1715            } else if let Some((s, b)) =
1716                pr.try_handle_route(method.as_str(), &url, &body, &auth_ctx)
1717            {
1718                // Plugin handled the route.
1719                (s, b, "application/json", false)
1720            } else {
1721                let notifier = WsSseNotifier {
1722                    ws: Arc::clone(&wh),
1723                    sse: Arc::clone(&sh),
1724                };
1725                let openapi_gen = RuntimeOpenApiGenerator {
1726                    manifest: rt.manifest(),
1727                };
1728                let file_ops = LocalFileOps::new_default();
1729                let cache_adapter = CacheAdapter(Arc::clone(&ca));
1730                let pubsub_adapter = PubSubAdapter(Arc::clone(&ps));
1731                let email_adapter = EmailAdapter::from_env();
1732                let fn_ops: Option<&dyn pylon_router::FnOps> =
1733                    fn_ops_ref.as_deref().map(|f| f as &dyn pylon_router::FnOps);
1734                let shard_adapter = shards_ref.as_ref().map(|reg| ShardOpsAdapter {
1735                    registry: Arc::clone(reg),
1736                });
1737                let shard_ops: Option<&dyn pylon_router::ShardOps> = shard_adapter
1738                    .as_ref()
1739                    .map(|a| a as &dyn pylon_router::ShardOps);
1740                let plugin_hooks = PluginHooksAdapter(Arc::clone(&pr));
1741                // Snapshot request headers as (name, value) pairs for the
1742                // router to forward into webhook-invoked actions. Header
1743                // names are left as-sent; the router lowercases + merges
1744                // duplicates per RFC 7230 when constructing RequestInfo.
1745                let request_headers: Vec<(String, String)> = request
1746                    .headers()
1747                    .iter()
1748                    .map(|h| (h.field.as_str().to_string(), h.value.as_str().to_string()))
1749                    .collect();
1750                let router_ctx = pylon_router::RouterContext {
1751                    store: rt.as_ref(),
1752                    session_store: &ss,
1753                    magic_codes: &mc,
1754                    oauth_state: &os,
1755                    policy_engine: &pe,
1756                    change_log: &cl,
1757                    notifier: &notifier,
1758                    rooms: rm.as_ref(),
1759                    cache: &cache_adapter,
1760                    pubsub: &pubsub_adapter,
1761                    jobs: jq.as_ref(),
1762                    scheduler: sc.as_ref(),
1763                    workflows: we.as_ref(),
1764                    files: &file_ops,
1765                    openapi: &openapi_gen,
1766                    functions: fn_ops,
1767                    email: &email_adapter,
1768                    shards: shard_ops,
1769                    plugin_hooks: &plugin_hooks,
1770                    auth_ctx: &auth_ctx,
1771                    is_dev,
1772                    request_headers: &request_headers,
1773                };
1774                let http_method = HttpMethod::from_str(method.as_str());
1775                let (s, b, _ct) = pylon_router::route(
1776                    &router_ctx,
1777                    http_method,
1778                    &url,
1779                    &body,
1780                    auth_token.as_deref(),
1781                );
1782                (s, b, "application/json", false)
1783            }
1784        };
1785
1786        let mut response = Response::from_string(&response_body)
1787            .with_status_code(status)
1788            .with_header(Header::from_bytes("Content-Type", content_type).unwrap())
1789            .with_header(
1790                Header::from_bytes(
1791                    "Access-Control-Allow-Origin",
1792                    cors_origin.as_bytes().to_vec(),
1793                )
1794                .unwrap(),
1795            )
1796            .with_header(
1797                Header::from_bytes(
1798                    "Access-Control-Allow-Methods",
1799                    "GET, POST, PATCH, DELETE, OPTIONS",
1800                )
1801                .unwrap(),
1802            )
1803            .with_header(
1804                Header::from_bytes(
1805                    "Access-Control-Allow-Headers",
1806                    "Content-Type, Authorization",
1807                )
1808                .unwrap(),
1809            );
1810
1811        // Add Content-Security-Policy for Studio HTML responses.
1812        //
1813        // Studio talks to the same Rust process over HTTP (same origin)
1814        // AND a sibling WebSocket port (port+1, scheme ws:). CSP's
1815        // `default-src` covers `connect-src` by fallback, so any
1816        // directive we set there must include the WS scheme or the
1817        // browser silently blocks the live-sync connection.
1818        //
1819        // `ws:` + `wss:` cover localhost dev + TLS deploys without
1820        // hard-coding ports. Same-origin `'self'` keeps HTTP fetches
1821        // allowed. Inline + eval stay for the Tailwind/Babel CDN scripts
1822        // the current Studio HTML includes.
1823        if is_studio {
1824            response = response.with_header(
1825                Header::from_bytes(
1826                    "Content-Security-Policy",
1827                    "default-src 'self' 'unsafe-inline' 'unsafe-eval' https://cdn.tailwindcss.com https://unpkg.com ws: wss:",
1828                ).unwrap(),
1829            );
1830        }
1831
1832        let response = with_security_headers(response);
1833
1834        let _ = request.respond(response);
1835        mt.record_request(method.as_str(), status);
1836    }
1837
1838    tracing::warn!("Shutting down gracefully...");
1839
1840    // --- Drain phase ---
1841    // Stop accepting new work, let in-flight finish, close subsystems cleanly.
1842    let drain_timeout = std::time::Duration::from_secs(
1843        std::env::var("PYLON_DRAIN_SECS")
1844            .ok()
1845            .and_then(|s| s.parse().ok())
1846            .unwrap_or(10),
1847    );
1848    let start = Instant::now();
1849
1850    // Stop any running shards so their tick loops exit.
1851    if let Some(reg) = &shard_registry {
1852        for id in reg.ids() {
1853            if let Some(shard) = reg.get(&id) {
1854                shard.stop();
1855            }
1856        }
1857    }
1858
1859    // Let the scheduler finish its current cycle.
1860    let _ = &scheduler; // drop Arc at end of scope
1861
1862    // Wait for outstanding workers to idle, up to drain_timeout.
1863    while start.elapsed() < drain_timeout {
1864        let pending_jobs = job_queue.stats().pending;
1865        if pending_jobs == 0 {
1866            break;
1867        }
1868        std::thread::sleep(std::time::Duration::from_millis(100));
1869    }
1870
1871    let elapsed = start.elapsed();
1872    tracing::warn!(
1873        "Drain complete in {:.1}s (timeout {}s)",
1874        elapsed.as_secs_f32(),
1875        drain_timeout.as_secs()
1876    );
1877    Ok(())
1878}
1879
1880// The route() function has been extracted to the `pylon-router` crate.
1881// See `pylon_router::route()` for the platform-agnostic routing logic.
1882// The server now delegates to it via a `RouterContext`.
1883
1884fn json_error(code: &str, message: &str) -> String {
1885    pylon_router::json_error(code, message)
1886}
1887
1888/// Build the session store. Persists by default for file-backed runtimes —
1889/// sessions live in a sibling `<db>.sessions.db` file next to the app DB
1890/// unless `PYLON_SESSION_DB` overrides the path or
1891/// `PYLON_SESSION_IN_MEMORY=1` opts out. In-memory runtimes (tests)
1892/// get an in-memory session store.
1893///
1894/// This used to be opt-in, which silently broke every app after a server
1895/// restart: tokens in browser localStorage resolved to anonymous, pulls
1896/// came back empty under policy, and mutations 400'd with UNAUTHENTICATED.
1897fn build_session_store(app_db_path: Option<&str>) -> SessionStore {
1898    if std::env::var("PYLON_SESSION_IN_MEMORY")
1899        .map(|v| v == "1" || v == "true")
1900        .unwrap_or(false)
1901    {
1902        return SessionStore::new();
1903    }
1904    let explicit = std::env::var("PYLON_SESSION_DB").ok();
1905    let default_path = app_db_path.map(|p| format!("{p}.sessions.db"));
1906    let path = match explicit.or(default_path) {
1907        Some(p) => p,
1908        None => return SessionStore::new(),
1909    };
1910    match crate::session_backend::SqliteSessionBackend::open(&path) {
1911        Ok(backend) => {
1912            tracing::info!("[pylon] Session persistence enabled: {path}");
1913            SessionStore::with_backend(Box::new(backend))
1914        }
1915        Err(e) => {
1916            tracing::warn!(
1917                "[pylon] could not open session DB {path}: {e}. Falling back to in-memory sessions."
1918            );
1919            SessionStore::new()
1920        }
1921    }
1922}
1923
1924/// Parse a `multipart/form-data` body and return the first file part found.
1925///
1926/// Returns `(filename, content_type, bytes)` on success, `None` if the body
1927/// can't be parsed or no file part exists.
1928///
1929/// Handles the common RFC 7578 subset used by browsers and curl:
1930/// - `--boundary` part separators
1931/// - `Content-Disposition: form-data; name=...; filename=...`
1932/// - `Content-Type: ...`
1933/// - blank line, then raw bytes, then `\r\n--boundary` terminator
1934fn parse_multipart_first_file(
1935    body: &[u8],
1936    content_type_header: &str,
1937) -> Option<(String, String, Vec<u8>)> {
1938    // Extract the boundary parameter.
1939    let boundary_param = content_type_header
1940        .split(';')
1941        .find_map(|p| p.trim().strip_prefix("boundary="))?;
1942    let boundary = boundary_param.trim_matches('"');
1943    let delimiter = format!("--{boundary}");
1944    let delimiter_bytes = delimiter.as_bytes();
1945
1946    // Find each part between delimiters.
1947    let mut pos = 0usize;
1948    while pos < body.len() {
1949        // Find the next delimiter.
1950        let next = find_subslice(&body[pos..], delimiter_bytes)?;
1951        let part_start = pos + next + delimiter_bytes.len();
1952        // Skip CRLF or -- (terminator) after the delimiter.
1953        if part_start + 2 > body.len() {
1954            return None;
1955        }
1956        if &body[part_start..part_start + 2] == b"--" {
1957            return None; // end of parts, no file found
1958        }
1959        let header_start = part_start + skip_crlf(&body[part_start..]);
1960
1961        // Find end-of-headers (blank line).
1962        let header_end_offset = find_subslice(&body[header_start..], b"\r\n\r\n")?;
1963        let headers = &body[header_start..header_start + header_end_offset];
1964        let data_start = header_start + header_end_offset + 4;
1965
1966        // Find the next delimiter — that's where this part's data ends.
1967        let next_delim_offset = find_subslice(&body[data_start..], delimiter_bytes)?;
1968        // Strip the trailing CRLF before the delimiter.
1969        let mut data_end = data_start + next_delim_offset;
1970        if data_end >= 2 && &body[data_end - 2..data_end] == b"\r\n" {
1971            data_end -= 2;
1972        }
1973
1974        // Parse headers we care about.
1975        let headers_str = std::str::from_utf8(headers).ok()?;
1976        let mut filename: Option<String> = None;
1977        let mut part_ct = String::from("application/octet-stream");
1978        let mut has_file = false;
1979        for line in headers_str.split("\r\n") {
1980            let lower = line.to_ascii_lowercase();
1981            if let Some(rest) = lower.strip_prefix("content-disposition:") {
1982                if rest.contains("filename=") {
1983                    has_file = true;
1984                    // Extract filename="xxx"
1985                    if let Some(start) = line.find("filename=\"") {
1986                        let from = start + 10;
1987                        if let Some(end_offset) = line[from..].find('"') {
1988                            filename = Some(line[from..from + end_offset].to_string());
1989                        }
1990                    }
1991                }
1992            } else if let Some(rest) = lower.strip_prefix("content-type:") {
1993                part_ct = rest.trim().to_string();
1994            }
1995        }
1996
1997        if has_file {
1998            let name = filename.unwrap_or_else(|| "upload".into());
1999            return Some((name, part_ct, body[data_start..data_end].to_vec()));
2000        }
2001
2002        pos = data_end;
2003    }
2004    None
2005}
2006
/// Locate the first occurrence of `needle` inside `haystack`.
///
/// Returns the byte offset of the match, or `None` when `needle` is empty,
/// longer than `haystack`, or simply not present.
fn find_subslice(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    let width = needle.len();
    if width == 0 || haystack.len() < width {
        return None;
    }
    // Every candidate start leaves at least `width` bytes to compare.
    let last_start = haystack.len() - width;
    (0..=last_start).find(|&start| haystack[start..].starts_with(needle))
}
2013
/// Number of bytes occupied by a line break at the very start of `buf`.
///
/// Yields `2` for a leading `\r\n`, `1` for a bare leading `\n`, and `0`
/// for anything else (including an empty buffer or a lone `\r`).
fn skip_crlf(buf: &[u8]) -> usize {
    match buf {
        [b'\r', b'\n', ..] => 2,
        [b'\n', ..] => 1,
        _ => 0,
    }
}
2023
/// Unit tests for `parse_multipart_first_file`.
#[cfg(test)]
mod multipart_tests {
    use super::*;

    /// `Content-Type` header shared by the multipart fixtures (boundary `bnd`).
    const FORM_CT: &str = "multipart/form-data; boundary=bnd";

    /// A single file part yields its filename, declared content type and the
    /// payload without the CRLF that precedes the closing delimiter.
    #[test]
    fn parses_single_file() {
        // Whitespace after a `\` line continuation is not part of the literal.
        let payload: &[u8] = b"--bnd\r\n\
            Content-Disposition: form-data; name=\"file\"; filename=\"hello.txt\"\r\n\
            Content-Type: text/plain\r\n\
            \r\n\
            Hello world\r\n\
            --bnd--\r\n";
        let expected = (
            String::from("hello.txt"),
            String::from("text/plain"),
            b"Hello world".to_vec(),
        );
        assert_eq!(parse_multipart_first_file(payload, FORM_CT), Some(expected));
    }

    /// A form that only carries plain fields has no file to return.
    #[test]
    fn returns_none_without_file_part() {
        let payload: &[u8] = b"--bnd\r\n\
            Content-Disposition: form-data; name=\"field\"\r\n\
            \r\n\
            just text\r\n\
            --bnd--\r\n";
        assert_eq!(parse_multipart_first_file(payload, FORM_CT), None);
    }

    /// Without a `boundary=` parameter the body cannot be split into parts.
    #[test]
    fn returns_none_when_no_boundary() {
        let parsed = parse_multipart_first_file(b"anything", "application/json");
        assert_eq!(parsed, None);
    }
}