1#[allow(unused_imports)]
2use std::io::Read;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::time::Instant;
6
7use pylon_auth::SessionStore;
8use pylon_http::HttpMethod;
9use pylon_plugin::PluginRegistry;
10use pylon_policy::PolicyEngine;
11use pylon_sync::{ChangeKind, ChangeLog};
12use tiny_http::{Header, Method, Response, Server};
13
14use crate::datastore::{
15 CacheAdapter, EmailAdapter, LocalFileOps, PluginHooksAdapter, PubSubAdapter,
16 RuntimeOpenApiGenerator, ShardOpsAdapter, WsSseNotifier,
17};
18use crate::jobs::{JobQueue, JobResult, Worker};
19use crate::metrics::Metrics;
20use crate::pubsub::PubSubBroker;
21use crate::rate_limit::RateLimiter;
22use crate::rooms::RoomManager;
23use crate::scheduler::Scheduler;
24use crate::sse::SseHub;
25use crate::workflows::WorkflowEngine;
26use crate::ws::WsHub;
27use crate::Runtime;
28use pylon_plugin::builtin::ai_proxy::{AiMessage, AiProxyPlugin};
29use pylon_plugin::builtin::cache::CachePlugin;
30
/// Blocking [`std::io::Read`] adapter over an mpsc channel of byte chunks.
///
/// Chunks are handed to the reader in the order they are received. An empty
/// chunk, or the sending side being dropped, marks the end of the stream.
struct StreamingBody {
    /// Source of body chunks.
    rx: std::sync::mpsc::Receiver<Vec<u8>>,
    /// Chunk that did not fit into the caller's buffer on an earlier read.
    buf: Vec<u8>,
    /// Offset of the first unread byte in `buf`.
    pos: usize,
    /// Set once end-of-stream has been observed, so later reads return
    /// `Ok(0)` immediately instead of blocking on the channel again.
    done: bool,
}

impl StreamingBody {
    /// Wraps `rx`; nothing is received until the first `read` call.
    fn new(rx: std::sync::mpsc::Receiver<Vec<u8>>) -> Self {
        Self {
            rx,
            buf: Vec::new(),
            pos: 0,
            done: false,
        }
    }
}

impl std::io::Read for StreamingBody {
    /// Copies up to `buf.len()` bytes into `buf`.
    ///
    /// Bytes left over from a previously received chunk are served first.
    /// Otherwise the call blocks until the next chunk arrives. Returns
    /// `Ok(0)` at end-of-stream (empty chunk or disconnected sender) and for
    /// a zero-length `buf`; it never returns an error.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // Drain the stashed remainder before touching the channel.
        if self.pos < self.buf.len() {
            let remaining = &self.buf[self.pos..];
            let n = remaining.len().min(buf.len());
            buf[..n].copy_from_slice(&remaining[..n]);
            self.pos += n;
            if self.pos >= self.buf.len() {
                self.buf.clear();
                self.pos = 0;
            }
            return Ok(n);
        }

        // Nothing can be delivered: either the stream already ended, or the
        // caller gave us no room. Blocking in `recv` would be pointless in
        // both cases (and could hang forever after the EOF sentinel while the
        // sender is still alive).
        if self.done || buf.is_empty() {
            return Ok(0);
        }

        match self.rx.recv() {
            Ok(data) if !data.is_empty() => {
                let n = data.len().min(buf.len());
                buf[..n].copy_from_slice(&data[..n]);
                if n < data.len() {
                    // Keep the tail for the next call.
                    self.buf = data;
                    self.pos = n;
                }
                Ok(n)
            }
            // Empty chunk (EOF sentinel) or sender dropped: latch end-of-stream.
            Ok(_) | Err(_) => {
                self.done = true;
                Ok(0)
            }
        }
    }
}
88
89static SHUTDOWN: AtomicBool = AtomicBool::new(false);
91
92pub fn request_shutdown() {
98 SHUTDOWN.store(true, Ordering::SeqCst);
99 if let Some(srv) = SERVER_HANDLE.get() {
102 srv.unblock();
103 }
104}
105
106static SERVER_HANDLE: std::sync::OnceLock<Arc<Server>> = std::sync::OnceLock::new();
109
110fn resolve_client_ip(request: &tiny_http::Request, trust_proxy_hops: usize) -> String {
126 let socket_ip = request
127 .remote_addr()
128 .map(|a| a.ip().to_string())
129 .unwrap_or_default();
130 if trust_proxy_hops == 0 {
131 return socket_ip;
132 }
133 let xff = request
136 .headers()
137 .iter()
138 .find(|h| {
139 h.field
140 .as_str()
141 .as_str()
142 .eq_ignore_ascii_case("X-Forwarded-For")
143 })
144 .map(|h| h.value.as_str().to_string());
145 let Some(xff) = xff else {
146 return socket_ip;
147 };
148 let entries: Vec<&str> = xff.split(',').map(str::trim).collect();
153 if entries.len() < trust_proxy_hops {
154 return socket_ip;
159 }
160 let candidate = entries[entries.len() - trust_proxy_hops];
161 if candidate.parse::<std::net::IpAddr>().is_ok() {
164 candidate.to_string()
165 } else {
166 socket_ip
167 }
168}
169
170fn security_headers() -> Vec<Header> {
179 vec![
180 Header::from_bytes("X-Content-Type-Options", "nosniff").unwrap(),
181 Header::from_bytes("X-Frame-Options", "DENY").unwrap(),
182 Header::from_bytes("X-XSS-Protection", "1; mode=block").unwrap(),
183 Header::from_bytes("Referrer-Policy", "strict-origin-when-cross-origin").unwrap(),
187 Header::from_bytes(
191 "Permissions-Policy",
192 "accelerometer=(), camera=(), geolocation=(), gyroscope=(), microphone=(), payment=(), usb=()",
193 )
194 .unwrap(),
195 ]
196}
197
198fn with_security_headers<R: std::io::Read>(response: Response<R>) -> Response<R> {
200 let mut resp = response;
201 for header in security_headers() {
202 resp = resp.with_header(header);
203 }
204 resp
205}
206
207pub fn start(runtime: Arc<Runtime>, port: u16) -> Result<(), String> {
209 start_with_plugins(runtime, port, None)
210}
211
212pub fn start_with_plugins(
214 runtime: Arc<Runtime>,
215 port: u16,
216 plugins: Option<Arc<PluginRegistry>>,
217) -> Result<(), String> {
218 start_server(runtime, port, plugins, None)
219}
220
221pub fn start_with_shards(
224 runtime: Arc<Runtime>,
225 port: u16,
226 plugins: Option<Arc<PluginRegistry>>,
227 shard_registry: Arc<dyn pylon_realtime::DynShardRegistry>,
228) -> Result<(), String> {
229 start_server(runtime, port, plugins, Some(shard_registry))
230}
231
232fn start_server(
233 runtime: Arc<Runtime>,
234 port: u16,
235 plugins: Option<Arc<PluginRegistry>>,
236 shard_registry: Option<Arc<dyn pylon_realtime::DynShardRegistry>>,
237) -> Result<(), String> {
238 pylon_observability::run_tracing_hook();
243
244 let addr = format!("0.0.0.0:{port}");
245 let server = Server::http(&addr).map_err(|e| format!("Failed to start server: {e}"))?;
246 let server = Arc::new(server);
247
248 let _ = SERVER_HANDLE.set(Arc::clone(&server));
250
251 let session_lifetime = runtime.manifest().auth.session.expires_in;
252 let auth_stores = build_auth_stores(runtime.db_path().as_deref(), session_lifetime);
253 let session_store = auth_stores.session_store;
254 let magic_codes = auth_stores.magic_codes;
255 let oauth_state = auth_stores.oauth_state;
256 let account_store = auth_stores.account_store;
257 let api_keys = auth_stores.api_keys;
258 let orgs = auth_stores.orgs;
259 let siwe = auth_stores.siwe;
260 let phone_codes = auth_stores.phone_codes;
261 let passkeys = auth_stores.passkeys;
262 let verification = auth_stores.verification;
263 let policy_engine = Arc::new(PolicyEngine::from_manifest(runtime.manifest()));
264 let change_log = Arc::new(ChangeLog::new());
265
266 for entity in runtime.manifest().entities.iter() {
274 match runtime.list(&entity.name) {
275 Ok(rows) => {
276 for row in rows {
277 if let Some(id) = row.get("id").and_then(|v| v.as_str()) {
278 change_log.append(&entity.name, id, ChangeKind::Insert, Some(row.clone()));
279 }
280 }
281 }
282 Err(_) => {
283 }
285 }
286 }
287 let ws_hub = WsHub::new();
288 let sse_hub = SseHub::new();
289 let is_dev_early = std::env::var("PYLON_DEV_MODE")
302 .map(|v| v == "1" || v == "true")
303 .unwrap_or(true);
304 let plugin_rl_max: u32 = if is_dev_early { 100_000 } else { 100 };
305 let plugin_reg: Arc<PluginRegistry> = plugins.unwrap_or_else(|| {
306 let mut reg = PluginRegistry::new(runtime.manifest().clone());
307 reg.register(Arc::new(
308 pylon_plugin::builtin::rate_limit::RateLimitPlugin::new(
309 plugin_rl_max,
310 std::time::Duration::from_secs(60),
311 ),
312 ));
313 reg.register(Arc::new(
318 pylon_plugin::builtin::tenant_scope::TenantScopePlugin::from_manifest(
319 runtime.manifest(),
320 ),
321 ));
322 Arc::new(reg)
323 });
324 let room_mgr = Arc::new(RoomManager::new(120)); let ws_port = port + 1;
326 let sse_port = port + 2;
327
328 let start_time = Instant::now();
330
331 let metrics = Arc::new(Metrics::new());
332
333 let cache = Arc::new(CachePlugin::new(100_000));
335 let pubsub_broker = Arc::new(PubSubBroker::new(100));
336
337 let job_queue = Arc::new(JobQueue::new(1000));
339
340 let jobs_in_memory = std::env::var("PYLON_JOBS_IN_MEMORY")
346 .map(|v| v == "1" || v == "true")
347 .unwrap_or(false);
348 if !jobs_in_memory {
349 let jobs_db_path = std::env::var("PYLON_JOBS_DB").ok().unwrap_or_else(|| {
350 runtime
351 .db_path()
352 .map(|p| format!("{p}.jobs.db"))
353 .unwrap_or_else(|| "pylon.jobs.db".into())
354 });
355 match crate::job_store::JobStore::open(&jobs_db_path) {
356 Ok(store) => {
357 let store = Arc::new(store);
358 let restored = job_queue.restore_from(&store);
359 if restored > 0 {
360 tracing::info!("[jobs] Restored {restored} pending job(s) from {jobs_db_path}");
361 }
362 job_queue.attach_store(store);
363 }
364 Err(e) => {
365 tracing::warn!(
366 "[jobs] Could not open job store at {jobs_db_path}: {e} — running without persistence"
367 );
368 }
369 }
370 }
371
372 {
374 let cache_ref = Arc::clone(&cache);
375 job_queue.register(
376 "pylon.cache.cleanup",
377 Arc::new(move |_job| {
378 cache_ref.cleanup_expired();
379 JobResult::Success
380 }),
381 );
382 let rooms_ref = Arc::clone(&room_mgr);
383 job_queue.register(
384 "pylon.rooms.cleanup",
385 Arc::new(move |_job| {
386 rooms_ref.cleanup_idle();
387 JobResult::Success
388 }),
389 );
390 }
391
392 let scheduler = Arc::new(Scheduler::new(Arc::clone(&job_queue)));
393 let _ = scheduler.schedule(
395 "pylon.cache.cleanup",
396 "*/10 * * * *",
397 Arc::new(|_| JobResult::Success),
398 );
399 let _ = scheduler.schedule(
400 "pylon.rooms.cleanup",
401 "*/5 * * * *",
402 Arc::new(|_| JobResult::Success),
403 );
404
405 let _worker_handles: Vec<_> = (0..2)
407 .map(|i| {
408 let w = Worker::new(Arc::clone(&job_queue), &format!("worker-{i}"));
409 w.start()
410 })
411 .collect();
412
413 let _scheduler_handle = Arc::clone(&scheduler).start();
415
416 let wf_runner_url = std::env::var("PYLON_WORKFLOW_RUNNER_URL")
418 .unwrap_or_else(|_| "http://127.0.0.1:9876/run".to_string());
419 let workflow_engine = Arc::new(WorkflowEngine::new(&wf_runner_url, 10_000));
420
421 let default_rl_max = if is_dev_early { 100_000 } else { 600 };
432 let rl_max: u32 = std::env::var("PYLON_RATE_LIMIT_MAX")
433 .ok()
434 .and_then(|v| v.parse().ok())
435 .unwrap_or(default_rl_max);
436 let rl_window: u64 = std::env::var("PYLON_RATE_LIMIT_WINDOW")
437 .ok()
438 .and_then(|v| v.parse().ok())
439 .unwrap_or(60);
440 let rate_limiter = Arc::new(RateLimiter::new(rl_max, rl_window));
441
442 let fn_rl_max: u32 = std::env::var("PYLON_FN_RATE_LIMIT_MAX")
446 .ok()
447 .and_then(|v| v.parse().ok())
448 .unwrap_or(30);
449 let fn_rl_window: u64 = std::env::var("PYLON_FN_RATE_LIMIT_WINDOW")
450 .ok()
451 .and_then(|v| v.parse().ok())
452 .unwrap_or(60);
453 let fn_rate_limiter = Arc::new(RateLimiter::new(fn_rl_max, fn_rl_window));
454
455 let fn_notifier: Arc<dyn pylon_router::ChangeNotifier> =
463 Arc::new(crate::datastore::WsSseNotifier {
464 ws: Arc::clone(&ws_hub),
465 sse: Arc::clone(&sse_hub),
466 });
467 let fn_ops_maybe = crate::datastore::try_spawn_functions(
468 Arc::clone(&runtime),
469 Arc::clone(&job_queue),
470 Arc::clone(&fn_rate_limiter),
471 Arc::clone(&change_log),
472 fn_notifier,
473 );
474
475 let is_dev = std::env::var("PYLON_DEV_MODE")
483 .map(|v| v == "1" || v == "true")
484 .unwrap_or(false);
485
486 let cors_origin = match std::env::var("PYLON_CORS_ORIGIN") {
493 Ok(v) => v,
494 Err(_) if is_dev => "*".to_string(),
495 Err(_) => {
496 return Err(
497 "PYLON_CORS_ORIGIN must be set in production (non-dev mode). \
498 Set it to your frontend's origin, or set PYLON_DEV_MODE=true \
499 for local development."
500 .into(),
501 );
502 }
503 };
504 if !is_dev && cors_origin == "*" {
505 return Err("PYLON_CORS_ORIGIN=\"*\" is refused in production mode. \
506 Set it to an explicit origin (https://app.example.com)."
507 .into());
508 }
509 let allow_credentials = cors_origin != "*";
516 if Header::from_bytes(
520 "Access-Control-Allow-Origin",
521 cors_origin.as_bytes().to_vec(),
522 )
523 .is_err()
524 {
525 return Err(format!(
526 "PYLON_CORS_ORIGIN={cors_origin:?} contains bytes that are not a valid HTTP header value"
527 ));
528 }
529
530 let admin_token: Option<String> = std::env::var("PYLON_ADMIN_TOKEN").ok();
532
533 let trust_proxy_hops: usize = std::env::var("PYLON_TRUST_PROXY_HOPS")
543 .ok()
544 .and_then(|v| v.parse().ok())
545 .unwrap_or(0);
546
547 let cookie_config = Arc::new({
554 let app_name = runtime.manifest().name.as_str();
555 pylon_auth::CookieConfig::from_env(&pylon_auth::CookieConfig::default_name_for(app_name))
556 });
557
558 let csrf_origins: Vec<String> = match std::env::var("PYLON_CSRF_ORIGINS") {
568 Ok(v) => v
569 .split(',')
570 .map(|s| s.trim().to_string())
571 .filter(|s| !s.is_empty())
572 .collect(),
573 Err(_) => {
574 if is_dev {
575 vec!["*".to_string()]
576 } else if cors_origin != "*" {
577 vec![cors_origin.clone()]
578 } else {
579 vec![]
581 }
582 }
583 };
584 let csrf = Arc::new(pylon_plugin::builtin::csrf::CsrfPlugin::new(csrf_origins));
585
586 let manifest_trusted: Vec<String> = runtime.manifest().auth.trusted_origins.clone();
600 let trusted_origins: Vec<String> = std::env::var("PYLON_TRUSTED_ORIGINS")
601 .map(|v| {
602 v.split(',')
603 .map(|s| s.trim().to_string())
604 .filter(|s| !s.is_empty())
605 .collect()
606 })
607 .unwrap_or_else(|_| {
608 if is_dev_early {
615 vec![
616 "http://localhost:3000".to_string(),
617 "http://localhost:4321".to_string(),
618 "http://localhost:5173".to_string(),
619 "http://127.0.0.1:3000".to_string(),
620 ]
621 } else {
622 Vec::new()
623 }
624 });
625 let mut combined: Vec<String> = trusted_origins;
627 for m in manifest_trusted {
628 if !m.is_empty() && !combined.contains(&m) {
629 combined.push(m);
630 }
631 }
632 let trusted_origins = Arc::new(combined);
633
634 {
648 let hub = Arc::clone(&ws_hub);
649 let sessions = Arc::clone(&session_store);
650 let runtime_for_fetcher = Arc::clone(&runtime);
651 let pe_for_fetcher = Arc::clone(&policy_engine);
652 let fetcher: crate::ws::SnapshotFetcher = Arc::new(move |auth_ctx, entity, row_id| {
653 use pylon_http::DataStore;
654 let row = match runtime_for_fetcher.get_by_id(entity, row_id) {
659 Ok(Some(v)) => v,
660 _ => return None,
661 };
662 if !matches!(
663 pe_for_fetcher.check_entity_read(entity, auth_ctx, Some(&row)),
664 pylon_policy::PolicyResult::Allowed
665 ) {
666 return None;
667 }
668 let snap = match runtime_for_fetcher.crdt_snapshot(entity, row_id) {
669 Ok(Some(bytes)) => bytes,
670 _ => return None,
671 };
672 pylon_router::encode_crdt_frame(
673 pylon_router::CRDT_FRAME_SNAPSHOT,
674 entity,
675 row_id,
676 &snap,
677 )
678 .ok()
679 });
680 std::thread::spawn(move || {
681 crate::ws::start_ws_server(hub, sessions, ws_port, Some(fetcher));
682 });
683 }
684
685 {
687 let hub = Arc::clone(&sse_hub);
688 std::thread::spawn(move || {
689 crate::sse::start_sse_server(hub, sse_port);
690 });
691 }
692
693 let shard_ws_port = port + 3;
695 if let Some(reg) = shard_registry.clone() {
696 let sessions = Arc::clone(&session_store);
697 std::thread::spawn(move || {
698 crate::shard_ws::start_shard_ws_server(reg, sessions, shard_ws_port);
699 });
700 }
701
702 tracing::warn!("pylon dev server listening on http://localhost:{port}");
703 tracing::info!(" WebSocket: ws://localhost:{ws_port}");
704 tracing::info!(" Studio: http://localhost:{port}/studio");
705 tracing::info!(" API: http://localhost:{port}/api/entities/<entity>");
706 tracing::info!(" Auth: http://localhost:{port}/api/auth/session");
707
708 loop {
712 if SHUTDOWN.load(Ordering::Relaxed) {
713 break;
714 }
715
716 let mut request = match server.recv() {
717 Ok(rq) => rq,
718 Err(_) => {
719 break;
721 }
722 };
723
724 if SHUTDOWN.load(Ordering::Relaxed) {
725 break;
726 }
727
728 let rt = Arc::clone(&runtime);
729 let ss = Arc::clone(&session_store);
730 let pe = Arc::clone(&policy_engine);
731 let cl = Arc::clone(&change_log);
732 let wh = Arc::clone(&ws_hub);
733 let sh = Arc::clone(&sse_hub);
734 let mc = Arc::clone(&magic_codes);
735 let pr = Arc::clone(&plugin_reg);
736 let rm = Arc::clone(&room_mgr);
737 let mt = Arc::clone(&metrics);
738 let os = Arc::clone(&oauth_state);
739 let acc = Arc::clone(&account_store);
740 let ak = Arc::clone(&api_keys);
741 let og = Arc::clone(&orgs);
742 let sw = Arc::clone(&siwe);
743 let pcd = Arc::clone(&phone_codes);
744 let pks = Arc::clone(&passkeys);
745 let vrf = Arc::clone(&verification);
746 let trusted_origins_ref = Arc::clone(&trusted_origins);
747 let ca = Arc::clone(&cache);
748 let ps = Arc::clone(&pubsub_broker);
749 let jq = Arc::clone(&job_queue);
750 let sc = Arc::clone(&scheduler);
751 let we = Arc::clone(&workflow_engine);
752 let fn_ops_ref = fn_ops_maybe.clone();
753 let shards_ref = shard_registry.clone();
754 let cors_origin = cors_origin.clone();
755 let cookie_config = Arc::clone(&cookie_config);
756 let allow_credentials = allow_credentials;
757 let is_dev = is_dev;
758
759 let method = request.method().clone();
760 let url = request.url().to_string();
761
762 let request_peer_ip = resolve_client_ip(&request, trust_proxy_hops);
774 let request_started_at = std::time::Instant::now();
775 if url != "/health" && url != "/metrics" {
776 tracing::info!("→ {} {} from {}", method.as_str(), url, request_peer_ip);
777 crate::metrics::set_current_request(&url, request_started_at);
781 }
782
783 if url == "/health" && method == Method::Get {
785 let uptime = start_time.elapsed().as_secs();
786 let body = serde_json::json!({
787 "status": "ok",
788 "version": "0.1.0",
789 "uptime_secs": uptime,
790 })
791 .to_string();
792
793 let response = with_security_headers(
794 Response::from_string(&body)
795 .with_status_code(200u16)
796 .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
797 .with_header(
798 Header::from_bytes(
799 "Access-Control-Allow-Origin",
800 cors_origin.as_bytes().to_vec(),
801 )
802 .unwrap(),
803 ),
804 );
805 let _ = request.respond(response);
806 continue;
807 }
808
809 if url == "/metrics" && method == Method::Get {
814 if !is_dev {
815 let admin_bytes = admin_token.as_deref().unwrap_or("").as_bytes();
816 let auth_ok = !admin_bytes.is_empty()
817 && request.headers().iter().any(|h| {
818 let name = h.field.as_str().as_str();
819 name.eq_ignore_ascii_case("Authorization")
820 && h.value
821 .as_str()
822 .strip_prefix("Bearer ")
823 .map(|t| pylon_auth::constant_time_eq(t.as_bytes(), admin_bytes))
824 .unwrap_or(false)
825 });
826 if !auth_ok {
827 let body = json_error(
828 "UNAUTHORIZED",
829 "/metrics requires admin bearer token in non-dev mode",
830 );
831 let response = with_security_headers(
832 Response::from_string(&body)
833 .with_status_code(401u16)
834 .with_header(
835 Header::from_bytes("Content-Type", "application/json").unwrap(),
836 ),
837 );
838 let _ = request.respond(response);
839 continue;
840 }
841 }
842 let prefers_prometheus = request.headers().iter().any(|h| {
843 (h.field.as_str() == "Accept" || h.field.as_str() == "accept")
844 && (h.value.as_str().contains("text/plain")
845 || h.value.as_str().contains("application/openmetrics-text"))
846 });
847 let (body, content_type) = if prefers_prometheus {
848 (mt.prometheus(), "text/plain; version=0.0.4")
849 } else {
850 (mt.snapshot().to_string(), "application/json")
851 };
852 let response = with_security_headers(
853 Response::from_string(&body)
854 .with_status_code(200u16)
855 .with_header(Header::from_bytes("Content-Type", content_type).unwrap())
856 .with_header(
857 Header::from_bytes(
858 "Access-Control-Allow-Origin",
859 cors_origin.as_bytes().to_vec(),
860 )
861 .unwrap(),
862 ),
863 );
864 let _ = request.respond(response);
865 mt.record_request("GET", 200);
866 continue;
867 }
868
869 let peer_ip = resolve_client_ip(&request, trust_proxy_hops);
875
876 let is_preflight = matches!(method, Method::Options);
882 if !is_preflight {
883 if let Err(retry_after) = rate_limiter.check(&peer_ip) {
884 let err_body = json_error(
885 "RATE_LIMITED",
886 &format!("Too many requests. Retry after {retry_after} seconds."),
887 );
888 let response = with_security_headers(
889 Response::from_string(&err_body)
890 .with_status_code(429u16)
891 .with_header(
892 Header::from_bytes("Content-Type", "application/json").unwrap(),
893 )
894 .with_header(
895 Header::from_bytes(
896 "Access-Control-Allow-Origin",
897 cors_origin.as_bytes().to_vec(),
898 )
899 .unwrap(),
900 )
901 .with_header(
902 Header::from_bytes(
903 "Access-Control-Allow-Methods",
904 "GET, POST, PATCH, DELETE, OPTIONS",
905 )
906 .unwrap(),
907 )
908 .with_header(
909 Header::from_bytes(
910 "Access-Control-Allow-Headers",
911 "Content-Type, Authorization",
912 )
913 .unwrap(),
914 )
915 .with_header(
916 Header::from_bytes(
917 "Retry-After",
918 retry_after.to_string().as_bytes().to_vec(),
919 )
920 .unwrap(),
921 ),
922 );
923 let _ = request.respond(response);
924 mt.record_request(method.as_str(), 429);
925 continue;
926 }
927 } {
943 let method_str = method.as_str();
944 let is_bearer = request.headers().iter().any(|h| {
945 (h.field.as_str() == "Authorization" || h.field.as_str() == "authorization")
946 && h.value.as_str().starts_with("Bearer ")
947 });
948 if !is_bearer && !matches!(method, Method::Get | Method::Head | Method::Options) {
953 let origin = request
954 .headers()
955 .iter()
956 .find(|h| h.field.as_str() == "Origin" || h.field.as_str() == "origin")
957 .map(|h| h.value.as_str().to_string());
958 let referer = request
959 .headers()
960 .iter()
961 .find(|h| h.field.as_str() == "Referer" || h.field.as_str() == "referer")
962 .map(|h| h.value.as_str().to_string());
963 if let Err(err) = csrf.check(method_str, origin.as_deref(), referer.as_deref()) {
964 let body = json_error(&err.code, &err.message);
965 let response = with_security_headers(
966 Response::from_string(&body)
967 .with_status_code(err.status)
968 .with_header(
969 Header::from_bytes("Content-Type", "application/json").unwrap(),
970 )
971 .with_header(
972 Header::from_bytes(
973 "Access-Control-Allow-Origin",
974 cors_origin.as_bytes().to_vec(),
975 )
976 .unwrap(),
977 ),
978 );
979 let _ = request.respond(response);
980 mt.record_request(method_str, err.status);
981 continue;
982 }
983 }
984 }
985
986 let bearer_token: Option<String> = request
996 .headers()
997 .iter()
998 .find(|h| h.field.as_str() == "Authorization" || h.field.as_str() == "authorization")
999 .and_then(|h| {
1000 let val = h.value.as_str();
1001 val.strip_prefix("Bearer ").map(|t| t.to_string())
1002 });
1003 let cookie_token: Option<String> = if bearer_token.is_some() {
1004 None
1005 } else {
1006 request
1007 .headers()
1008 .iter()
1009 .find(|h| h.field.as_str() == "Cookie" || h.field.as_str() == "cookie")
1010 .and_then(|h| {
1011 pylon_auth::extract_session_cookie(h.value.as_str(), &cookie_config.name)
1012 })
1013 };
1014 let auth_token: Option<String> = bearer_token.or(cookie_token);
1015 let auth_ctx_result: Result<pylon_auth::AuthContext, &'static str> = if admin_token.is_some()
1024 && auth_token.is_some()
1025 && pylon_auth::constant_time_eq(
1026 auth_token.as_deref().unwrap_or("").as_bytes(),
1027 admin_token.as_deref().unwrap_or("").as_bytes(),
1028 ) {
1029 Ok(pylon_auth::AuthContext::admin())
1030 } else if let Some(t) = auth_token.as_deref() {
1031 if t.starts_with("pk.") {
1032 match ak.verify(t) {
1033 Ok(key) => Ok(pylon_auth::AuthContext::from_api_key(
1034 key.user_id,
1035 key.id,
1036 key.scopes,
1037 )),
1038 Err(_) => Err("INVALID_API_KEY"),
1039 }
1040 } else if pylon_auth::jwt::looks_like_jwt(t) && jwt_secret().is_some() {
1041 let Some(issuer) = jwt_issuer() else {
1047 tracing::warn!(
1048 "[auth] PYLON_JWT_SECRET set but PYLON_JWT_ISSUER missing — \
1049 refusing JWT verify (set both to enable JWT sessions)"
1050 );
1051 Err("JWT_MISCONFIGURED")?;
1052 unreachable!();
1053 };
1054 let secret = jwt_secret().expect("checked above");
1055 match pylon_auth::jwt::verify(t, secret.as_bytes(), Some(issuer)) {
1056 Ok(claims) => {
1057 let mut ctx = pylon_auth::AuthContext::authenticated(claims.sub);
1058 ctx.roles = claims.roles;
1059 if let Some(t) = claims.tenant_id {
1060 ctx = ctx.with_tenant(t);
1061 }
1062 Ok(ctx)
1063 }
1064 Err(_) => Err("INVALID_JWT"),
1065 }
1066 } else {
1067 Ok(ss.resolve(Some(t)))
1068 }
1069 } else {
1070 Ok(ss.resolve(None))
1071 };
1072 let auth_ctx = match auth_ctx_result {
1073 Ok(c) => c,
1074 Err(reason) => {
1075 let body = format!(
1076 r#"{{"error":{{"code":"{reason}","message":"Bearer token is malformed, expired, or revoked"}}}}"#
1077 );
1078 let resp = tiny_http::Response::from_string(body)
1079 .with_status_code(401)
1080 .with_header(
1081 "Content-Type: application/json"
1082 .parse::<tiny_http::Header>()
1083 .unwrap(),
1084 );
1085 let _ = request.respond(resp);
1086 continue;
1087 }
1088 };
1089
1090 if url == "/api/__test__/reset" && method == Method::Post {
1105 let is_loopback = peer_ip == "127.0.0.1"
1106 || peer_ip == "::1"
1107 || peer_ip.starts_with("127.")
1108 || peer_ip == "localhost";
1109 if !is_dev || !rt.is_in_memory() || !is_loopback {
1110 let body = json_error(
1111 "RESET_REFUSED",
1112 "reset endpoint is only available in dev mode + in-memory DB + from loopback",
1113 );
1114 let response = with_security_headers(
1115 Response::from_string(&body)
1116 .with_status_code(403u16)
1117 .with_header(
1118 Header::from_bytes("Content-Type", "application/json").unwrap(),
1119 )
1120 .with_header(
1121 Header::from_bytes(
1122 "Access-Control-Allow-Origin",
1123 cors_origin.as_bytes().to_vec(),
1124 )
1125 .unwrap(),
1126 ),
1127 );
1128 let _ = request.respond(response);
1129 mt.record_request("POST", 403);
1130 continue;
1131 }
1132 let (status, body) = match rt.reset_for_tests() {
1133 Ok(()) => (200u16, "{\"reset\":true}".to_string()),
1134 Err(e) => (500u16, json_error(&e.code, &e.message)),
1135 };
1136 let response = with_security_headers(
1137 Response::from_string(&body)
1138 .with_status_code(status)
1139 .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
1140 .with_header(
1141 Header::from_bytes(
1142 "Access-Control-Allow-Origin",
1143 cors_origin.as_bytes().to_vec(),
1144 )
1145 .unwrap(),
1146 ),
1147 );
1148 let _ = request.respond(response);
1149 mt.record_request("POST", status);
1150 continue;
1151 }
1152
1153 if url == "/api/files/upload" && method == Method::Post {
1162 const UPLOAD_MAX: usize = 10 * 1024 * 1024;
1163 if let Some(declared) = request.body_length() {
1166 if declared > UPLOAD_MAX {
1167 let err = json_error(
1168 "PAYLOAD_TOO_LARGE",
1169 &format!("Content-Length {declared} exceeds upload max of {UPLOAD_MAX}"),
1170 );
1171 let response = with_security_headers(
1172 Response::from_string(&err)
1173 .with_status_code(413u16)
1174 .with_header(
1175 Header::from_bytes("Content-Type", "application/json").unwrap(),
1176 )
1177 .with_header(
1178 Header::from_bytes(
1179 "Access-Control-Allow-Origin",
1180 cors_origin.as_bytes().to_vec(),
1181 )
1182 .unwrap(),
1183 ),
1184 );
1185 let _ = request.respond(response);
1186 mt.record_request("POST", 413);
1187 continue;
1188 }
1189 }
1190 if auth_ctx.user_id.is_none() {
1191 let err = json_error(
1192 "AUTH_REQUIRED",
1193 "/api/files/upload requires an authenticated session",
1194 );
1195 let response = with_security_headers(
1196 Response::from_string(&err)
1197 .with_status_code(401u16)
1198 .with_header(
1199 Header::from_bytes("Content-Type", "application/json").unwrap(),
1200 )
1201 .with_header(
1202 Header::from_bytes(
1203 "Access-Control-Allow-Origin",
1204 cors_origin.as_bytes().to_vec(),
1205 )
1206 .unwrap(),
1207 ),
1208 );
1209 let _ = request.respond(response);
1210 mt.record_request("POST", 401);
1211 continue;
1212 }
1213 use std::io::Read;
1218 let mut bytes: Vec<u8> = Vec::with_capacity(8192);
1219 let mut limited = request.as_reader().take((UPLOAD_MAX as u64) + 1);
1220 let _ = limited.read_to_end(&mut bytes);
1221
1222 const MAX: usize = UPLOAD_MAX;
1223 if bytes.len() > MAX {
1224 let err = json_error("PAYLOAD_TOO_LARGE", "File exceeds 10 MB limit");
1225 let response = with_security_headers(
1226 Response::from_string(&err)
1227 .with_status_code(413u16)
1228 .with_header(
1229 Header::from_bytes("Content-Type", "application/json").unwrap(),
1230 )
1231 .with_header(
1232 Header::from_bytes(
1233 "Access-Control-Allow-Origin",
1234 cors_origin.as_bytes().to_vec(),
1235 )
1236 .unwrap(),
1237 ),
1238 );
1239 let _ = request.respond(response);
1240 mt.record_request("POST", 413);
1241 continue;
1242 }
1243
1244 let content_type = request
1246 .headers()
1247 .iter()
1248 .find(|h| h.field.as_str() == "Content-Type" || h.field.as_str() == "content-type")
1249 .map(|h| h.value.as_str().to_string())
1250 .unwrap_or_else(|| "application/octet-stream".into());
1251 let filename = request
1252 .headers()
1253 .iter()
1254 .find(|h| h.field.as_str() == "X-Filename" || h.field.as_str() == "x-filename")
1255 .map(|h| h.value.as_str().to_string())
1256 .unwrap_or_else(|| "upload".into());
1257
1258 let (name, ct, payload) = if content_type.starts_with("multipart/form-data") {
1260 match parse_multipart_first_file(&bytes, &content_type) {
1261 Some(p) => p,
1262 None => {
1263 let err = json_error("INVALID_MULTIPART", "Could not parse multipart body");
1264 let response = with_security_headers(
1265 Response::from_string(&err)
1266 .with_status_code(400u16)
1267 .with_header(
1268 Header::from_bytes("Content-Type", "application/json").unwrap(),
1269 )
1270 .with_header(
1271 Header::from_bytes(
1272 "Access-Control-Allow-Origin",
1273 cors_origin.as_bytes().to_vec(),
1274 )
1275 .unwrap(),
1276 ),
1277 );
1278 let _ = request.respond(response);
1279 mt.record_request("POST", 400);
1280 continue;
1281 }
1282 }
1283 } else {
1284 (filename, content_type, bytes)
1285 };
1286
1287 let storage = pylon_storage::files::LocalFileStorage::new(
1288 &std::env::var("PYLON_FILES_DIR").unwrap_or_else(|_| "uploads".into()),
1289 &std::env::var("PYLON_FILES_URL_PREFIX").unwrap_or_else(|_| "/api/files".into()),
1290 );
1291
1292 let (status, body) =
1293 match pylon_storage::files::FileStorage::store(&storage, &name, &payload, &ct) {
1294 Ok(stored) => (
1295 201u16,
1296 serde_json::to_string(&stored).unwrap_or_else(|_| "{}".into()),
1297 ),
1298 Err(e) => (500u16, json_error(&e.code, &e.message)),
1299 };
1300
1301 let response = with_security_headers(
1302 Response::from_string(&body)
1303 .with_status_code(status)
1304 .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
1305 .with_header(
1306 Header::from_bytes(
1307 "Access-Control-Allow-Origin",
1308 cors_origin.as_bytes().to_vec(),
1309 )
1310 .unwrap(),
1311 ),
1312 );
1313 let _ = request.respond(response);
1314 mt.record_request("POST", status);
1315 continue;
1316 }
1317
1318 const MAX_BODY_SIZE: usize = 10 * 1024 * 1024;
1328
1329 if let Some(declared) = request.body_length() {
1330 if declared > MAX_BODY_SIZE {
1331 let err_body = json_error(
1332 "PAYLOAD_TOO_LARGE",
1333 &format!("Content-Length {declared} exceeds max of {MAX_BODY_SIZE}"),
1334 );
1335 let response = with_security_headers(
1336 Response::from_string(&err_body)
1337 .with_status_code(413u16)
1338 .with_header(
1339 Header::from_bytes(
1340 "Access-Control-Allow-Origin",
1341 cors_origin.as_bytes().to_vec(),
1342 )
1343 .unwrap(),
1344 ),
1345 );
1346 let _ = request.respond(response);
1347 mt.record_request(method.as_str(), 413);
1348 continue;
1349 }
1350 }
1351
1352 let mut body = String::new();
1353 if !matches!(
1354 method,
1355 Method::Get | Method::Head | Method::Options | Method::Delete
1356 ) {
1357 use std::io::Read;
1358 let mut limited = request.as_reader().take((MAX_BODY_SIZE as u64) + 1);
1359 let _ = limited.read_to_string(&mut body);
1360 }
1361
1362 if body.len() > MAX_BODY_SIZE {
1363 let err_body = json_error(
1364 "PAYLOAD_TOO_LARGE",
1365 &format!(
1366 "Request body exceeds maximum size of {} bytes",
1367 MAX_BODY_SIZE,
1368 ),
1369 );
1370 let response = with_security_headers(
1371 Response::from_string(&err_body)
1372 .with_status_code(413u16)
1373 .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
1374 .with_header(
1375 Header::from_bytes(
1376 "Access-Control-Allow-Origin",
1377 cors_origin.as_bytes().to_vec(),
1378 )
1379 .unwrap(),
1380 ),
1381 );
1382 let _ = request.respond(response);
1383 mt.record_request(method.as_str(), 413);
1384 continue;
1385 }
1386
1387 if method == Method::Get {
1391 if let Some(rest) = url.strip_prefix("/api/shards/") {
1392 let rest = rest.split('?').next().unwrap_or(rest);
1393 if let Some(shard_id) = rest.strip_suffix("/connect") {
1394 if auth_ctx.user_id.is_none() {
1399 let err = json_error(
1400 "AUTH_REQUIRED",
1401 "Shard connect requires an authenticated session",
1402 );
1403 let response = with_security_headers(
1404 Response::from_string(&err)
1405 .with_status_code(401u16)
1406 .with_header(
1407 Header::from_bytes("Content-Type", "application/json").unwrap(),
1408 )
1409 .with_header(
1410 Header::from_bytes(
1411 "Access-Control-Allow-Origin",
1412 cors_origin.as_bytes().to_vec(),
1413 )
1414 .unwrap(),
1415 ),
1416 );
1417 let _ = request.respond(response);
1418 mt.record_request("GET", 401);
1419 continue;
1420 }
1421 let shards = match &shards_ref {
1422 Some(s) => Arc::clone(s),
1423 None => {
1424 let err = json_error(
1425 "SHARDS_NOT_AVAILABLE",
1426 "Shard system is not configured",
1427 );
1428 let response = with_security_headers(
1429 Response::from_string(&err)
1430 .with_status_code(503u16)
1431 .with_header(
1432 Header::from_bytes("Content-Type", "application/json")
1433 .unwrap(),
1434 )
1435 .with_header(
1436 Header::from_bytes(
1437 "Access-Control-Allow-Origin",
1438 cors_origin.as_bytes().to_vec(),
1439 )
1440 .unwrap(),
1441 ),
1442 );
1443 let _ = request.respond(response);
1444 mt.record_request("GET", 503);
1445 continue;
1446 }
1447 };
1448 let shard = match shards.get(shard_id) {
1449 Some(s) => s,
1450 None => {
1451 let err = json_error(
1452 "SHARD_NOT_FOUND",
1453 &format!("Shard \"{shard_id}\" not found"),
1454 );
1455 let response = with_security_headers(
1456 Response::from_string(&err)
1457 .with_status_code(404u16)
1458 .with_header(
1459 Header::from_bytes("Content-Type", "application/json")
1460 .unwrap(),
1461 )
1462 .with_header(
1463 Header::from_bytes(
1464 "Access-Control-Allow-Origin",
1465 cors_origin.as_bytes().to_vec(),
1466 )
1467 .unwrap(),
1468 ),
1469 );
1470 let _ = request.respond(response);
1471 mt.record_request("GET", 404);
1472 continue;
1473 }
1474 };
1475
1476 let sub_id = url
1479 .split("sid=")
1480 .nth(1)
1481 .and_then(|s| s.split('&').next())
1482 .map(|s| s.to_string())
1483 .or_else(|| auth_ctx.user_id.clone())
1484 .unwrap_or_else(|| {
1485 format!(
1486 "anon_{}",
1487 std::time::SystemTime::now()
1488 .duration_since(std::time::UNIX_EPOCH)
1489 .unwrap_or_default()
1490 .as_nanos()
1491 )
1492 });
1493 let subscriber_id = pylon_realtime::SubscriberId::new(sub_id);
1494
1495 let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1496 let streaming_body = StreamingBody::new(rx);
1497
1498 let tx_clone = tx.clone();
1499 let sink: pylon_realtime::SnapshotSink =
1500 Box::new(move |tick: u64, bytes: &[u8]| {
1501 let mut frame = format!("id: {tick}\ndata: ").into_bytes();
1504 frame.extend_from_slice(bytes);
1505 frame.extend_from_slice(b"\n\n");
1506 let _ = tx_clone.send(frame);
1507 });
1508
1509 let shard_auth = pylon_realtime::ShardAuth {
1510 user_id: auth_ctx.user_id.clone(),
1511 is_admin: auth_ctx.is_admin,
1512 };
1513 if let Err(e) = shard.add_subscriber(subscriber_id.clone(), sink, &shard_auth) {
1514 let (status, code) = match &e {
1515 pylon_realtime::ShardError::Unauthorized(_) => (403u16, "UNAUTHORIZED"),
1516 _ => (429u16, "SUBSCRIBE_FAILED"),
1517 };
1518 let err = json_error(code, &e.to_string());
1519 let response = with_security_headers(
1520 Response::from_string(&err)
1521 .with_status_code(status)
1522 .with_header(
1523 Header::from_bytes("Content-Type", "application/json").unwrap(),
1524 )
1525 .with_header(
1526 Header::from_bytes(
1527 "Access-Control-Allow-Origin",
1528 cors_origin.as_bytes().to_vec(),
1529 )
1530 .unwrap(),
1531 ),
1532 );
1533 let _ = request.respond(response);
1534 mt.record_request("GET", status);
1535 continue;
1536 }
1537
1538 {
1541 let shard_cleanup = Arc::clone(&shard);
1542 let sub_id_cleanup = subscriber_id.clone();
1543 let tx_liveness = tx.clone();
1544 std::thread::spawn(move || {
1545 loop {
1548 std::thread::sleep(std::time::Duration::from_secs(30));
1549 if tx_liveness.send(b": heartbeat\n\n".to_vec()).is_err() {
1550 shard_cleanup.remove_subscriber(&sub_id_cleanup);
1551 return;
1552 }
1553 if !shard_cleanup.is_running() {
1554 return;
1555 }
1556 }
1557 });
1558 }
1559
1560 let response = with_security_headers(Response::new(
1561 tiny_http::StatusCode(200),
1562 vec![
1563 Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1564 Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1565 Header::from_bytes("Connection", "keep-alive").unwrap(),
1566 Header::from_bytes(
1567 "Access-Control-Allow-Origin",
1568 cors_origin.as_bytes().to_vec(),
1569 )
1570 .unwrap(),
1571 ],
1572 streaming_body,
1573 None,
1574 None,
1575 ));
1576 let _ = request.respond(response);
1577 mt.record_request("GET", 200);
1578 continue;
1579 }
1580 }
1581 }
1582
1583 if method == Method::Post
1585 && url.starts_with("/api/fn/")
1586 && url != "/api/fn/traces"
1587 && request.headers().iter().any(|h| {
1588 (h.field.as_str() == "Accept" || h.field.as_str() == "accept")
1589 && h.value.as_str().contains("text/event-stream")
1590 })
1591 {
1592 let fn_name = url
1593 .strip_prefix("/api/fn/")
1594 .unwrap_or("")
1595 .split('?')
1596 .next()
1597 .unwrap_or("")
1598 .to_string();
1599
1600 if let Some(fn_ops) = &fn_ops_maybe {
1601 if pylon_router::FnOps::get_fn(fn_ops.as_ref(), &fn_name).is_none() {
1605 let err = json_error(
1606 "FN_NOT_FOUND",
1607 &format!("Function \"{fn_name}\" is not registered"),
1608 );
1609 let response = with_security_headers(
1610 Response::from_string(&err)
1611 .with_status_code(404u16)
1612 .with_header(
1613 Header::from_bytes("Content-Type", "application/json").unwrap(),
1614 )
1615 .with_header(
1616 Header::from_bytes(
1617 "Access-Control-Allow-Origin",
1618 cors_origin.as_bytes().to_vec(),
1619 )
1620 .unwrap(),
1621 ),
1622 );
1623 let _ = request.respond(response);
1624 mt.record_request("POST", 404);
1625 continue;
1626 }
1627 let identity = auth_ctx.user_id.as_deref().unwrap_or("anon");
1629 if let Err(retry_after) =
1630 pylon_router::FnOps::check_rate_limit(fn_ops.as_ref(), &fn_name, identity)
1631 {
1632 let body = format!(
1633 r#"{{"error":{{"code":"RATE_LIMITED","message":"Function \"{fn_name}\" rate limit exceeded","retry_after_secs":{retry_after}}}}}"#
1634 );
1635 let response = with_security_headers(
1636 Response::from_string(&body)
1637 .with_status_code(429u16)
1638 .with_header(
1639 Header::from_bytes("Content-Type", "application/json").unwrap(),
1640 )
1641 .with_header(
1642 Header::from_bytes(
1643 "Access-Control-Allow-Origin",
1644 cors_origin.as_bytes().to_vec(),
1645 )
1646 .unwrap(),
1647 ),
1648 );
1649 let _ = request.respond(response);
1650 mt.record_request("POST", 429);
1651 continue;
1652 }
1653
1654 let args: serde_json::Value =
1655 serde_json::from_str(&body).unwrap_or(serde_json::json!({}));
1656
1657 let auth = pylon_functions::protocol::AuthInfo {
1658 user_id: auth_ctx.user_id.clone(),
1659 is_admin: auth_ctx.is_admin,
1660 tenant_id: auth_ctx.tenant_id.clone(),
1661 };
1662
1663 let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1664 let streaming_body = StreamingBody::new(rx);
1665
1666 let fn_ops_cl = Arc::clone(fn_ops);
1667 let tx_stream = tx.clone();
1668 std::thread::spawn(move || {
1669 let tx_cb = tx_stream.clone();
1670 let on_stream: Box<dyn FnMut(&str) + Send> = Box::new(move |chunk: &str| {
1671 let sse = format!("data: {}\n\n", chunk);
1672 let _ = tx_cb.send(sse.into_bytes());
1673 });
1674
1675 let result = pylon_router::FnOps::call(
1676 fn_ops_cl.as_ref(),
1677 &fn_name,
1678 args,
1679 auth,
1680 Some(on_stream),
1681 None, );
1683 match result {
1684 Ok((value, _trace)) => {
1685 let done = format!(
1686 "event: result\ndata: {}\n\n",
1687 serde_json::to_string(&value).unwrap_or_else(|_| "null".into())
1688 );
1689 let _ = tx_stream.send(done.into_bytes());
1690 }
1691 Err(e) => {
1692 let err = format!(
1693 "event: error\ndata: {}\n\n",
1694 serde_json::json!({"code": e.code, "message": e.message})
1695 );
1696 let _ = tx_stream.send(err.into_bytes());
1697 }
1698 }
1699 });
1700
1701 let response = with_security_headers(Response::new(
1702 tiny_http::StatusCode(200),
1703 vec![
1704 Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1705 Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1706 Header::from_bytes("Connection", "keep-alive").unwrap(),
1707 Header::from_bytes(
1708 "Access-Control-Allow-Origin",
1709 cors_origin.as_bytes().to_vec(),
1710 )
1711 .unwrap(),
1712 ],
1713 streaming_body,
1714 None,
1715 None,
1716 ));
1717 let _ = request.respond(response);
1718 mt.record_request("POST", 200);
1719 continue;
1720 }
1721 }
1722
1723 if url == "/api/ai/stream" && method == Method::Post {
1725 if auth_ctx.user_id.is_none() {
1728 let err = json_error(
1729 "AUTH_REQUIRED",
1730 "/api/ai/stream requires an authenticated session",
1731 );
1732 let response = with_security_headers(
1733 Response::from_string(&err)
1734 .with_status_code(401u16)
1735 .with_header(
1736 Header::from_bytes("Content-Type", "application/json").unwrap(),
1737 )
1738 .with_header(
1739 Header::from_bytes(
1740 "Access-Control-Allow-Origin",
1741 cors_origin.as_bytes().to_vec(),
1742 )
1743 .unwrap(),
1744 ),
1745 );
1746 let _ = request.respond(response);
1747 mt.record_request("POST", 401);
1748 continue;
1749 }
1750 let ai_provider = std::env::var("PYLON_AI_PROVIDER").unwrap_or_default();
1751 let ai_key = std::env::var("PYLON_AI_API_KEY").unwrap_or_default();
1752 let ai_model = std::env::var("PYLON_AI_MODEL").unwrap_or_default();
1753 let ai_base = std::env::var("PYLON_AI_BASE_URL").unwrap_or_default();
1754
1755 if ai_key.is_empty() && ai_provider != "custom" {
1756 let err = json_error(
1757 "AI_NOT_CONFIGURED",
1758 "Set PYLON_AI_PROVIDER and PYLON_AI_API_KEY",
1759 );
1760 let response = with_security_headers(
1761 Response::from_string(&err)
1762 .with_status_code(503u16)
1763 .with_header(
1764 Header::from_bytes("Content-Type", "application/json").unwrap(),
1765 )
1766 .with_header(
1767 Header::from_bytes(
1768 "Access-Control-Allow-Origin",
1769 cors_origin.as_bytes().to_vec(),
1770 )
1771 .unwrap(),
1772 ),
1773 );
1774 let _ = request.respond(response);
1775 mt.record_request("POST", 503);
1776 continue;
1777 }
1778
1779 let parsed: serde_json::Value = match serde_json::from_str(&body) {
1780 Ok(v) => v,
1781 Err(_) => {
1782 let err = json_error("INVALID_JSON", "Invalid request body");
1783 let response = with_security_headers(
1784 Response::from_string(&err)
1785 .with_status_code(400u16)
1786 .with_header(
1787 Header::from_bytes("Content-Type", "application/json").unwrap(),
1788 )
1789 .with_header(
1790 Header::from_bytes(
1791 "Access-Control-Allow-Origin",
1792 cors_origin.as_bytes().to_vec(),
1793 )
1794 .unwrap(),
1795 ),
1796 );
1797 let _ = request.respond(response);
1798 mt.record_request("POST", 400);
1799 continue;
1800 }
1801 };
1802
1803 let messages: Vec<AiMessage> = match parsed.get("messages").and_then(|m| m.as_array()) {
1804 Some(arr) => arr
1805 .iter()
1806 .filter_map(|m| {
1807 let role = m.get("role")?.as_str()?.to_string();
1808 let content = m.get("content")?.as_str()?.to_string();
1809 Some(AiMessage { role, content })
1810 })
1811 .collect(),
1812 None => {
1813 let err = json_error("MISSING_FIELD", "\"messages\" array is required");
1814 let response = with_security_headers(
1815 Response::from_string(&err)
1816 .with_status_code(400u16)
1817 .with_header(
1818 Header::from_bytes("Content-Type", "application/json").unwrap(),
1819 )
1820 .with_header(
1821 Header::from_bytes(
1822 "Access-Control-Allow-Origin",
1823 cors_origin.as_bytes().to_vec(),
1824 )
1825 .unwrap(),
1826 ),
1827 );
1828 let _ = request.respond(response);
1829 mt.record_request("POST", 400);
1830 continue;
1831 }
1832 };
1833
1834 let model = parsed
1836 .get("model")
1837 .and_then(|m| m.as_str())
1838 .map(|s| s.to_string())
1839 .unwrap_or(ai_model);
1840
1841 let proxy = match ai_provider.as_str() {
1842 "anthropic" => AiProxyPlugin::anthropic(&ai_key, &model),
1843 "openai" => AiProxyPlugin::openai(&ai_key, &model),
1844 "custom" => AiProxyPlugin::custom_with_model(&ai_base, &ai_key, &model),
1845 _ => AiProxyPlugin::openai(&ai_key, &model),
1846 };
1847
1848 let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1851 let streaming_body = StreamingBody::new(rx);
1852
1853 std::thread::spawn(move || {
1856 let result = proxy.stream_completion(&messages, &mut |chunk| {
1857 let sse = format!(
1858 "data: {}
1859
1860",
1861 serde_json::json!({
1862 "choices": [{"index": 0, "delta": {"content": chunk}}]
1863 })
1864 );
1865 let _ = tx.send(sse.into_bytes());
1866 });
1867
1868 match result {
1870 Ok(_) => {
1871 let _ = tx.send(
1872 b"data: [DONE]
1873
1874"
1875 .to_vec(),
1876 );
1877 }
1878 Err(e) => {
1879 let err_event = format!(
1880 "data: {}
1881
1882",
1883 serde_json::json!({"error": {"message": e, "type": "stream_error"}})
1884 );
1885 let _ = tx.send(err_event.into_bytes());
1886 }
1887 }
1888 });
1890
1891 let response = with_security_headers(Response::new(
1892 tiny_http::StatusCode(200),
1893 vec![
1894 Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1895 Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1896 Header::from_bytes("Connection", "keep-alive").unwrap(),
1897 Header::from_bytes(
1898 "Access-Control-Allow-Origin",
1899 cors_origin.as_bytes().to_vec(),
1900 )
1901 .unwrap(),
1902 ],
1903 streaming_body,
1904 None, None,
1906 ));
1907 let _ = request.respond(response);
1908 mt.record_request("POST", 200);
1909 continue;
1910 }
1911
1912 let (status, response_body, content_type, is_studio, extra_headers) = if (url == "/studio"
1923 || url == "/studio/")
1924 && method == Method::Get
1925 {
1926 if !is_dev && !auth_ctx.is_admin {
1927 let body = json_error(
1928 "AUTH_REQUIRED",
1929 "/studio requires admin auth in production (set PYLON_ADMIN_TOKEN and pass it as Bearer)",
1930 );
1931 let response = with_security_headers(
1932 Response::from_string(&body)
1933 .with_status_code(401u16)
1934 .with_header(
1935 Header::from_bytes("Content-Type", "application/json").unwrap(),
1936 )
1937 .with_header(
1938 Header::from_bytes(
1939 "Access-Control-Allow-Origin",
1940 cors_origin.as_bytes().to_vec(),
1941 )
1942 .unwrap(),
1943 ),
1944 );
1945 let _ = request.respond(response);
1946 mt.record_request("GET", 401);
1947 continue;
1948 }
1949 let host = request
1956 .headers()
1957 .iter()
1958 .find(|h| h.field.equiv("Host"))
1959 .map(|h| h.value.as_str().to_string())
1960 .unwrap_or_else(|| format!("localhost:{port}"));
1961 let scheme = request
1962 .headers()
1963 .iter()
1964 .find(|h| h.field.equiv("X-Forwarded-Proto"))
1965 .map(|h| h.value.as_str().to_string())
1966 .unwrap_or_else(|| "http".to_string());
1967 let base = format!("{scheme}://{host}");
1968 let html = pylon_studio_api::generate_studio_html(rt.manifest(), &base);
1969 (
1970 200u16,
1971 html,
1972 "text/html",
1973 true,
1974 Vec::<(String, String)>::new(),
1975 )
1976 } else {
1977 let meta = pylon_plugin::RequestMeta {
1981 peer_ip: peer_ip.as_str(),
1982 };
1983 if let Err(e) = pr.run_on_request_with_meta(method.as_str(), &url, &auth_ctx, &meta) {
1984 (
1985 e.status,
1986 json_error(&e.code, &e.message),
1987 "application/json",
1988 false,
1989 Vec::new(),
1990 )
1991 } else if let Some((s, b)) =
1992 pr.try_handle_route(method.as_str(), &url, &body, &auth_ctx)
1993 {
1994 (s, b, "application/json", false, Vec::new())
1996 } else {
1997 let notifier = WsSseNotifier {
1998 ws: Arc::clone(&wh),
1999 sse: Arc::clone(&sh),
2000 };
2001 let openapi_gen = RuntimeOpenApiGenerator {
2002 manifest: rt.manifest(),
2003 };
2004 let file_ops = LocalFileOps::new_default();
2005 let cache_adapter = CacheAdapter(Arc::clone(&ca));
2006 let pubsub_adapter = PubSubAdapter(Arc::clone(&ps));
2007 let email_adapter = EmailAdapter::from_env();
2008 let fn_ops: Option<&dyn pylon_router::FnOps> =
2009 fn_ops_ref.as_deref().map(|f| f as &dyn pylon_router::FnOps);
2010 let shard_adapter = shards_ref.as_ref().map(|reg| ShardOpsAdapter {
2011 registry: Arc::clone(reg),
2012 });
2013 let shard_ops: Option<&dyn pylon_router::ShardOps> = shard_adapter
2014 .as_ref()
2015 .map(|a| a as &dyn pylon_router::ShardOps);
2016 let plugin_hooks = PluginHooksAdapter(Arc::clone(&pr));
2017 let request_headers: Vec<(String, String)> = request
2022 .headers()
2023 .iter()
2024 .map(|h| (h.field.as_str().to_string(), h.value.as_str().to_string()))
2025 .collect();
2026 let router_ctx = pylon_router::RouterContext {
2027 store: rt.as_ref(),
2028 session_store: &ss,
2029 magic_codes: &mc,
2030 oauth_state: &os,
2031 account_store: &acc,
2032 api_keys: &ak,
2033 orgs: &og,
2034 siwe: &sw,
2035 phone_codes: &pcd,
2036 passkeys: &pks,
2037 verification: &vrf,
2038 policy_engine: &pe,
2039 change_log: &cl,
2040 notifier: ¬ifier,
2041 rooms: rm.as_ref(),
2042 cache: &cache_adapter,
2043 pubsub: &pubsub_adapter,
2044 jobs: jq.as_ref(),
2045 scheduler: sc.as_ref(),
2046 workflows: we.as_ref(),
2047 files: &file_ops,
2048 openapi: &openapi_gen,
2049 functions: fn_ops,
2050 email: &email_adapter,
2051 shards: shard_ops,
2052 plugin_hooks: &plugin_hooks,
2053 auth_ctx: &auth_ctx,
2054 trusted_origins: &trusted_origins_ref,
2055 is_dev,
2056 request_headers: &request_headers,
2057 peer_ip: peer_ip.as_str(),
2058 cookie_config: cookie_config.as_ref(),
2059 response_headers: std::cell::RefCell::new(Vec::new()),
2060 };
2061 let http_method = HttpMethod::from_str(method.as_str());
2062 let (s, b, _ct) = pylon_router::route(
2063 &router_ctx,
2064 http_method,
2065 &url,
2066 &body,
2067 auth_token.as_deref(),
2068 );
2069 let extra_headers = router_ctx.take_response_headers();
2070 (s, b, "application/json", false, extra_headers)
2071 }
2072 };
2073
2074 let mut response = Response::from_string(&response_body)
2075 .with_status_code(status)
2076 .with_header(Header::from_bytes("Content-Type", content_type).unwrap())
2077 .with_header(
2078 Header::from_bytes(
2079 "Access-Control-Allow-Origin",
2080 cors_origin.as_bytes().to_vec(),
2081 )
2082 .unwrap(),
2083 )
2084 .with_header(
2085 Header::from_bytes(
2086 "Access-Control-Allow-Methods",
2087 "GET, POST, PATCH, DELETE, OPTIONS",
2088 )
2089 .unwrap(),
2090 )
2091 .with_header(
2092 Header::from_bytes(
2093 "Access-Control-Allow-Headers",
2094 "Content-Type, Authorization",
2095 )
2096 .unwrap(),
2097 );
2098 if allow_credentials {
2103 response = response
2104 .with_header(
2105 Header::from_bytes("Access-Control-Allow-Credentials", "true").unwrap(),
2106 )
2107 .with_header(Header::from_bytes("Vary", "Origin").unwrap());
2108 }
2109
2110 for (name, value) in extra_headers {
2117 if let Ok(h) = Header::from_bytes(name.as_bytes(), value.as_bytes().to_vec()) {
2118 response = response.with_header(h);
2119 }
2120 }
2121
2122 if is_studio {
2135 response = response.with_header(
2136 Header::from_bytes(
2137 "Content-Security-Policy",
2138 "default-src 'self' 'unsafe-inline' 'unsafe-eval' https://cdn.tailwindcss.com https://unpkg.com ws: wss:",
2139 ).unwrap(),
2140 );
2141 }
2142
2143 let response = with_security_headers(response);
2144
2145 let _ = request.respond(response);
2146 mt.record_request(method.as_str(), status);
2147 }
2148
2149 tracing::warn!("Shutting down gracefully...");
2150
2151 let drain_timeout = std::time::Duration::from_secs(
2154 std::env::var("PYLON_DRAIN_SECS")
2155 .ok()
2156 .and_then(|s| s.parse().ok())
2157 .unwrap_or(10),
2158 );
2159 let start = Instant::now();
2160
2161 if let Some(reg) = &shard_registry {
2163 for id in reg.ids() {
2164 if let Some(shard) = reg.get(&id) {
2165 shard.stop();
2166 }
2167 }
2168 }
2169
2170 let _ = &scheduler; while start.elapsed() < drain_timeout {
2175 let pending_jobs = job_queue.stats().pending;
2176 if pending_jobs == 0 {
2177 break;
2178 }
2179 std::thread::sleep(std::time::Duration::from_millis(100));
2180 }
2181
2182 let elapsed = start.elapsed();
2183 tracing::warn!(
2184 "Drain complete in {:.1}s (timeout {}s)",
2185 elapsed.as_secs_f32(),
2186 drain_timeout.as_secs()
2187 );
2188 Ok(())
2189}
2190
2191fn json_error(code: &str, message: &str) -> String {
2196 pylon_router::json_error(code, message)
2197}
2198
/// Bundle of the shared auth-related stores, as returned by
/// `build_auth_stores` and its backend-specific helpers
/// (`in_memory_auth_stores`, `build_sqlite_auth_stores`,
/// `build_pg_auth_stores`).
///
/// Every store is wrapped in an `Arc` so the same instance can be shared by
/// reference-counted handle.
struct AuthStores {
    /// Session store; the builders apply the configured session lifetime to it.
    session_store: Arc<SessionStore>,
    /// Magic-code store (SQLite/Postgres backed when that backend opens).
    magic_codes: Arc<pylon_auth::MagicCodeStore>,
    /// OAuth state store (SQLite/Postgres backed when that backend opens).
    oauth_state: Arc<pylon_auth::OAuthStateStore>,
    /// Account store (the builders' log messages call this "account-link").
    account_store: Arc<pylon_auth::AccountStore>,
    /// API key store (SQLite/Postgres backed when that backend opens).
    api_keys: Arc<pylon_auth::api_key::ApiKeyStore>,
    /// Organization store (SQLite/Postgres backed when that backend opens).
    orgs: Arc<pylon_auth::org::OrgStore>,
    /// SIWE nonce store. Every builder in this file creates it with
    /// `NonceStore::new()`, i.e. without passing a backend.
    siwe: Arc<pylon_auth::siwe::NonceStore>,
    /// Phone code store. Every builder in this file creates it with
    /// `PhoneCodeStore::new()`, i.e. without passing a backend.
    phone_codes: Arc<pylon_auth::phone::PhoneCodeStore>,
    /// Passkey (WebAuthn) store. Every builder in this file creates it with
    /// `PasskeyStore::new()`, i.e. without passing a backend.
    passkeys: Arc<pylon_auth::webauthn::PasskeyStore>,
    /// Verification store (SQLite/Postgres backed when that backend opens).
    verification: Arc<pylon_auth::verification::VerificationStore>,
}
2220
/// Returns the JWT secret from the `PYLON_JWT_SECRET` environment variable.
///
/// The variable is read once, on the first call, and the outcome is cached
/// for the lifetime of the process; later changes to the environment are not
/// observed. An unset, non-UTF-8, or empty variable yields `None`.
fn jwt_secret() -> Option<&'static String> {
    static SECRET: std::sync::OnceLock<Option<String>> = std::sync::OnceLock::new();

    let cached = SECRET.get_or_init(|| match std::env::var("PYLON_JWT_SECRET") {
        Ok(value) if !value.is_empty() => Some(value),
        _ => None,
    });
    cached.as_ref()
}
2230
/// Returns the JWT issuer from the `PYLON_JWT_ISSUER` environment variable.
///
/// The variable is read once, on the first call, and the outcome is cached
/// for the lifetime of the process; later changes to the environment are not
/// observed. An unset, non-UTF-8, or empty variable yields `None`.
fn jwt_issuer() -> Option<&'static String> {
    static ISSUER: std::sync::OnceLock<Option<String>> = std::sync::OnceLock::new();

    let cached = ISSUER.get_or_init(|| match std::env::var("PYLON_JWT_ISSUER") {
        Ok(value) if !value.is_empty() => Some(value),
        _ => None,
    });
    cached.as_ref()
}
2236
2237fn build_auth_stores(app_db_path: Option<&str>, session_lifetime: u64) -> AuthStores {
2238 let force_in_memory = std::env::var("PYLON_SESSION_IN_MEMORY")
2241 .map(|v| v == "1" || v == "true")
2242 .unwrap_or(false);
2243
2244 let pg_url = std::env::var("DATABASE_URL")
2247 .ok()
2248 .filter(|u| u.starts_with("postgres://") || u.starts_with("postgresql://"));
2249
2250 if let Some(url) = pg_url {
2251 if force_in_memory {
2252 return in_memory_auth_stores(session_lifetime);
2255 }
2256 return build_pg_auth_stores(&url, session_lifetime);
2257 }
2258
2259 let sqlite_path = std::env::var("PYLON_SESSION_DB")
2260 .ok()
2261 .or_else(|| app_db_path.map(|p| format!("{p}.sessions.db")));
2262
2263 match (force_in_memory, sqlite_path) {
2264 (true, _) | (_, None) => in_memory_auth_stores(session_lifetime),
2265 (false, Some(path)) => build_sqlite_auth_stores(&path, session_lifetime),
2266 }
2267}
2268
2269fn in_memory_auth_stores(session_lifetime: u64) -> AuthStores {
2270 AuthStores {
2271 session_store: Arc::new(SessionStore::new().with_lifetime(session_lifetime)),
2272 magic_codes: Arc::new(pylon_auth::MagicCodeStore::new()),
2273 oauth_state: Arc::new(pylon_auth::OAuthStateStore::new()),
2274 account_store: Arc::new(pylon_auth::AccountStore::new()),
2275 api_keys: Arc::new(pylon_auth::api_key::ApiKeyStore::new()),
2276 orgs: Arc::new(pylon_auth::org::OrgStore::new()),
2277 siwe: Arc::new(pylon_auth::siwe::NonceStore::new()),
2278 phone_codes: Arc::new(pylon_auth::phone::PhoneCodeStore::new()),
2279 passkeys: Arc::new(pylon_auth::webauthn::PasskeyStore::new()),
2280 verification: Arc::new(pylon_auth::verification::VerificationStore::new()),
2281 }
2282}
2283
2284fn build_sqlite_auth_stores(path: &str, session_lifetime: u64) -> AuthStores {
2285 let session_store = match crate::session_backend::SqliteSessionBackend::open(path) {
2286 Ok(b) => {
2287 tracing::info!("[pylon] Auth state (SQLite): {path}");
2288 SessionStore::with_backend(Box::new(b)).with_lifetime(session_lifetime)
2289 }
2290 Err(e) => {
2291 tracing::warn!("[pylon] could not open session DB {path}: {e}. In-memory fallback.");
2292 SessionStore::new().with_lifetime(session_lifetime)
2293 }
2294 };
2295 let magic_codes = match crate::magic_code_backend::SqliteMagicCodeBackend::open(path) {
2296 Ok(b) => pylon_auth::MagicCodeStore::with_backend(Box::new(b)),
2297 Err(e) => {
2298 tracing::warn!("[pylon] magic-code SQLite backend unavailable: {e}");
2299 pylon_auth::MagicCodeStore::new()
2300 }
2301 };
2302 let oauth_state = match crate::oauth_backend::SqliteOAuthBackend::open(path) {
2303 Ok(b) => pylon_auth::OAuthStateStore::with_backend(Box::new(b)),
2304 Err(e) => {
2305 tracing::warn!("[pylon] OAuth state SQLite backend unavailable: {e}");
2306 pylon_auth::OAuthStateStore::new()
2307 }
2308 };
2309 let account_store = match crate::account_backend::SqliteAccountBackend::open(path) {
2310 Ok(b) => pylon_auth::AccountStore::with_backend(Box::new(b)),
2311 Err(e) => {
2312 tracing::warn!("[pylon] account-link SQLite backend unavailable: {e}");
2313 pylon_auth::AccountStore::new()
2314 }
2315 };
2316 let api_keys = match crate::api_key_backend::SqliteApiKeyBackend::open(path) {
2317 Ok(b) => pylon_auth::api_key::ApiKeyStore::with_backend(Box::new(b)),
2318 Err(e) => {
2319 tracing::warn!("[pylon] api-key SQLite backend unavailable: {e}");
2320 pylon_auth::api_key::ApiKeyStore::new()
2321 }
2322 };
2323 let orgs = match crate::org_backend::SqliteOrgBackend::open(path) {
2324 Ok(b) => pylon_auth::org::OrgStore::with_backend(Box::new(b)),
2325 Err(e) => {
2326 tracing::warn!("[pylon] org SQLite backend unavailable: {e}");
2327 pylon_auth::org::OrgStore::new()
2328 }
2329 };
2330 let verification = match crate::verification_backend::SqliteVerificationBackend::open(path) {
2331 Ok(b) => pylon_auth::verification::VerificationStore::with_backend(Box::new(b)),
2332 Err(e) => {
2333 tracing::warn!("[pylon] verification SQLite backend unavailable: {e}");
2334 pylon_auth::verification::VerificationStore::new()
2335 }
2336 };
2337 AuthStores {
2338 session_store: Arc::new(session_store),
2339 magic_codes: Arc::new(magic_codes),
2340 oauth_state: Arc::new(oauth_state),
2341 account_store: Arc::new(account_store),
2342 api_keys: Arc::new(api_keys),
2343 orgs: Arc::new(orgs),
2344 siwe: Arc::new(pylon_auth::siwe::NonceStore::new()),
2345 phone_codes: Arc::new(pylon_auth::phone::PhoneCodeStore::new()),
2346 passkeys: Arc::new(pylon_auth::webauthn::PasskeyStore::new()),
2347 verification: Arc::new(verification),
2348 }
2349}
2350
2351fn build_pg_auth_stores(url: &str, session_lifetime: u64) -> AuthStores {
2352 let session_store = match crate::session_backend::PostgresSessionBackend::connect(url) {
2357 Ok(b) => {
2358 tracing::info!("[pylon] Auth state (Postgres): {url}");
2359 SessionStore::with_backend(Box::new(b)).with_lifetime(session_lifetime)
2360 }
2361 Err(e) => {
2362 tracing::warn!("[pylon] PG session backend unavailable: {e}. In-memory fallback.");
2363 SessionStore::new().with_lifetime(session_lifetime)
2364 }
2365 };
2366 let magic_codes = match crate::magic_code_backend::PostgresMagicCodeBackend::connect(url) {
2367 Ok(b) => pylon_auth::MagicCodeStore::with_backend(Box::new(b)),
2368 Err(e) => {
2369 tracing::warn!("[pylon] PG magic-code backend unavailable: {e}");
2370 pylon_auth::MagicCodeStore::new()
2371 }
2372 };
2373 let oauth_state = match crate::oauth_backend::PostgresOAuthBackend::connect(url) {
2374 Ok(b) => pylon_auth::OAuthStateStore::with_backend(Box::new(b)),
2375 Err(e) => {
2376 tracing::warn!("[pylon] PG OAuth state backend unavailable: {e}");
2377 pylon_auth::OAuthStateStore::new()
2378 }
2379 };
2380 let account_store = match crate::account_backend::PostgresAccountBackend::connect(url) {
2381 Ok(b) => pylon_auth::AccountStore::with_backend(Box::new(b)),
2382 Err(e) => {
2383 tracing::warn!("[pylon] PG account-link backend unavailable: {e}");
2384 pylon_auth::AccountStore::new()
2385 }
2386 };
2387 let api_keys = match crate::api_key_backend::PostgresApiKeyBackend::connect(url) {
2388 Ok(b) => pylon_auth::api_key::ApiKeyStore::with_backend(Box::new(b)),
2389 Err(e) => {
2390 tracing::warn!("[pylon] PG api-key backend unavailable: {e}");
2391 pylon_auth::api_key::ApiKeyStore::new()
2392 }
2393 };
2394 let orgs = match crate::org_backend::PostgresOrgBackend::connect(url) {
2395 Ok(b) => pylon_auth::org::OrgStore::with_backend(Box::new(b)),
2396 Err(e) => {
2397 tracing::warn!("[pylon] PG org backend unavailable: {e}");
2398 pylon_auth::org::OrgStore::new()
2399 }
2400 };
2401 let verification =
2402 match crate::verification_backend::PostgresVerificationBackend::connect(url) {
2403 Ok(b) => pylon_auth::verification::VerificationStore::with_backend(Box::new(b)),
2404 Err(e) => {
2405 tracing::warn!("[pylon] PG verification backend unavailable: {e}");
2406 pylon_auth::verification::VerificationStore::new()
2407 }
2408 };
2409 AuthStores {
2410 session_store: Arc::new(session_store),
2411 magic_codes: Arc::new(magic_codes),
2412 oauth_state: Arc::new(oauth_state),
2413 account_store: Arc::new(account_store),
2414 api_keys: Arc::new(api_keys),
2415 orgs: Arc::new(orgs),
2416 siwe: Arc::new(pylon_auth::siwe::NonceStore::new()),
2417 phone_codes: Arc::new(pylon_auth::phone::PhoneCodeStore::new()),
2418 passkeys: Arc::new(pylon_auth::webauthn::PasskeyStore::new()),
2419 verification: Arc::new(verification),
2420 }
2421}
2422
2423#[allow(dead_code)]
2433fn build_session_store(app_db_path: Option<&str>) -> SessionStore {
2434 if std::env::var("PYLON_SESSION_IN_MEMORY")
2435 .map(|v| v == "1" || v == "true")
2436 .unwrap_or(false)
2437 {
2438 return SessionStore::new();
2439 }
2440 let explicit = std::env::var("PYLON_SESSION_DB").ok();
2441 let default_path = app_db_path.map(|p| format!("{p}.sessions.db"));
2442 let path = match explicit.or(default_path) {
2443 Some(p) => p,
2444 None => return SessionStore::new(),
2445 };
2446 match crate::session_backend::SqliteSessionBackend::open(&path) {
2447 Ok(backend) => {
2448 tracing::info!("[pylon] Session persistence enabled: {path}");
2449 SessionStore::with_backend(Box::new(backend))
2450 }
2451 Err(e) => {
2452 tracing::warn!(
2453 "[pylon] could not open session DB {path}: {e}. Falling back to in-memory sessions."
2454 );
2455 SessionStore::new()
2456 }
2457 }
2458}
2459
2460fn parse_multipart_first_file(
2471 body: &[u8],
2472 content_type_header: &str,
2473) -> Option<(String, String, Vec<u8>)> {
2474 let boundary_param = content_type_header
2476 .split(';')
2477 .find_map(|p| p.trim().strip_prefix("boundary="))?;
2478 let boundary = boundary_param.trim_matches('"');
2479 let delimiter = format!("--{boundary}");
2480 let delimiter_bytes = delimiter.as_bytes();
2481
2482 let mut pos = 0usize;
2484 while pos < body.len() {
2485 let next = find_subslice(&body[pos..], delimiter_bytes)?;
2487 let part_start = pos + next + delimiter_bytes.len();
2488 if part_start + 2 > body.len() {
2490 return None;
2491 }
2492 if &body[part_start..part_start + 2] == b"--" {
2493 return None; }
2495 let header_start = part_start + skip_crlf(&body[part_start..]);
2496
2497 let header_end_offset = find_subslice(&body[header_start..], b"\r\n\r\n")?;
2499 let headers = &body[header_start..header_start + header_end_offset];
2500 let data_start = header_start + header_end_offset + 4;
2501
2502 let next_delim_offset = find_subslice(&body[data_start..], delimiter_bytes)?;
2504 let mut data_end = data_start + next_delim_offset;
2506 if data_end >= 2 && &body[data_end - 2..data_end] == b"\r\n" {
2507 data_end -= 2;
2508 }
2509
2510 let headers_str = std::str::from_utf8(headers).ok()?;
2512 let mut filename: Option<String> = None;
2513 let mut part_ct = String::from("application/octet-stream");
2514 let mut has_file = false;
2515 for line in headers_str.split("\r\n") {
2516 let lower = line.to_ascii_lowercase();
2517 if let Some(rest) = lower.strip_prefix("content-disposition:") {
2518 if rest.contains("filename=") {
2519 has_file = true;
2520 if let Some(start) = line.find("filename=\"") {
2522 let from = start + 10;
2523 if let Some(end_offset) = line[from..].find('"') {
2524 filename = Some(line[from..from + end_offset].to_string());
2525 }
2526 }
2527 }
2528 } else if let Some(rest) = lower.strip_prefix("content-type:") {
2529 part_ct = rest.trim().to_string();
2530 }
2531 }
2532
2533 if has_file {
2534 let name = filename.unwrap_or_else(|| "upload".into());
2535 return Some((name, part_ct, body[data_start..data_end].to_vec()));
2536 }
2537
2538 pos = data_end;
2539 }
2540 None
2541}
2542
/// Returns the byte offset of the first occurrence of `needle` in `haystack`.
///
/// Yields `None` when `needle` is empty, longer than `haystack`, or simply
/// not present.
fn find_subslice(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    if needle.is_empty() {
        return None;
    }
    // Last offset at which the needle could still fit; `None` if it cannot.
    let last_start = haystack.len().checked_sub(needle.len())?;
    (0..=last_start).find(|&start| haystack[start..].starts_with(needle))
}
2549
/// Returns how many bytes of line terminator `buf` starts with.
///
/// `2` for a leading `\r\n`, `1` for a leading bare `\n`, `0` otherwise
/// (including an empty buffer or a lone `\r`).
fn skip_crlf(buf: &[u8]) -> usize {
    match buf {
        [b'\r', b'\n', ..] => 2,
        [b'\n', ..] => 1,
        _ => 0,
    }
}
2559
#[cfg(test)]
mod multipart_tests {
    use super::*;

    /// `Content-Type` header shared by the well-formed fixtures.
    const FORM_CONTENT_TYPE: &str = "multipart/form-data; boundary=bnd";

    /// Joins `lines` with CRLF and terminates the payload with a final CRLF,
    /// which is how multipart bodies are laid out on the wire.
    fn crlf_body(lines: &[&str]) -> Vec<u8> {
        let mut payload = lines.join("\r\n").into_bytes();
        payload.extend_from_slice(b"\r\n");
        payload
    }

    /// A single file part yields its filename, content type and raw bytes.
    #[test]
    fn parses_single_file() {
        let body = crlf_body(&[
            "--bnd",
            "Content-Disposition: form-data; name=\"file\"; filename=\"hello.txt\"",
            "Content-Type: text/plain",
            "",
            "Hello world",
            "--bnd--",
        ]);
        let (name, content_type, bytes) =
            parse_multipart_first_file(&body, FORM_CONTENT_TYPE).unwrap();
        assert_eq!(name, "hello.txt");
        assert_eq!(content_type, "text/plain");
        assert_eq!(bytes, b"Hello world");
    }

    /// A body that only contains plain form fields has no file to return.
    #[test]
    fn returns_none_without_file_part() {
        let body = crlf_body(&[
            "--bnd",
            "Content-Disposition: form-data; name=\"field\"",
            "",
            "just text",
            "--bnd--",
        ]);
        assert!(parse_multipart_first_file(&body, FORM_CONTENT_TYPE).is_none());
    }

    /// Without a `boundary=` parameter the body cannot be split at all.
    #[test]
    fn returns_none_when_no_boundary() {
        assert!(parse_multipart_first_file(b"anything", "application/json").is_none());
    }
}