1#[allow(unused_imports)]
2use std::io::Read;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::time::Instant;
6
7use pylon_auth::SessionStore;
8use pylon_http::HttpMethod;
9use pylon_plugin::PluginRegistry;
10use pylon_policy::PolicyEngine;
11use pylon_sync::{ChangeKind, ChangeLog};
12use tiny_http::{Header, Method, Response, Server};
13
14use crate::datastore::{
15 CacheAdapter, EmailAdapter, LocalFileOps, PluginHooksAdapter, PubSubAdapter,
16 RuntimeOpenApiGenerator, ShardOpsAdapter, WsSseNotifier,
17};
18use crate::jobs::{JobQueue, JobResult, Worker};
19use crate::metrics::Metrics;
20use crate::pubsub::PubSubBroker;
21use crate::rate_limit::RateLimiter;
22use crate::rooms::RoomManager;
23use crate::scheduler::Scheduler;
24use crate::sse::SseHub;
25use crate::workflows::WorkflowEngine;
26use crate::ws::WsHub;
27use crate::Runtime;
28use pylon_plugin::builtin::ai_proxy::{AiMessage, AiProxyPlugin};
29use pylon_plugin::builtin::cache::CachePlugin;
30
31struct StreamingBody {
41 rx: std::sync::mpsc::Receiver<Vec<u8>>,
42 buf: Vec<u8>,
43 pos: usize,
44}
45
46impl StreamingBody {
47 fn new(rx: std::sync::mpsc::Receiver<Vec<u8>>) -> Self {
48 Self {
49 rx,
50 buf: Vec::new(),
51 pos: 0,
52 }
53 }
54}
55
56impl std::io::Read for StreamingBody {
57 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
58 if self.pos < self.buf.len() {
61 let remaining = &self.buf[self.pos..];
62 let n = remaining.len().min(buf.len());
63 buf[..n].copy_from_slice(&remaining[..n]);
64 self.pos += n;
65 if self.pos >= self.buf.len() {
66 self.buf.clear();
67 self.pos = 0;
68 }
69 return Ok(n);
70 }
71
72 match self.rx.recv() {
74 Ok(data) if data.is_empty() => Ok(0),
75 Ok(data) => {
76 let n = data.len().min(buf.len());
77 buf[..n].copy_from_slice(&data[..n]);
78 if n < data.len() {
79 self.buf = data;
80 self.pos = n;
81 }
82 Ok(n)
83 }
84 Err(_) => Ok(0), }
86 }
87}
88
89static SHUTDOWN: AtomicBool = AtomicBool::new(false);
91
92pub fn request_shutdown() {
98 SHUTDOWN.store(true, Ordering::SeqCst);
99 if let Some(srv) = SERVER_HANDLE.get() {
102 srv.unblock();
103 }
104}
105
106static SERVER_HANDLE: std::sync::OnceLock<Arc<Server>> = std::sync::OnceLock::new();
109
110fn resolve_client_ip(request: &tiny_http::Request, trust_proxy_hops: usize) -> String {
126 let socket_ip = request
127 .remote_addr()
128 .map(|a| a.ip().to_string())
129 .unwrap_or_default();
130 if trust_proxy_hops == 0 {
131 return socket_ip;
132 }
133 let xff = request
136 .headers()
137 .iter()
138 .find(|h| {
139 h.field
140 .as_str()
141 .as_str()
142 .eq_ignore_ascii_case("X-Forwarded-For")
143 })
144 .map(|h| h.value.as_str().to_string());
145 let Some(xff) = xff else {
146 return socket_ip;
147 };
148 let entries: Vec<&str> = xff.split(',').map(str::trim).collect();
153 if entries.len() < trust_proxy_hops {
154 return socket_ip;
159 }
160 let candidate = entries[entries.len() - trust_proxy_hops];
161 if candidate.parse::<std::net::IpAddr>().is_ok() {
164 candidate.to_string()
165 } else {
166 socket_ip
167 }
168}
169
170fn security_headers() -> Vec<Header> {
179 vec![
180 Header::from_bytes("X-Content-Type-Options", "nosniff").unwrap(),
181 Header::from_bytes("X-Frame-Options", "DENY").unwrap(),
182 Header::from_bytes("X-XSS-Protection", "1; mode=block").unwrap(),
183 Header::from_bytes("Referrer-Policy", "strict-origin-when-cross-origin").unwrap(),
187 Header::from_bytes(
191 "Permissions-Policy",
192 "accelerometer=(), camera=(), geolocation=(), gyroscope=(), microphone=(), payment=(), usb=()",
193 )
194 .unwrap(),
195 ]
196}
197
198fn with_security_headers<R: std::io::Read>(response: Response<R>) -> Response<R> {
200 let mut resp = response;
201 for header in security_headers() {
202 resp = resp.with_header(header);
203 }
204 resp
205}
206
207pub fn start(runtime: Arc<Runtime>, port: u16) -> Result<(), String> {
209 start_with_plugins(runtime, port, None)
210}
211
212pub fn start_with_plugins(
214 runtime: Arc<Runtime>,
215 port: u16,
216 plugins: Option<Arc<PluginRegistry>>,
217) -> Result<(), String> {
218 start_server(runtime, port, plugins, None)
219}
220
221pub fn start_with_shards(
224 runtime: Arc<Runtime>,
225 port: u16,
226 plugins: Option<Arc<PluginRegistry>>,
227 shard_registry: Arc<dyn pylon_realtime::DynShardRegistry>,
228) -> Result<(), String> {
229 start_server(runtime, port, plugins, Some(shard_registry))
230}
231
232fn start_server(
233 runtime: Arc<Runtime>,
234 port: u16,
235 plugins: Option<Arc<PluginRegistry>>,
236 shard_registry: Option<Arc<dyn pylon_realtime::DynShardRegistry>>,
237) -> Result<(), String> {
238 pylon_observability::run_tracing_hook();
243
244 let addr = format!("0.0.0.0:{port}");
245 let server = Server::http(&addr).map_err(|e| format!("Failed to start server: {e}"))?;
246 let server = Arc::new(server);
247
248 let _ = SERVER_HANDLE.set(Arc::clone(&server));
250
251 let session_lifetime = runtime.manifest().auth.session.expires_in;
252 let auth_stores = build_auth_stores(runtime.db_path().as_deref(), session_lifetime);
253 let session_store = auth_stores.session_store;
254 let magic_codes = auth_stores.magic_codes;
255 let oauth_state = auth_stores.oauth_state;
256 let account_store = auth_stores.account_store;
257 let api_keys = auth_stores.api_keys;
258 let orgs = auth_stores.orgs;
259 let siwe = auth_stores.siwe;
260 let phone_codes = auth_stores.phone_codes;
261 let passkeys = auth_stores.passkeys;
262 let verification = auth_stores.verification;
263 let audit = auth_stores.audit;
264 let policy_engine = Arc::new(PolicyEngine::from_manifest(runtime.manifest()));
265 let change_log = Arc::new(ChangeLog::new());
266
267 for entity in runtime.manifest().entities.iter() {
275 match runtime.list(&entity.name) {
276 Ok(rows) => {
277 for row in rows {
278 if let Some(id) = row.get("id").and_then(|v| v.as_str()) {
279 change_log.append(&entity.name, id, ChangeKind::Insert, Some(row.clone()));
280 }
281 }
282 }
283 Err(_) => {
284 }
286 }
287 }
288 let ws_hub = WsHub::new();
289 let sse_hub = SseHub::new();
290 let is_dev_early = std::env::var("PYLON_DEV_MODE")
303 .map(|v| v == "1" || v == "true")
304 .unwrap_or(true);
305 let plugin_rl_max: u32 = if is_dev_early { 100_000 } else { 100 };
306 let plugin_reg: Arc<PluginRegistry> = plugins.unwrap_or_else(|| {
307 let mut reg = PluginRegistry::new(runtime.manifest().clone());
308 reg.register(Arc::new(
309 pylon_plugin::builtin::rate_limit::RateLimitPlugin::new(
310 plugin_rl_max,
311 std::time::Duration::from_secs(60),
312 ),
313 ));
314 reg.register(Arc::new(
319 pylon_plugin::builtin::tenant_scope::TenantScopePlugin::from_manifest(
320 runtime.manifest(),
321 ),
322 ));
323 Arc::new(reg)
324 });
325 let room_mgr = Arc::new(RoomManager::new(120)); let ws_port = port + 1;
327 let sse_port = port + 2;
328
329 let start_time = Instant::now();
331
332 let metrics = Arc::new(Metrics::new());
333
334 let cache = Arc::new(CachePlugin::new(100_000));
336 let pubsub_broker = Arc::new(PubSubBroker::new(100));
337
338 let job_queue = Arc::new(JobQueue::new(1000));
340
341 let jobs_in_memory = std::env::var("PYLON_JOBS_IN_MEMORY")
347 .map(|v| v == "1" || v == "true")
348 .unwrap_or(false);
349 if !jobs_in_memory {
350 let jobs_db_path = std::env::var("PYLON_JOBS_DB").ok().unwrap_or_else(|| {
351 runtime
352 .db_path()
353 .map(|p| format!("{p}.jobs.db"))
354 .unwrap_or_else(|| "pylon.jobs.db".into())
355 });
356 match crate::job_store::JobStore::open(&jobs_db_path) {
357 Ok(store) => {
358 let store = Arc::new(store);
359 let restored = job_queue.restore_from(&store);
360 if restored > 0 {
361 tracing::info!("[jobs] Restored {restored} pending job(s) from {jobs_db_path}");
362 }
363 job_queue.attach_store(store);
364 }
365 Err(e) => {
366 tracing::warn!(
367 "[jobs] Could not open job store at {jobs_db_path}: {e} — running without persistence"
368 );
369 }
370 }
371 }
372
373 {
375 let cache_ref = Arc::clone(&cache);
376 job_queue.register(
377 "pylon.cache.cleanup",
378 Arc::new(move |_job| {
379 cache_ref.cleanup_expired();
380 JobResult::Success
381 }),
382 );
383 let rooms_ref = Arc::clone(&room_mgr);
384 job_queue.register(
385 "pylon.rooms.cleanup",
386 Arc::new(move |_job| {
387 rooms_ref.cleanup_idle();
388 JobResult::Success
389 }),
390 );
391 }
392
393 let scheduler = Arc::new(Scheduler::new(Arc::clone(&job_queue)));
394 let _ = scheduler.schedule(
396 "pylon.cache.cleanup",
397 "*/10 * * * *",
398 Arc::new(|_| JobResult::Success),
399 );
400 let _ = scheduler.schedule(
401 "pylon.rooms.cleanup",
402 "*/5 * * * *",
403 Arc::new(|_| JobResult::Success),
404 );
405
406 let _worker_handles: Vec<_> = (0..2)
408 .map(|i| {
409 let w = Worker::new(Arc::clone(&job_queue), &format!("worker-{i}"));
410 w.start()
411 })
412 .collect();
413
414 let _scheduler_handle = Arc::clone(&scheduler).start();
416
417 let wf_runner_url = std::env::var("PYLON_WORKFLOW_RUNNER_URL")
419 .unwrap_or_else(|_| "http://127.0.0.1:9876/run".to_string());
420 let workflow_engine = Arc::new(WorkflowEngine::new(&wf_runner_url, 10_000));
421
422 let default_rl_max = if is_dev_early { 100_000 } else { 600 };
433 let rl_max: u32 = std::env::var("PYLON_RATE_LIMIT_MAX")
434 .ok()
435 .and_then(|v| v.parse().ok())
436 .unwrap_or(default_rl_max);
437 let rl_window: u64 = std::env::var("PYLON_RATE_LIMIT_WINDOW")
438 .ok()
439 .and_then(|v| v.parse().ok())
440 .unwrap_or(60);
441 let rate_limiter = Arc::new(RateLimiter::new(rl_max, rl_window));
442
443 let fn_rl_max: u32 = std::env::var("PYLON_FN_RATE_LIMIT_MAX")
447 .ok()
448 .and_then(|v| v.parse().ok())
449 .unwrap_or(30);
450 let fn_rl_window: u64 = std::env::var("PYLON_FN_RATE_LIMIT_WINDOW")
451 .ok()
452 .and_then(|v| v.parse().ok())
453 .unwrap_or(60);
454 let fn_rate_limiter = Arc::new(RateLimiter::new(fn_rl_max, fn_rl_window));
455
456 let fn_notifier: Arc<dyn pylon_router::ChangeNotifier> =
464 Arc::new(crate::datastore::WsSseNotifier {
465 ws: Arc::clone(&ws_hub),
466 sse: Arc::clone(&sse_hub),
467 });
468 let fn_ops_maybe = crate::datastore::try_spawn_functions(
469 Arc::clone(&runtime),
470 Arc::clone(&job_queue),
471 Arc::clone(&fn_rate_limiter),
472 Arc::clone(&change_log),
473 fn_notifier,
474 );
475
476 let is_dev = std::env::var("PYLON_DEV_MODE")
484 .map(|v| v == "1" || v == "true")
485 .unwrap_or(false);
486
487 let cors_origin = match std::env::var("PYLON_CORS_ORIGIN") {
494 Ok(v) => v,
495 Err(_) if is_dev => "*".to_string(),
496 Err(_) => {
497 return Err(
498 "PYLON_CORS_ORIGIN must be set in production (non-dev mode). \
499 Set it to your frontend's origin, or set PYLON_DEV_MODE=true \
500 for local development."
501 .into(),
502 );
503 }
504 };
505 if !is_dev && cors_origin == "*" {
506 return Err("PYLON_CORS_ORIGIN=\"*\" is refused in production mode. \
507 Set it to an explicit origin (https://app.example.com)."
508 .into());
509 }
510 let allow_credentials = cors_origin != "*";
517 if Header::from_bytes(
521 "Access-Control-Allow-Origin",
522 cors_origin.as_bytes().to_vec(),
523 )
524 .is_err()
525 {
526 return Err(format!(
527 "PYLON_CORS_ORIGIN={cors_origin:?} contains bytes that are not a valid HTTP header value"
528 ));
529 }
530
531 let admin_token: Option<String> = std::env::var("PYLON_ADMIN_TOKEN").ok();
533
534 let trust_proxy_hops: usize = std::env::var("PYLON_TRUST_PROXY_HOPS")
544 .ok()
545 .and_then(|v| v.parse().ok())
546 .unwrap_or(0);
547
548 let cookie_config = Arc::new({
555 let app_name = runtime.manifest().name.as_str();
556 pylon_auth::CookieConfig::from_env(&pylon_auth::CookieConfig::default_name_for(app_name))
557 });
558
559 let csrf_origins: Vec<String> = match std::env::var("PYLON_CSRF_ORIGINS") {
569 Ok(v) => v
570 .split(',')
571 .map(|s| s.trim().to_string())
572 .filter(|s| !s.is_empty())
573 .collect(),
574 Err(_) => {
575 if is_dev {
576 vec!["*".to_string()]
577 } else if cors_origin != "*" {
578 vec![cors_origin.clone()]
579 } else {
580 vec![]
582 }
583 }
584 };
585 let csrf = Arc::new(pylon_plugin::builtin::csrf::CsrfPlugin::new(csrf_origins));
586
587 let manifest_trusted: Vec<String> = runtime.manifest().auth.trusted_origins.clone();
601 let trusted_origins: Vec<String> = std::env::var("PYLON_TRUSTED_ORIGINS")
602 .map(|v| {
603 v.split(',')
604 .map(|s| s.trim().to_string())
605 .filter(|s| !s.is_empty())
606 .collect()
607 })
608 .unwrap_or_else(|_| {
609 if is_dev_early {
616 vec![
617 "http://localhost:3000".to_string(),
618 "http://localhost:4321".to_string(),
619 "http://localhost:5173".to_string(),
620 "http://127.0.0.1:3000".to_string(),
621 ]
622 } else {
623 Vec::new()
624 }
625 });
626 let mut combined: Vec<String> = trusted_origins;
628 for m in manifest_trusted {
629 if !m.is_empty() && !combined.contains(&m) {
630 combined.push(m);
631 }
632 }
633 let trusted_origins = Arc::new(combined);
634
635 {
649 let hub = Arc::clone(&ws_hub);
650 let sessions = Arc::clone(&session_store);
651 let runtime_for_fetcher = Arc::clone(&runtime);
652 let pe_for_fetcher = Arc::clone(&policy_engine);
653 let fetcher: crate::ws::SnapshotFetcher = Arc::new(move |auth_ctx, entity, row_id| {
654 use pylon_http::DataStore;
655 let row = match runtime_for_fetcher.get_by_id(entity, row_id) {
660 Ok(Some(v)) => v,
661 _ => return None,
662 };
663 if !matches!(
664 pe_for_fetcher.check_entity_read(entity, auth_ctx, Some(&row)),
665 pylon_policy::PolicyResult::Allowed
666 ) {
667 return None;
668 }
669 let snap = match runtime_for_fetcher.crdt_snapshot(entity, row_id) {
670 Ok(Some(bytes)) => bytes,
671 _ => return None,
672 };
673 pylon_router::encode_crdt_frame(
674 pylon_router::CRDT_FRAME_SNAPSHOT,
675 entity,
676 row_id,
677 &snap,
678 )
679 .ok()
680 });
681 std::thread::spawn(move || {
682 crate::ws::start_ws_server(hub, sessions, ws_port, Some(fetcher));
683 });
684 }
685
686 {
688 let hub = Arc::clone(&sse_hub);
689 std::thread::spawn(move || {
690 crate::sse::start_sse_server(hub, sse_port);
691 });
692 }
693
694 let shard_ws_port = port + 3;
696 if let Some(reg) = shard_registry.clone() {
697 let sessions = Arc::clone(&session_store);
698 std::thread::spawn(move || {
699 crate::shard_ws::start_shard_ws_server(reg, sessions, shard_ws_port);
700 });
701 }
702
703 tracing::warn!("pylon dev server listening on http://localhost:{port}");
704 tracing::info!(" WebSocket: ws://localhost:{ws_port}");
705 tracing::info!(" Studio: http://localhost:{port}/studio");
706 tracing::info!(" API: http://localhost:{port}/api/entities/<entity>");
707 tracing::info!(" Auth: http://localhost:{port}/api/auth/session");
708
709 loop {
713 if SHUTDOWN.load(Ordering::Relaxed) {
714 break;
715 }
716
717 let mut request = match server.recv() {
718 Ok(rq) => rq,
719 Err(_) => {
720 break;
722 }
723 };
724
725 if SHUTDOWN.load(Ordering::Relaxed) {
726 break;
727 }
728
729 let rt = Arc::clone(&runtime);
730 let ss = Arc::clone(&session_store);
731 let pe = Arc::clone(&policy_engine);
732 let cl = Arc::clone(&change_log);
733 let wh = Arc::clone(&ws_hub);
734 let sh = Arc::clone(&sse_hub);
735 let mc = Arc::clone(&magic_codes);
736 let pr = Arc::clone(&plugin_reg);
737 let rm = Arc::clone(&room_mgr);
738 let mt = Arc::clone(&metrics);
739 let os = Arc::clone(&oauth_state);
740 let acc = Arc::clone(&account_store);
741 let ak = Arc::clone(&api_keys);
742 let og = Arc::clone(&orgs);
743 let sw = Arc::clone(&siwe);
744 let pcd = Arc::clone(&phone_codes);
745 let pks = Arc::clone(&passkeys);
746 let vrf = Arc::clone(&verification);
747 let aud = Arc::clone(&audit);
748 let trusted_origins_ref = Arc::clone(&trusted_origins);
749 let ca = Arc::clone(&cache);
750 let ps = Arc::clone(&pubsub_broker);
751 let jq = Arc::clone(&job_queue);
752 let sc = Arc::clone(&scheduler);
753 let we = Arc::clone(&workflow_engine);
754 let fn_ops_ref = fn_ops_maybe.clone();
755 let shards_ref = shard_registry.clone();
756 let cors_origin = cors_origin.clone();
757 let cookie_config = Arc::clone(&cookie_config);
758 let allow_credentials = allow_credentials;
759 let is_dev = is_dev;
760
761 let method = request.method().clone();
762 let url = request.url().to_string();
763
764 let request_peer_ip = resolve_client_ip(&request, trust_proxy_hops);
776 let request_started_at = std::time::Instant::now();
777 if url != "/health" && url != "/metrics" {
778 tracing::info!("→ {} {} from {}", method.as_str(), url, request_peer_ip);
779 crate::metrics::set_current_request(&url, request_started_at);
783 }
784
785 if url == "/health" && method == Method::Get {
787 let uptime = start_time.elapsed().as_secs();
788 let body = serde_json::json!({
789 "status": "ok",
790 "version": "0.1.0",
791 "uptime_secs": uptime,
792 })
793 .to_string();
794
795 let response = with_security_headers(
796 Response::from_string(&body)
797 .with_status_code(200u16)
798 .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
799 .with_header(
800 Header::from_bytes(
801 "Access-Control-Allow-Origin",
802 cors_origin.as_bytes().to_vec(),
803 )
804 .unwrap(),
805 ),
806 );
807 let _ = request.respond(response);
808 continue;
809 }
810
811 if url == "/metrics" && method == Method::Get {
816 if !is_dev {
817 let admin_bytes = admin_token.as_deref().unwrap_or("").as_bytes();
818 let auth_ok = !admin_bytes.is_empty()
819 && request.headers().iter().any(|h| {
820 let name = h.field.as_str().as_str();
821 name.eq_ignore_ascii_case("Authorization")
822 && h.value
823 .as_str()
824 .strip_prefix("Bearer ")
825 .map(|t| pylon_auth::constant_time_eq(t.as_bytes(), admin_bytes))
826 .unwrap_or(false)
827 });
828 if !auth_ok {
829 let body = json_error(
830 "UNAUTHORIZED",
831 "/metrics requires admin bearer token in non-dev mode",
832 );
833 let response = with_security_headers(
834 Response::from_string(&body)
835 .with_status_code(401u16)
836 .with_header(
837 Header::from_bytes("Content-Type", "application/json").unwrap(),
838 ),
839 );
840 let _ = request.respond(response);
841 continue;
842 }
843 }
844 let prefers_prometheus = request.headers().iter().any(|h| {
845 (h.field.as_str() == "Accept" || h.field.as_str() == "accept")
846 && (h.value.as_str().contains("text/plain")
847 || h.value.as_str().contains("application/openmetrics-text"))
848 });
849 let (body, content_type) = if prefers_prometheus {
850 (mt.prometheus(), "text/plain; version=0.0.4")
851 } else {
852 (mt.snapshot().to_string(), "application/json")
853 };
854 let response = with_security_headers(
855 Response::from_string(&body)
856 .with_status_code(200u16)
857 .with_header(Header::from_bytes("Content-Type", content_type).unwrap())
858 .with_header(
859 Header::from_bytes(
860 "Access-Control-Allow-Origin",
861 cors_origin.as_bytes().to_vec(),
862 )
863 .unwrap(),
864 ),
865 );
866 let _ = request.respond(response);
867 mt.record_request("GET", 200);
868 continue;
869 }
870
871 let peer_ip = resolve_client_ip(&request, trust_proxy_hops);
877
878 let is_preflight = matches!(method, Method::Options);
884 if !is_preflight {
885 if let Err(retry_after) = rate_limiter.check(&peer_ip) {
886 let err_body = json_error(
887 "RATE_LIMITED",
888 &format!("Too many requests. Retry after {retry_after} seconds."),
889 );
890 let response = with_security_headers(
891 Response::from_string(&err_body)
892 .with_status_code(429u16)
893 .with_header(
894 Header::from_bytes("Content-Type", "application/json").unwrap(),
895 )
896 .with_header(
897 Header::from_bytes(
898 "Access-Control-Allow-Origin",
899 cors_origin.as_bytes().to_vec(),
900 )
901 .unwrap(),
902 )
903 .with_header(
904 Header::from_bytes(
905 "Access-Control-Allow-Methods",
906 "GET, POST, PATCH, DELETE, OPTIONS",
907 )
908 .unwrap(),
909 )
910 .with_header(
911 Header::from_bytes(
912 "Access-Control-Allow-Headers",
913 "Content-Type, Authorization",
914 )
915 .unwrap(),
916 )
917 .with_header(
918 Header::from_bytes(
919 "Retry-After",
920 retry_after.to_string().as_bytes().to_vec(),
921 )
922 .unwrap(),
923 ),
924 );
925 let _ = request.respond(response);
926 mt.record_request(method.as_str(), 429);
927 continue;
928 }
929 } {
945 let method_str = method.as_str();
946 let is_bearer = request.headers().iter().any(|h| {
947 (h.field.as_str() == "Authorization" || h.field.as_str() == "authorization")
948 && h.value.as_str().starts_with("Bearer ")
949 });
950 if !is_bearer && !matches!(method, Method::Get | Method::Head | Method::Options) {
955 let origin = request
956 .headers()
957 .iter()
958 .find(|h| h.field.as_str() == "Origin" || h.field.as_str() == "origin")
959 .map(|h| h.value.as_str().to_string());
960 let referer = request
961 .headers()
962 .iter()
963 .find(|h| h.field.as_str() == "Referer" || h.field.as_str() == "referer")
964 .map(|h| h.value.as_str().to_string());
965 if let Err(err) = csrf.check(method_str, origin.as_deref(), referer.as_deref()) {
966 let body = json_error(&err.code, &err.message);
967 let response = with_security_headers(
968 Response::from_string(&body)
969 .with_status_code(err.status)
970 .with_header(
971 Header::from_bytes("Content-Type", "application/json").unwrap(),
972 )
973 .with_header(
974 Header::from_bytes(
975 "Access-Control-Allow-Origin",
976 cors_origin.as_bytes().to_vec(),
977 )
978 .unwrap(),
979 ),
980 );
981 let _ = request.respond(response);
982 mt.record_request(method_str, err.status);
983 continue;
984 }
985 }
986 }
987
988 let bearer_token: Option<String> = request
998 .headers()
999 .iter()
1000 .find(|h| h.field.as_str() == "Authorization" || h.field.as_str() == "authorization")
1001 .and_then(|h| {
1002 let val = h.value.as_str();
1003 val.strip_prefix("Bearer ").map(|t| t.to_string())
1004 });
1005 let cookie_token: Option<String> = if bearer_token.is_some() {
1006 None
1007 } else {
1008 request
1009 .headers()
1010 .iter()
1011 .find(|h| h.field.as_str() == "Cookie" || h.field.as_str() == "cookie")
1012 .and_then(|h| {
1013 pylon_auth::extract_session_cookie(h.value.as_str(), &cookie_config.name)
1014 })
1015 };
1016 let auth_token: Option<String> = bearer_token.or(cookie_token);
1017 let auth_ctx_result: Result<pylon_auth::AuthContext, &'static str> = if admin_token
1026 .is_some()
1027 && auth_token.is_some()
1028 && pylon_auth::constant_time_eq(
1029 auth_token.as_deref().unwrap_or("").as_bytes(),
1030 admin_token.as_deref().unwrap_or("").as_bytes(),
1031 ) {
1032 Ok(pylon_auth::AuthContext::admin())
1033 } else if let Some(t) = auth_token.as_deref() {
1034 if t.starts_with("pk.") {
1035 match ak.verify(t) {
1036 Ok(key) => Ok(pylon_auth::AuthContext::from_api_key(
1037 key.user_id,
1038 key.id,
1039 key.scopes,
1040 )),
1041 Err(_) => Err("INVALID_API_KEY"),
1042 }
1043 } else if pylon_auth::jwt::looks_like_jwt(t) && jwt_secret().is_some() {
1044 let Some(issuer) = jwt_issuer() else {
1050 tracing::warn!(
1051 "[auth] PYLON_JWT_SECRET set but PYLON_JWT_ISSUER missing — \
1052 refusing JWT verify (set both to enable JWT sessions)"
1053 );
1054 Err("JWT_MISCONFIGURED")?;
1055 unreachable!();
1056 };
1057 let secret = jwt_secret().expect("checked above");
1058 match pylon_auth::jwt::verify(t, secret.as_bytes(), Some(issuer)) {
1059 Ok(claims) => {
1060 let mut ctx = pylon_auth::AuthContext::authenticated(claims.sub);
1061 ctx.roles = claims.roles;
1062 if let Some(t) = claims.tenant_id {
1063 ctx = ctx.with_tenant(t);
1064 }
1065 Ok(ctx)
1066 }
1067 Err(_) => Err("INVALID_JWT"),
1068 }
1069 } else {
1070 Ok(ss.resolve(Some(t)))
1071 }
1072 } else {
1073 Ok(ss.resolve(None))
1074 };
1075 let auth_ctx = match auth_ctx_result {
1076 Ok(c) => c,
1077 Err(reason) => {
1078 let body = format!(
1079 r#"{{"error":{{"code":"{reason}","message":"Bearer token is malformed, expired, or revoked"}}}}"#
1080 );
1081 let resp = tiny_http::Response::from_string(body)
1082 .with_status_code(401)
1083 .with_header(
1084 "Content-Type: application/json"
1085 .parse::<tiny_http::Header>()
1086 .unwrap(),
1087 );
1088 let _ = request.respond(resp);
1089 continue;
1090 }
1091 };
1092
1093 if url == "/api/__test__/reset" && method == Method::Post {
1108 let is_loopback = peer_ip == "127.0.0.1"
1109 || peer_ip == "::1"
1110 || peer_ip.starts_with("127.")
1111 || peer_ip == "localhost";
1112 if !is_dev || !rt.is_in_memory() || !is_loopback {
1113 let body = json_error(
1114 "RESET_REFUSED",
1115 "reset endpoint is only available in dev mode + in-memory DB + from loopback",
1116 );
1117 let response = with_security_headers(
1118 Response::from_string(&body)
1119 .with_status_code(403u16)
1120 .with_header(
1121 Header::from_bytes("Content-Type", "application/json").unwrap(),
1122 )
1123 .with_header(
1124 Header::from_bytes(
1125 "Access-Control-Allow-Origin",
1126 cors_origin.as_bytes().to_vec(),
1127 )
1128 .unwrap(),
1129 ),
1130 );
1131 let _ = request.respond(response);
1132 mt.record_request("POST", 403);
1133 continue;
1134 }
1135 let (status, body) = match rt.reset_for_tests() {
1136 Ok(()) => (200u16, "{\"reset\":true}".to_string()),
1137 Err(e) => (500u16, json_error(&e.code, &e.message)),
1138 };
1139 let response = with_security_headers(
1140 Response::from_string(&body)
1141 .with_status_code(status)
1142 .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
1143 .with_header(
1144 Header::from_bytes(
1145 "Access-Control-Allow-Origin",
1146 cors_origin.as_bytes().to_vec(),
1147 )
1148 .unwrap(),
1149 ),
1150 );
1151 let _ = request.respond(response);
1152 mt.record_request("POST", status);
1153 continue;
1154 }
1155
1156 if url == "/api/files/upload" && method == Method::Post {
1165 const UPLOAD_MAX: usize = 10 * 1024 * 1024;
1166 if let Some(declared) = request.body_length() {
1169 if declared > UPLOAD_MAX {
1170 let err = json_error(
1171 "PAYLOAD_TOO_LARGE",
1172 &format!("Content-Length {declared} exceeds upload max of {UPLOAD_MAX}"),
1173 );
1174 let response = with_security_headers(
1175 Response::from_string(&err)
1176 .with_status_code(413u16)
1177 .with_header(
1178 Header::from_bytes("Content-Type", "application/json").unwrap(),
1179 )
1180 .with_header(
1181 Header::from_bytes(
1182 "Access-Control-Allow-Origin",
1183 cors_origin.as_bytes().to_vec(),
1184 )
1185 .unwrap(),
1186 ),
1187 );
1188 let _ = request.respond(response);
1189 mt.record_request("POST", 413);
1190 continue;
1191 }
1192 }
1193 if auth_ctx.user_id.is_none() {
1194 let err = json_error(
1195 "AUTH_REQUIRED",
1196 "/api/files/upload requires an authenticated session",
1197 );
1198 let response = with_security_headers(
1199 Response::from_string(&err)
1200 .with_status_code(401u16)
1201 .with_header(
1202 Header::from_bytes("Content-Type", "application/json").unwrap(),
1203 )
1204 .with_header(
1205 Header::from_bytes(
1206 "Access-Control-Allow-Origin",
1207 cors_origin.as_bytes().to_vec(),
1208 )
1209 .unwrap(),
1210 ),
1211 );
1212 let _ = request.respond(response);
1213 mt.record_request("POST", 401);
1214 continue;
1215 }
1216 use std::io::Read;
1221 let mut bytes: Vec<u8> = Vec::with_capacity(8192);
1222 let mut limited = request.as_reader().take((UPLOAD_MAX as u64) + 1);
1223 let _ = limited.read_to_end(&mut bytes);
1224
1225 const MAX: usize = UPLOAD_MAX;
1226 if bytes.len() > MAX {
1227 let err = json_error("PAYLOAD_TOO_LARGE", "File exceeds 10 MB limit");
1228 let response = with_security_headers(
1229 Response::from_string(&err)
1230 .with_status_code(413u16)
1231 .with_header(
1232 Header::from_bytes("Content-Type", "application/json").unwrap(),
1233 )
1234 .with_header(
1235 Header::from_bytes(
1236 "Access-Control-Allow-Origin",
1237 cors_origin.as_bytes().to_vec(),
1238 )
1239 .unwrap(),
1240 ),
1241 );
1242 let _ = request.respond(response);
1243 mt.record_request("POST", 413);
1244 continue;
1245 }
1246
1247 let content_type = request
1249 .headers()
1250 .iter()
1251 .find(|h| h.field.as_str() == "Content-Type" || h.field.as_str() == "content-type")
1252 .map(|h| h.value.as_str().to_string())
1253 .unwrap_or_else(|| "application/octet-stream".into());
1254 let filename = request
1255 .headers()
1256 .iter()
1257 .find(|h| h.field.as_str() == "X-Filename" || h.field.as_str() == "x-filename")
1258 .map(|h| h.value.as_str().to_string())
1259 .unwrap_or_else(|| "upload".into());
1260
1261 let (name, ct, payload) = if content_type.starts_with("multipart/form-data") {
1263 match parse_multipart_first_file(&bytes, &content_type) {
1264 Some(p) => p,
1265 None => {
1266 let err = json_error("INVALID_MULTIPART", "Could not parse multipart body");
1267 let response = with_security_headers(
1268 Response::from_string(&err)
1269 .with_status_code(400u16)
1270 .with_header(
1271 Header::from_bytes("Content-Type", "application/json").unwrap(),
1272 )
1273 .with_header(
1274 Header::from_bytes(
1275 "Access-Control-Allow-Origin",
1276 cors_origin.as_bytes().to_vec(),
1277 )
1278 .unwrap(),
1279 ),
1280 );
1281 let _ = request.respond(response);
1282 mt.record_request("POST", 400);
1283 continue;
1284 }
1285 }
1286 } else {
1287 (filename, content_type, bytes)
1288 };
1289
1290 let storage = pylon_storage::files::LocalFileStorage::new(
1291 &std::env::var("PYLON_FILES_DIR").unwrap_or_else(|_| "uploads".into()),
1292 &std::env::var("PYLON_FILES_URL_PREFIX").unwrap_or_else(|_| "/api/files".into()),
1293 );
1294
1295 let (status, body) =
1296 match pylon_storage::files::FileStorage::store(&storage, &name, &payload, &ct) {
1297 Ok(stored) => (
1298 201u16,
1299 serde_json::to_string(&stored).unwrap_or_else(|_| "{}".into()),
1300 ),
1301 Err(e) => (500u16, json_error(&e.code, &e.message)),
1302 };
1303
1304 let response = with_security_headers(
1305 Response::from_string(&body)
1306 .with_status_code(status)
1307 .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
1308 .with_header(
1309 Header::from_bytes(
1310 "Access-Control-Allow-Origin",
1311 cors_origin.as_bytes().to_vec(),
1312 )
1313 .unwrap(),
1314 ),
1315 );
1316 let _ = request.respond(response);
1317 mt.record_request("POST", status);
1318 continue;
1319 }
1320
1321 const MAX_BODY_SIZE: usize = 10 * 1024 * 1024;
1331
1332 if let Some(declared) = request.body_length() {
1333 if declared > MAX_BODY_SIZE {
1334 let err_body = json_error(
1335 "PAYLOAD_TOO_LARGE",
1336 &format!("Content-Length {declared} exceeds max of {MAX_BODY_SIZE}"),
1337 );
1338 let response = with_security_headers(
1339 Response::from_string(&err_body)
1340 .with_status_code(413u16)
1341 .with_header(
1342 Header::from_bytes(
1343 "Access-Control-Allow-Origin",
1344 cors_origin.as_bytes().to_vec(),
1345 )
1346 .unwrap(),
1347 ),
1348 );
1349 let _ = request.respond(response);
1350 mt.record_request(method.as_str(), 413);
1351 continue;
1352 }
1353 }
1354
1355 let mut body = String::new();
1356 if !matches!(
1357 method,
1358 Method::Get | Method::Head | Method::Options | Method::Delete
1359 ) {
1360 use std::io::Read;
1361 let mut limited = request.as_reader().take((MAX_BODY_SIZE as u64) + 1);
1362 let _ = limited.read_to_string(&mut body);
1363 }
1364
1365 if body.len() > MAX_BODY_SIZE {
1366 let err_body = json_error(
1367 "PAYLOAD_TOO_LARGE",
1368 &format!(
1369 "Request body exceeds maximum size of {} bytes",
1370 MAX_BODY_SIZE,
1371 ),
1372 );
1373 let response = with_security_headers(
1374 Response::from_string(&err_body)
1375 .with_status_code(413u16)
1376 .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
1377 .with_header(
1378 Header::from_bytes(
1379 "Access-Control-Allow-Origin",
1380 cors_origin.as_bytes().to_vec(),
1381 )
1382 .unwrap(),
1383 ),
1384 );
1385 let _ = request.respond(response);
1386 mt.record_request(method.as_str(), 413);
1387 continue;
1388 }
1389
1390 if method == Method::Get {
1394 if let Some(rest) = url.strip_prefix("/api/shards/") {
1395 let rest = rest.split('?').next().unwrap_or(rest);
1396 if let Some(shard_id) = rest.strip_suffix("/connect") {
1397 if auth_ctx.user_id.is_none() {
1402 let err = json_error(
1403 "AUTH_REQUIRED",
1404 "Shard connect requires an authenticated session",
1405 );
1406 let response = with_security_headers(
1407 Response::from_string(&err)
1408 .with_status_code(401u16)
1409 .with_header(
1410 Header::from_bytes("Content-Type", "application/json").unwrap(),
1411 )
1412 .with_header(
1413 Header::from_bytes(
1414 "Access-Control-Allow-Origin",
1415 cors_origin.as_bytes().to_vec(),
1416 )
1417 .unwrap(),
1418 ),
1419 );
1420 let _ = request.respond(response);
1421 mt.record_request("GET", 401);
1422 continue;
1423 }
1424 let shards = match &shards_ref {
1425 Some(s) => Arc::clone(s),
1426 None => {
1427 let err = json_error(
1428 "SHARDS_NOT_AVAILABLE",
1429 "Shard system is not configured",
1430 );
1431 let response = with_security_headers(
1432 Response::from_string(&err)
1433 .with_status_code(503u16)
1434 .with_header(
1435 Header::from_bytes("Content-Type", "application/json")
1436 .unwrap(),
1437 )
1438 .with_header(
1439 Header::from_bytes(
1440 "Access-Control-Allow-Origin",
1441 cors_origin.as_bytes().to_vec(),
1442 )
1443 .unwrap(),
1444 ),
1445 );
1446 let _ = request.respond(response);
1447 mt.record_request("GET", 503);
1448 continue;
1449 }
1450 };
1451 let shard = match shards.get(shard_id) {
1452 Some(s) => s,
1453 None => {
1454 let err = json_error(
1455 "SHARD_NOT_FOUND",
1456 &format!("Shard \"{shard_id}\" not found"),
1457 );
1458 let response = with_security_headers(
1459 Response::from_string(&err)
1460 .with_status_code(404u16)
1461 .with_header(
1462 Header::from_bytes("Content-Type", "application/json")
1463 .unwrap(),
1464 )
1465 .with_header(
1466 Header::from_bytes(
1467 "Access-Control-Allow-Origin",
1468 cors_origin.as_bytes().to_vec(),
1469 )
1470 .unwrap(),
1471 ),
1472 );
1473 let _ = request.respond(response);
1474 mt.record_request("GET", 404);
1475 continue;
1476 }
1477 };
1478
1479 let sub_id = url
1482 .split("sid=")
1483 .nth(1)
1484 .and_then(|s| s.split('&').next())
1485 .map(|s| s.to_string())
1486 .or_else(|| auth_ctx.user_id.clone())
1487 .unwrap_or_else(|| {
1488 format!(
1489 "anon_{}",
1490 std::time::SystemTime::now()
1491 .duration_since(std::time::UNIX_EPOCH)
1492 .unwrap_or_default()
1493 .as_nanos()
1494 )
1495 });
1496 let subscriber_id = pylon_realtime::SubscriberId::new(sub_id);
1497
1498 let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1499 let streaming_body = StreamingBody::new(rx);
1500
1501 let tx_clone = tx.clone();
1502 let sink: pylon_realtime::SnapshotSink =
1503 Box::new(move |tick: u64, bytes: &[u8]| {
1504 let mut frame = format!("id: {tick}\ndata: ").into_bytes();
1507 frame.extend_from_slice(bytes);
1508 frame.extend_from_slice(b"\n\n");
1509 let _ = tx_clone.send(frame);
1510 });
1511
1512 let shard_auth = pylon_realtime::ShardAuth {
1513 user_id: auth_ctx.user_id.clone(),
1514 is_admin: auth_ctx.is_admin,
1515 };
1516 if let Err(e) = shard.add_subscriber(subscriber_id.clone(), sink, &shard_auth) {
1517 let (status, code) = match &e {
1518 pylon_realtime::ShardError::Unauthorized(_) => (403u16, "UNAUTHORIZED"),
1519 _ => (429u16, "SUBSCRIBE_FAILED"),
1520 };
1521 let err = json_error(code, &e.to_string());
1522 let response = with_security_headers(
1523 Response::from_string(&err)
1524 .with_status_code(status)
1525 .with_header(
1526 Header::from_bytes("Content-Type", "application/json").unwrap(),
1527 )
1528 .with_header(
1529 Header::from_bytes(
1530 "Access-Control-Allow-Origin",
1531 cors_origin.as_bytes().to_vec(),
1532 )
1533 .unwrap(),
1534 ),
1535 );
1536 let _ = request.respond(response);
1537 mt.record_request("GET", status);
1538 continue;
1539 }
1540
1541 {
1544 let shard_cleanup = Arc::clone(&shard);
1545 let sub_id_cleanup = subscriber_id.clone();
1546 let tx_liveness = tx.clone();
1547 std::thread::spawn(move || {
1548 loop {
1551 std::thread::sleep(std::time::Duration::from_secs(30));
1552 if tx_liveness.send(b": heartbeat\n\n".to_vec()).is_err() {
1553 shard_cleanup.remove_subscriber(&sub_id_cleanup);
1554 return;
1555 }
1556 if !shard_cleanup.is_running() {
1557 return;
1558 }
1559 }
1560 });
1561 }
1562
1563 let response = with_security_headers(Response::new(
1564 tiny_http::StatusCode(200),
1565 vec![
1566 Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1567 Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1568 Header::from_bytes("Connection", "keep-alive").unwrap(),
1569 Header::from_bytes(
1570 "Access-Control-Allow-Origin",
1571 cors_origin.as_bytes().to_vec(),
1572 )
1573 .unwrap(),
1574 ],
1575 streaming_body,
1576 None,
1577 None,
1578 ));
1579 let _ = request.respond(response);
1580 mt.record_request("GET", 200);
1581 continue;
1582 }
1583 }
1584 }
1585
1586 if method == Method::Post
1588 && url.starts_with("/api/fn/")
1589 && url != "/api/fn/traces"
1590 && request.headers().iter().any(|h| {
1591 (h.field.as_str() == "Accept" || h.field.as_str() == "accept")
1592 && h.value.as_str().contains("text/event-stream")
1593 })
1594 {
1595 let fn_name = url
1596 .strip_prefix("/api/fn/")
1597 .unwrap_or("")
1598 .split('?')
1599 .next()
1600 .unwrap_or("")
1601 .to_string();
1602
1603 if let Some(fn_ops) = &fn_ops_maybe {
1604 if pylon_router::FnOps::get_fn(fn_ops.as_ref(), &fn_name).is_none() {
1608 let err = json_error(
1609 "FN_NOT_FOUND",
1610 &format!("Function \"{fn_name}\" is not registered"),
1611 );
1612 let response = with_security_headers(
1613 Response::from_string(&err)
1614 .with_status_code(404u16)
1615 .with_header(
1616 Header::from_bytes("Content-Type", "application/json").unwrap(),
1617 )
1618 .with_header(
1619 Header::from_bytes(
1620 "Access-Control-Allow-Origin",
1621 cors_origin.as_bytes().to_vec(),
1622 )
1623 .unwrap(),
1624 ),
1625 );
1626 let _ = request.respond(response);
1627 mt.record_request("POST", 404);
1628 continue;
1629 }
1630 let identity = auth_ctx.user_id.as_deref().unwrap_or("anon");
1632 if let Err(retry_after) =
1633 pylon_router::FnOps::check_rate_limit(fn_ops.as_ref(), &fn_name, identity)
1634 {
1635 let body = format!(
1636 r#"{{"error":{{"code":"RATE_LIMITED","message":"Function \"{fn_name}\" rate limit exceeded","retry_after_secs":{retry_after}}}}}"#
1637 );
1638 let response = with_security_headers(
1639 Response::from_string(&body)
1640 .with_status_code(429u16)
1641 .with_header(
1642 Header::from_bytes("Content-Type", "application/json").unwrap(),
1643 )
1644 .with_header(
1645 Header::from_bytes(
1646 "Access-Control-Allow-Origin",
1647 cors_origin.as_bytes().to_vec(),
1648 )
1649 .unwrap(),
1650 ),
1651 );
1652 let _ = request.respond(response);
1653 mt.record_request("POST", 429);
1654 continue;
1655 }
1656
1657 let args: serde_json::Value =
1658 serde_json::from_str(&body).unwrap_or(serde_json::json!({}));
1659
1660 let auth = pylon_functions::protocol::AuthInfo {
1661 user_id: auth_ctx.user_id.clone(),
1662 is_admin: auth_ctx.is_admin,
1663 tenant_id: auth_ctx.tenant_id.clone(),
1664 };
1665
1666 let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1667 let streaming_body = StreamingBody::new(rx);
1668
1669 let fn_ops_cl = Arc::clone(fn_ops);
1670 let tx_stream = tx.clone();
1671 std::thread::spawn(move || {
1672 let tx_cb = tx_stream.clone();
1673 let on_stream: Box<dyn FnMut(&str) + Send> = Box::new(move |chunk: &str| {
1674 let sse = format!("data: {}\n\n", chunk);
1675 let _ = tx_cb.send(sse.into_bytes());
1676 });
1677
1678 let result = pylon_router::FnOps::call(
1679 fn_ops_cl.as_ref(),
1680 &fn_name,
1681 args,
1682 auth,
1683 Some(on_stream),
1684 None, );
1686 match result {
1687 Ok((value, _trace)) => {
1688 let done = format!(
1689 "event: result\ndata: {}\n\n",
1690 serde_json::to_string(&value).unwrap_or_else(|_| "null".into())
1691 );
1692 let _ = tx_stream.send(done.into_bytes());
1693 }
1694 Err(e) => {
1695 let err = format!(
1696 "event: error\ndata: {}\n\n",
1697 serde_json::json!({"code": e.code, "message": e.message})
1698 );
1699 let _ = tx_stream.send(err.into_bytes());
1700 }
1701 }
1702 });
1703
1704 let response = with_security_headers(Response::new(
1705 tiny_http::StatusCode(200),
1706 vec![
1707 Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1708 Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1709 Header::from_bytes("Connection", "keep-alive").unwrap(),
1710 Header::from_bytes(
1711 "Access-Control-Allow-Origin",
1712 cors_origin.as_bytes().to_vec(),
1713 )
1714 .unwrap(),
1715 ],
1716 streaming_body,
1717 None,
1718 None,
1719 ));
1720 let _ = request.respond(response);
1721 mt.record_request("POST", 200);
1722 continue;
1723 }
1724 }
1725
1726 if url == "/api/ai/stream" && method == Method::Post {
1728 if auth_ctx.user_id.is_none() {
1731 let err = json_error(
1732 "AUTH_REQUIRED",
1733 "/api/ai/stream requires an authenticated session",
1734 );
1735 let response = with_security_headers(
1736 Response::from_string(&err)
1737 .with_status_code(401u16)
1738 .with_header(
1739 Header::from_bytes("Content-Type", "application/json").unwrap(),
1740 )
1741 .with_header(
1742 Header::from_bytes(
1743 "Access-Control-Allow-Origin",
1744 cors_origin.as_bytes().to_vec(),
1745 )
1746 .unwrap(),
1747 ),
1748 );
1749 let _ = request.respond(response);
1750 mt.record_request("POST", 401);
1751 continue;
1752 }
1753 let ai_provider = std::env::var("PYLON_AI_PROVIDER").unwrap_or_default();
1754 let ai_key = std::env::var("PYLON_AI_API_KEY").unwrap_or_default();
1755 let ai_model = std::env::var("PYLON_AI_MODEL").unwrap_or_default();
1756 let ai_base = std::env::var("PYLON_AI_BASE_URL").unwrap_or_default();
1757
1758 if ai_key.is_empty() && ai_provider != "custom" {
1759 let err = json_error(
1760 "AI_NOT_CONFIGURED",
1761 "Set PYLON_AI_PROVIDER and PYLON_AI_API_KEY",
1762 );
1763 let response = with_security_headers(
1764 Response::from_string(&err)
1765 .with_status_code(503u16)
1766 .with_header(
1767 Header::from_bytes("Content-Type", "application/json").unwrap(),
1768 )
1769 .with_header(
1770 Header::from_bytes(
1771 "Access-Control-Allow-Origin",
1772 cors_origin.as_bytes().to_vec(),
1773 )
1774 .unwrap(),
1775 ),
1776 );
1777 let _ = request.respond(response);
1778 mt.record_request("POST", 503);
1779 continue;
1780 }
1781
1782 let parsed: serde_json::Value = match serde_json::from_str(&body) {
1783 Ok(v) => v,
1784 Err(_) => {
1785 let err = json_error("INVALID_JSON", "Invalid request body");
1786 let response = with_security_headers(
1787 Response::from_string(&err)
1788 .with_status_code(400u16)
1789 .with_header(
1790 Header::from_bytes("Content-Type", "application/json").unwrap(),
1791 )
1792 .with_header(
1793 Header::from_bytes(
1794 "Access-Control-Allow-Origin",
1795 cors_origin.as_bytes().to_vec(),
1796 )
1797 .unwrap(),
1798 ),
1799 );
1800 let _ = request.respond(response);
1801 mt.record_request("POST", 400);
1802 continue;
1803 }
1804 };
1805
1806 let messages: Vec<AiMessage> = match parsed.get("messages").and_then(|m| m.as_array()) {
1807 Some(arr) => arr
1808 .iter()
1809 .filter_map(|m| {
1810 let role = m.get("role")?.as_str()?.to_string();
1811 let content = m.get("content")?.as_str()?.to_string();
1812 Some(AiMessage { role, content })
1813 })
1814 .collect(),
1815 None => {
1816 let err = json_error("MISSING_FIELD", "\"messages\" array is required");
1817 let response = with_security_headers(
1818 Response::from_string(&err)
1819 .with_status_code(400u16)
1820 .with_header(
1821 Header::from_bytes("Content-Type", "application/json").unwrap(),
1822 )
1823 .with_header(
1824 Header::from_bytes(
1825 "Access-Control-Allow-Origin",
1826 cors_origin.as_bytes().to_vec(),
1827 )
1828 .unwrap(),
1829 ),
1830 );
1831 let _ = request.respond(response);
1832 mt.record_request("POST", 400);
1833 continue;
1834 }
1835 };
1836
1837 let model = parsed
1839 .get("model")
1840 .and_then(|m| m.as_str())
1841 .map(|s| s.to_string())
1842 .unwrap_or(ai_model);
1843
1844 let proxy = match ai_provider.as_str() {
1845 "anthropic" => AiProxyPlugin::anthropic(&ai_key, &model),
1846 "openai" => AiProxyPlugin::openai(&ai_key, &model),
1847 "custom" => AiProxyPlugin::custom_with_model(&ai_base, &ai_key, &model),
1848 _ => AiProxyPlugin::openai(&ai_key, &model),
1849 };
1850
1851 let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1854 let streaming_body = StreamingBody::new(rx);
1855
1856 std::thread::spawn(move || {
1859 let result = proxy.stream_completion(&messages, &mut |chunk| {
1860 let sse = format!(
1861 "data: {}
1862
1863",
1864 serde_json::json!({
1865 "choices": [{"index": 0, "delta": {"content": chunk}}]
1866 })
1867 );
1868 let _ = tx.send(sse.into_bytes());
1869 });
1870
1871 match result {
1873 Ok(_) => {
1874 let _ = tx.send(
1875 b"data: [DONE]
1876
1877"
1878 .to_vec(),
1879 );
1880 }
1881 Err(e) => {
1882 let err_event = format!(
1883 "data: {}
1884
1885",
1886 serde_json::json!({"error": {"message": e, "type": "stream_error"}})
1887 );
1888 let _ = tx.send(err_event.into_bytes());
1889 }
1890 }
1891 });
1893
1894 let response = with_security_headers(Response::new(
1895 tiny_http::StatusCode(200),
1896 vec![
1897 Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1898 Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1899 Header::from_bytes("Connection", "keep-alive").unwrap(),
1900 Header::from_bytes(
1901 "Access-Control-Allow-Origin",
1902 cors_origin.as_bytes().to_vec(),
1903 )
1904 .unwrap(),
1905 ],
1906 streaming_body,
1907 None, None,
1909 ));
1910 let _ = request.respond(response);
1911 mt.record_request("POST", 200);
1912 continue;
1913 }
1914
1915 let (status, response_body, content_type, is_studio, extra_headers) = if (url == "/studio"
1926 || url == "/studio/")
1927 && method == Method::Get
1928 {
1929 if !is_dev && !auth_ctx.is_admin {
1930 let body = json_error(
1931 "AUTH_REQUIRED",
1932 "/studio requires admin auth in production (set PYLON_ADMIN_TOKEN and pass it as Bearer)",
1933 );
1934 let response = with_security_headers(
1935 Response::from_string(&body)
1936 .with_status_code(401u16)
1937 .with_header(
1938 Header::from_bytes("Content-Type", "application/json").unwrap(),
1939 )
1940 .with_header(
1941 Header::from_bytes(
1942 "Access-Control-Allow-Origin",
1943 cors_origin.as_bytes().to_vec(),
1944 )
1945 .unwrap(),
1946 ),
1947 );
1948 let _ = request.respond(response);
1949 mt.record_request("GET", 401);
1950 continue;
1951 }
1952 let host = request
1959 .headers()
1960 .iter()
1961 .find(|h| h.field.equiv("Host"))
1962 .map(|h| h.value.as_str().to_string())
1963 .unwrap_or_else(|| format!("localhost:{port}"));
1964 let scheme = request
1965 .headers()
1966 .iter()
1967 .find(|h| h.field.equiv("X-Forwarded-Proto"))
1968 .map(|h| h.value.as_str().to_string())
1969 .unwrap_or_else(|| "http".to_string());
1970 let base = format!("{scheme}://{host}");
1971 let html = pylon_studio_api::generate_studio_html(rt.manifest(), &base);
1972 (
1973 200u16,
1974 html,
1975 "text/html",
1976 true,
1977 Vec::<(String, String)>::new(),
1978 )
1979 } else {
1980 let meta = pylon_plugin::RequestMeta {
1984 peer_ip: peer_ip.as_str(),
1985 };
1986 if let Err(e) = pr.run_on_request_with_meta(method.as_str(), &url, &auth_ctx, &meta) {
1987 (
1988 e.status,
1989 json_error(&e.code, &e.message),
1990 "application/json",
1991 false,
1992 Vec::new(),
1993 )
1994 } else if let Some((s, b)) =
1995 pr.try_handle_route(method.as_str(), &url, &body, &auth_ctx)
1996 {
1997 (s, b, "application/json", false, Vec::new())
1999 } else {
2000 let notifier = WsSseNotifier {
2001 ws: Arc::clone(&wh),
2002 sse: Arc::clone(&sh),
2003 };
2004 let openapi_gen = RuntimeOpenApiGenerator {
2005 manifest: rt.manifest(),
2006 };
2007 let file_ops = LocalFileOps::new_default();
2008 let cache_adapter = CacheAdapter(Arc::clone(&ca));
2009 let pubsub_adapter = PubSubAdapter(Arc::clone(&ps));
2010 let email_adapter = EmailAdapter::from_env();
2011 let fn_ops: Option<&dyn pylon_router::FnOps> =
2012 fn_ops_ref.as_deref().map(|f| f as &dyn pylon_router::FnOps);
2013 let shard_adapter = shards_ref.as_ref().map(|reg| ShardOpsAdapter {
2014 registry: Arc::clone(reg),
2015 });
2016 let shard_ops: Option<&dyn pylon_router::ShardOps> = shard_adapter
2017 .as_ref()
2018 .map(|a| a as &dyn pylon_router::ShardOps);
2019 let plugin_hooks = PluginHooksAdapter(Arc::clone(&pr));
2020 let request_headers: Vec<(String, String)> = request
2025 .headers()
2026 .iter()
2027 .map(|h| (h.field.as_str().to_string(), h.value.as_str().to_string()))
2028 .collect();
2029 let router_ctx = pylon_router::RouterContext {
2030 store: rt.as_ref(),
2031 session_store: &ss,
2032 magic_codes: &mc,
2033 oauth_state: &os,
2034 account_store: &acc,
2035 api_keys: &ak,
2036 orgs: &og,
2037 siwe: &sw,
2038 phone_codes: &pcd,
2039 passkeys: &pks,
2040 verification: &vrf,
2041 audit: &aud,
2042 policy_engine: &pe,
2043 change_log: &cl,
2044 notifier: ¬ifier,
2045 rooms: rm.as_ref(),
2046 cache: &cache_adapter,
2047 pubsub: &pubsub_adapter,
2048 jobs: jq.as_ref(),
2049 scheduler: sc.as_ref(),
2050 workflows: we.as_ref(),
2051 files: &file_ops,
2052 openapi: &openapi_gen,
2053 functions: fn_ops,
2054 email: &email_adapter,
2055 shards: shard_ops,
2056 plugin_hooks: &plugin_hooks,
2057 auth_ctx: &auth_ctx,
2058 trusted_origins: &trusted_origins_ref,
2059 is_dev,
2060 request_headers: &request_headers,
2061 peer_ip: peer_ip.as_str(),
2062 cookie_config: cookie_config.as_ref(),
2063 response_headers: std::cell::RefCell::new(Vec::new()),
2064 };
2065 let http_method = HttpMethod::from_str(method.as_str());
2066 let (s, b, _ct) = pylon_router::route(
2067 &router_ctx,
2068 http_method,
2069 &url,
2070 &body,
2071 auth_token.as_deref(),
2072 );
2073 let extra_headers = router_ctx.take_response_headers();
2074 (s, b, "application/json", false, extra_headers)
2075 }
2076 };
2077
2078 let mut response = Response::from_string(&response_body)
2079 .with_status_code(status)
2080 .with_header(Header::from_bytes("Content-Type", content_type).unwrap())
2081 .with_header(
2082 Header::from_bytes(
2083 "Access-Control-Allow-Origin",
2084 cors_origin.as_bytes().to_vec(),
2085 )
2086 .unwrap(),
2087 )
2088 .with_header(
2089 Header::from_bytes(
2090 "Access-Control-Allow-Methods",
2091 "GET, POST, PATCH, DELETE, OPTIONS",
2092 )
2093 .unwrap(),
2094 )
2095 .with_header(
2096 Header::from_bytes(
2097 "Access-Control-Allow-Headers",
2098 "Content-Type, Authorization",
2099 )
2100 .unwrap(),
2101 );
2102 if allow_credentials {
2107 response = response
2108 .with_header(
2109 Header::from_bytes("Access-Control-Allow-Credentials", "true").unwrap(),
2110 )
2111 .with_header(Header::from_bytes("Vary", "Origin").unwrap());
2112 }
2113
2114 for (name, value) in extra_headers {
2121 if let Ok(h) = Header::from_bytes(name.as_bytes(), value.as_bytes().to_vec()) {
2122 response = response.with_header(h);
2123 }
2124 }
2125
2126 if is_studio {
2139 response = response.with_header(
2140 Header::from_bytes(
2141 "Content-Security-Policy",
2142 "default-src 'self' 'unsafe-inline' 'unsafe-eval' https://cdn.tailwindcss.com https://unpkg.com ws: wss:",
2143 ).unwrap(),
2144 );
2145 }
2146
2147 let response = with_security_headers(response);
2148
2149 let _ = request.respond(response);
2150 mt.record_request(method.as_str(), status);
2151 }
2152
2153 tracing::warn!("Shutting down gracefully...");
2154
2155 let drain_timeout = std::time::Duration::from_secs(
2158 std::env::var("PYLON_DRAIN_SECS")
2159 .ok()
2160 .and_then(|s| s.parse().ok())
2161 .unwrap_or(10),
2162 );
2163 let start = Instant::now();
2164
2165 if let Some(reg) = &shard_registry {
2167 for id in reg.ids() {
2168 if let Some(shard) = reg.get(&id) {
2169 shard.stop();
2170 }
2171 }
2172 }
2173
2174 let _ = &scheduler; while start.elapsed() < drain_timeout {
2179 let pending_jobs = job_queue.stats().pending;
2180 if pending_jobs == 0 {
2181 break;
2182 }
2183 std::thread::sleep(std::time::Duration::from_millis(100));
2184 }
2185
2186 let elapsed = start.elapsed();
2187 tracing::warn!(
2188 "Drain complete in {:.1}s (timeout {}s)",
2189 elapsed.as_secs_f32(),
2190 drain_timeout.as_secs()
2191 );
2192 Ok(())
2193}
2194
2195fn json_error(code: &str, message: &str) -> String {
2200 pylon_router::json_error(code, message)
2201}
2202
2203struct AuthStores {
2213 session_store: Arc<SessionStore>,
2214 magic_codes: Arc<pylon_auth::MagicCodeStore>,
2215 oauth_state: Arc<pylon_auth::OAuthStateStore>,
2216 account_store: Arc<pylon_auth::AccountStore>,
2217 api_keys: Arc<pylon_auth::api_key::ApiKeyStore>,
2218 orgs: Arc<pylon_auth::org::OrgStore>,
2219 siwe: Arc<pylon_auth::siwe::NonceStore>,
2220 phone_codes: Arc<pylon_auth::phone::PhoneCodeStore>,
2221 passkeys: Arc<pylon_auth::webauthn::PasskeyStore>,
2222 verification: Arc<pylon_auth::verification::VerificationStore>,
2223 audit: Arc<pylon_auth::audit::AuditStore>,
2224}
2225
2226fn jwt_secret() -> Option<&'static String> {
2231 static CELL: std::sync::OnceLock<Option<String>> = std::sync::OnceLock::new();
2232 CELL.get_or_init(|| {
2233 std::env::var("PYLON_JWT_SECRET")
2234 .ok()
2235 .filter(|s| !s.is_empty())
2236 })
2237 .as_ref()
2238}
2239
2240fn jwt_issuer() -> Option<&'static String> {
2241 static CELL: std::sync::OnceLock<Option<String>> = std::sync::OnceLock::new();
2242 CELL.get_or_init(|| {
2243 std::env::var("PYLON_JWT_ISSUER")
2244 .ok()
2245 .filter(|s| !s.is_empty())
2246 })
2247 .as_ref()
2248}
2249
2250fn build_auth_stores(app_db_path: Option<&str>, session_lifetime: u64) -> AuthStores {
2251 let force_in_memory = std::env::var("PYLON_SESSION_IN_MEMORY")
2254 .map(|v| v == "1" || v == "true")
2255 .unwrap_or(false);
2256
2257 let pg_url = std::env::var("DATABASE_URL")
2260 .ok()
2261 .filter(|u| u.starts_with("postgres://") || u.starts_with("postgresql://"));
2262
2263 if let Some(url) = pg_url {
2264 if force_in_memory {
2265 return in_memory_auth_stores(session_lifetime);
2268 }
2269 return build_pg_auth_stores(&url, session_lifetime);
2270 }
2271
2272 let sqlite_path = std::env::var("PYLON_SESSION_DB")
2273 .ok()
2274 .or_else(|| app_db_path.map(|p| format!("{p}.sessions.db")));
2275
2276 match (force_in_memory, sqlite_path) {
2277 (true, _) | (_, None) => in_memory_auth_stores(session_lifetime),
2278 (false, Some(path)) => build_sqlite_auth_stores(&path, session_lifetime),
2279 }
2280}
2281
2282fn in_memory_auth_stores(session_lifetime: u64) -> AuthStores {
2283 AuthStores {
2284 session_store: Arc::new(SessionStore::new().with_lifetime(session_lifetime)),
2285 magic_codes: Arc::new(pylon_auth::MagicCodeStore::new()),
2286 oauth_state: Arc::new(pylon_auth::OAuthStateStore::new()),
2287 account_store: Arc::new(pylon_auth::AccountStore::new()),
2288 api_keys: Arc::new(pylon_auth::api_key::ApiKeyStore::new()),
2289 orgs: Arc::new(pylon_auth::org::OrgStore::new()),
2290 siwe: Arc::new(pylon_auth::siwe::NonceStore::new()),
2291 phone_codes: Arc::new(pylon_auth::phone::PhoneCodeStore::new()),
2292 passkeys: Arc::new(pylon_auth::webauthn::PasskeyStore::new()),
2293 verification: Arc::new(pylon_auth::verification::VerificationStore::new()),
2294 audit: Arc::new(pylon_auth::audit::AuditStore::new()),
2295 }
2296}
2297
2298fn build_sqlite_auth_stores(path: &str, session_lifetime: u64) -> AuthStores {
2299 let session_store = match crate::session_backend::SqliteSessionBackend::open(path) {
2300 Ok(b) => {
2301 tracing::info!("[pylon] Auth state (SQLite): {path}");
2302 SessionStore::with_backend(Box::new(b)).with_lifetime(session_lifetime)
2303 }
2304 Err(e) => {
2305 tracing::warn!("[pylon] could not open session DB {path}: {e}. In-memory fallback.");
2306 SessionStore::new().with_lifetime(session_lifetime)
2307 }
2308 };
2309 let magic_codes = match crate::magic_code_backend::SqliteMagicCodeBackend::open(path) {
2310 Ok(b) => pylon_auth::MagicCodeStore::with_backend(Box::new(b)),
2311 Err(e) => {
2312 tracing::warn!("[pylon] magic-code SQLite backend unavailable: {e}");
2313 pylon_auth::MagicCodeStore::new()
2314 }
2315 };
2316 let oauth_state = match crate::oauth_backend::SqliteOAuthBackend::open(path) {
2317 Ok(b) => pylon_auth::OAuthStateStore::with_backend(Box::new(b)),
2318 Err(e) => {
2319 tracing::warn!("[pylon] OAuth state SQLite backend unavailable: {e}");
2320 pylon_auth::OAuthStateStore::new()
2321 }
2322 };
2323 let account_store = match crate::account_backend::SqliteAccountBackend::open(path) {
2324 Ok(b) => pylon_auth::AccountStore::with_backend(Box::new(b)),
2325 Err(e) => {
2326 tracing::warn!("[pylon] account-link SQLite backend unavailable: {e}");
2327 pylon_auth::AccountStore::new()
2328 }
2329 };
2330 let api_keys = match crate::api_key_backend::SqliteApiKeyBackend::open(path) {
2331 Ok(b) => pylon_auth::api_key::ApiKeyStore::with_backend(Box::new(b)),
2332 Err(e) => {
2333 tracing::warn!("[pylon] api-key SQLite backend unavailable: {e}");
2334 pylon_auth::api_key::ApiKeyStore::new()
2335 }
2336 };
2337 let orgs = match crate::org_backend::SqliteOrgBackend::open(path) {
2338 Ok(b) => pylon_auth::org::OrgStore::with_backend(Box::new(b)),
2339 Err(e) => {
2340 tracing::warn!("[pylon] org SQLite backend unavailable: {e}");
2341 pylon_auth::org::OrgStore::new()
2342 }
2343 };
2344 let verification = match crate::verification_backend::SqliteVerificationBackend::open(path) {
2345 Ok(b) => pylon_auth::verification::VerificationStore::with_backend(Box::new(b)),
2346 Err(e) => {
2347 tracing::warn!("[pylon] verification SQLite backend unavailable: {e}");
2348 pylon_auth::verification::VerificationStore::new()
2349 }
2350 };
2351 let audit = match crate::audit_backend::SqliteAuditBackend::open(path) {
2352 Ok(b) => pylon_auth::audit::AuditStore::with_backend(Box::new(b)),
2353 Err(e) => {
2354 tracing::warn!("[pylon] audit SQLite backend unavailable: {e}");
2355 pylon_auth::audit::AuditStore::new()
2356 }
2357 };
2358 AuthStores {
2359 session_store: Arc::new(session_store),
2360 magic_codes: Arc::new(magic_codes),
2361 oauth_state: Arc::new(oauth_state),
2362 account_store: Arc::new(account_store),
2363 api_keys: Arc::new(api_keys),
2364 orgs: Arc::new(orgs),
2365 siwe: Arc::new(pylon_auth::siwe::NonceStore::new()),
2366 phone_codes: Arc::new(pylon_auth::phone::PhoneCodeStore::new()),
2367 passkeys: Arc::new(pylon_auth::webauthn::PasskeyStore::new()),
2368 verification: Arc::new(verification),
2369 audit: Arc::new(audit),
2370 }
2371}
2372
2373fn build_pg_auth_stores(url: &str, session_lifetime: u64) -> AuthStores {
2374 let session_store = match crate::session_backend::PostgresSessionBackend::connect(url) {
2379 Ok(b) => {
2380 tracing::info!("[pylon] Auth state (Postgres): {url}");
2381 SessionStore::with_backend(Box::new(b)).with_lifetime(session_lifetime)
2382 }
2383 Err(e) => {
2384 tracing::warn!("[pylon] PG session backend unavailable: {e}. In-memory fallback.");
2385 SessionStore::new().with_lifetime(session_lifetime)
2386 }
2387 };
2388 let magic_codes = match crate::magic_code_backend::PostgresMagicCodeBackend::connect(url) {
2389 Ok(b) => pylon_auth::MagicCodeStore::with_backend(Box::new(b)),
2390 Err(e) => {
2391 tracing::warn!("[pylon] PG magic-code backend unavailable: {e}");
2392 pylon_auth::MagicCodeStore::new()
2393 }
2394 };
2395 let oauth_state = match crate::oauth_backend::PostgresOAuthBackend::connect(url) {
2396 Ok(b) => pylon_auth::OAuthStateStore::with_backend(Box::new(b)),
2397 Err(e) => {
2398 tracing::warn!("[pylon] PG OAuth state backend unavailable: {e}");
2399 pylon_auth::OAuthStateStore::new()
2400 }
2401 };
2402 let account_store = match crate::account_backend::PostgresAccountBackend::connect(url) {
2403 Ok(b) => pylon_auth::AccountStore::with_backend(Box::new(b)),
2404 Err(e) => {
2405 tracing::warn!("[pylon] PG account-link backend unavailable: {e}");
2406 pylon_auth::AccountStore::new()
2407 }
2408 };
2409 let api_keys = match crate::api_key_backend::PostgresApiKeyBackend::connect(url) {
2410 Ok(b) => pylon_auth::api_key::ApiKeyStore::with_backend(Box::new(b)),
2411 Err(e) => {
2412 tracing::warn!("[pylon] PG api-key backend unavailable: {e}");
2413 pylon_auth::api_key::ApiKeyStore::new()
2414 }
2415 };
2416 let orgs = match crate::org_backend::PostgresOrgBackend::connect(url) {
2417 Ok(b) => pylon_auth::org::OrgStore::with_backend(Box::new(b)),
2418 Err(e) => {
2419 tracing::warn!("[pylon] PG org backend unavailable: {e}");
2420 pylon_auth::org::OrgStore::new()
2421 }
2422 };
2423 let verification = match crate::verification_backend::PostgresVerificationBackend::connect(url)
2424 {
2425 Ok(b) => pylon_auth::verification::VerificationStore::with_backend(Box::new(b)),
2426 Err(e) => {
2427 tracing::warn!("[pylon] PG verification backend unavailable: {e}");
2428 pylon_auth::verification::VerificationStore::new()
2429 }
2430 };
2431 let audit = match crate::audit_backend::PostgresAuditBackend::connect(url) {
2432 Ok(b) => pylon_auth::audit::AuditStore::with_backend(Box::new(b)),
2433 Err(e) => {
2434 tracing::warn!("[pylon] PG audit backend unavailable: {e}");
2435 pylon_auth::audit::AuditStore::new()
2436 }
2437 };
2438 AuthStores {
2439 session_store: Arc::new(session_store),
2440 magic_codes: Arc::new(magic_codes),
2441 oauth_state: Arc::new(oauth_state),
2442 account_store: Arc::new(account_store),
2443 api_keys: Arc::new(api_keys),
2444 orgs: Arc::new(orgs),
2445 siwe: Arc::new(pylon_auth::siwe::NonceStore::new()),
2446 phone_codes: Arc::new(pylon_auth::phone::PhoneCodeStore::new()),
2447 passkeys: Arc::new(pylon_auth::webauthn::PasskeyStore::new()),
2448 verification: Arc::new(verification),
2449 audit: Arc::new(audit),
2450 }
2451}
2452
2453#[allow(dead_code)]
2463fn build_session_store(app_db_path: Option<&str>) -> SessionStore {
2464 if std::env::var("PYLON_SESSION_IN_MEMORY")
2465 .map(|v| v == "1" || v == "true")
2466 .unwrap_or(false)
2467 {
2468 return SessionStore::new();
2469 }
2470 let explicit = std::env::var("PYLON_SESSION_DB").ok();
2471 let default_path = app_db_path.map(|p| format!("{p}.sessions.db"));
2472 let path = match explicit.or(default_path) {
2473 Some(p) => p,
2474 None => return SessionStore::new(),
2475 };
2476 match crate::session_backend::SqliteSessionBackend::open(&path) {
2477 Ok(backend) => {
2478 tracing::info!("[pylon] Session persistence enabled: {path}");
2479 SessionStore::with_backend(Box::new(backend))
2480 }
2481 Err(e) => {
2482 tracing::warn!(
2483 "[pylon] could not open session DB {path}: {e}. Falling back to in-memory sessions."
2484 );
2485 SessionStore::new()
2486 }
2487 }
2488}
2489
2490fn parse_multipart_first_file(
2501 body: &[u8],
2502 content_type_header: &str,
2503) -> Option<(String, String, Vec<u8>)> {
2504 let boundary_param = content_type_header
2506 .split(';')
2507 .find_map(|p| p.trim().strip_prefix("boundary="))?;
2508 let boundary = boundary_param.trim_matches('"');
2509 let delimiter = format!("--{boundary}");
2510 let delimiter_bytes = delimiter.as_bytes();
2511
2512 let mut pos = 0usize;
2514 while pos < body.len() {
2515 let next = find_subslice(&body[pos..], delimiter_bytes)?;
2517 let part_start = pos + next + delimiter_bytes.len();
2518 if part_start + 2 > body.len() {
2520 return None;
2521 }
2522 if &body[part_start..part_start + 2] == b"--" {
2523 return None; }
2525 let header_start = part_start + skip_crlf(&body[part_start..]);
2526
2527 let header_end_offset = find_subslice(&body[header_start..], b"\r\n\r\n")?;
2529 let headers = &body[header_start..header_start + header_end_offset];
2530 let data_start = header_start + header_end_offset + 4;
2531
2532 let next_delim_offset = find_subslice(&body[data_start..], delimiter_bytes)?;
2534 let mut data_end = data_start + next_delim_offset;
2536 if data_end >= 2 && &body[data_end - 2..data_end] == b"\r\n" {
2537 data_end -= 2;
2538 }
2539
2540 let headers_str = std::str::from_utf8(headers).ok()?;
2542 let mut filename: Option<String> = None;
2543 let mut part_ct = String::from("application/octet-stream");
2544 let mut has_file = false;
2545 for line in headers_str.split("\r\n") {
2546 let lower = line.to_ascii_lowercase();
2547 if let Some(rest) = lower.strip_prefix("content-disposition:") {
2548 if rest.contains("filename=") {
2549 has_file = true;
2550 if let Some(start) = line.find("filename=\"") {
2552 let from = start + 10;
2553 if let Some(end_offset) = line[from..].find('"') {
2554 filename = Some(line[from..from + end_offset].to_string());
2555 }
2556 }
2557 }
2558 } else if let Some(rest) = lower.strip_prefix("content-type:") {
2559 part_ct = rest.trim().to_string();
2560 }
2561 }
2562
2563 if has_file {
2564 let name = filename.unwrap_or_else(|| "upload".into());
2565 return Some((name, part_ct, body[data_start..data_end].to_vec()));
2566 }
2567
2568 pos = data_end;
2569 }
2570 None
2571}
2572
2573fn find_subslice(haystack: &[u8], needle: &[u8]) -> Option<usize> {
2574 if needle.is_empty() || needle.len() > haystack.len() {
2575 return None;
2576 }
2577 haystack.windows(needle.len()).position(|w| w == needle)
2578}
2579
2580fn skip_crlf(buf: &[u8]) -> usize {
2581 if buf.len() >= 2 && &buf[0..2] == b"\r\n" {
2582 2
2583 } else if !buf.is_empty() && buf[0] == b'\n' {
2584 1
2585 } else {
2586 0
2587 }
2588}
2589
2590#[cfg(test)]
2591mod multipart_tests {
2592 use super::*;
2593
2594 #[test]
2595 fn parses_single_file() {
2596 let body = b"--bnd\r\n\
2597Content-Disposition: form-data; name=\"file\"; filename=\"hello.txt\"\r\n\
2598Content-Type: text/plain\r\n\
2599\r\n\
2600Hello world\r\n\
2601--bnd--\r\n";
2602 let ct = "multipart/form-data; boundary=bnd";
2603 let (name, content_type, bytes) = parse_multipart_first_file(body, ct).unwrap();
2604 assert_eq!(name, "hello.txt");
2605 assert_eq!(content_type, "text/plain");
2606 assert_eq!(bytes, b"Hello world");
2607 }
2608
2609 #[test]
2610 fn returns_none_without_file_part() {
2611 let body = b"--bnd\r\n\
2612Content-Disposition: form-data; name=\"field\"\r\n\
2613\r\n\
2614just text\r\n\
2615--bnd--\r\n";
2616 let ct = "multipart/form-data; boundary=bnd";
2617 assert!(parse_multipart_first_file(body, ct).is_none());
2618 }
2619
2620 #[test]
2621 fn returns_none_when_no_boundary() {
2622 assert!(parse_multipart_first_file(b"anything", "application/json").is_none());
2623 }
2624}