1#[allow(unused_imports)]
2use std::io::Read;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::time::Instant;
6
7use pylon_auth::SessionStore;
8use pylon_http::HttpMethod;
9use pylon_plugin::PluginRegistry;
10use pylon_policy::PolicyEngine;
11use pylon_sync::{ChangeKind, ChangeLog};
12use tiny_http::{Header, Method, Response, Server};
13
14use crate::datastore::{
15 CacheAdapter, EmailAdapter, LocalFileOps, PluginHooksAdapter, PubSubAdapter,
16 RuntimeOpenApiGenerator, ShardOpsAdapter, WsSseNotifier,
17};
18use crate::jobs::{JobQueue, JobResult, Worker};
19use crate::metrics::Metrics;
20use crate::pubsub::PubSubBroker;
21use crate::rate_limit::RateLimiter;
22use crate::rooms::RoomManager;
23use crate::scheduler::Scheduler;
24use crate::sse::SseHub;
25use crate::workflows::WorkflowEngine;
26use crate::ws::WsHub;
27use crate::Runtime;
28use pylon_plugin::builtin::ai_proxy::{AiMessage, AiProxyPlugin};
29use pylon_plugin::builtin::cache::CachePlugin;
30
31struct StreamingBody {
41 rx: std::sync::mpsc::Receiver<Vec<u8>>,
42 buf: Vec<u8>,
43 pos: usize,
44}
45
46impl StreamingBody {
47 fn new(rx: std::sync::mpsc::Receiver<Vec<u8>>) -> Self {
48 Self {
49 rx,
50 buf: Vec::new(),
51 pos: 0,
52 }
53 }
54}
55
56impl std::io::Read for StreamingBody {
57 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
58 if self.pos < self.buf.len() {
61 let remaining = &self.buf[self.pos..];
62 let n = remaining.len().min(buf.len());
63 buf[..n].copy_from_slice(&remaining[..n]);
64 self.pos += n;
65 if self.pos >= self.buf.len() {
66 self.buf.clear();
67 self.pos = 0;
68 }
69 return Ok(n);
70 }
71
72 match self.rx.recv() {
74 Ok(data) if data.is_empty() => Ok(0),
75 Ok(data) => {
76 let n = data.len().min(buf.len());
77 buf[..n].copy_from_slice(&data[..n]);
78 if n < data.len() {
79 self.buf = data;
80 self.pos = n;
81 }
82 Ok(n)
83 }
84 Err(_) => Ok(0), }
86 }
87}
88
89static SHUTDOWN: AtomicBool = AtomicBool::new(false);
91
92pub fn request_shutdown() {
98 SHUTDOWN.store(true, Ordering::SeqCst);
99 if let Some(srv) = SERVER_HANDLE.get() {
102 srv.unblock();
103 }
104}
105
106static SERVER_HANDLE: std::sync::OnceLock<Arc<Server>> = std::sync::OnceLock::new();
109
110fn security_headers() -> Vec<Header> {
116 vec![
117 Header::from_bytes("X-Content-Type-Options", "nosniff").unwrap(),
118 Header::from_bytes("X-Frame-Options", "DENY").unwrap(),
119 Header::from_bytes("X-XSS-Protection", "1; mode=block").unwrap(),
120 ]
121}
122
123fn with_security_headers<R: std::io::Read>(response: Response<R>) -> Response<R> {
125 let mut resp = response;
126 for header in security_headers() {
127 resp = resp.with_header(header);
128 }
129 resp
130}
131
132pub fn start(runtime: Arc<Runtime>, port: u16) -> Result<(), String> {
134 start_with_plugins(runtime, port, None)
135}
136
137pub fn start_with_plugins(
139 runtime: Arc<Runtime>,
140 port: u16,
141 plugins: Option<Arc<PluginRegistry>>,
142) -> Result<(), String> {
143 start_server(runtime, port, plugins, None)
144}
145
146pub fn start_with_shards(
149 runtime: Arc<Runtime>,
150 port: u16,
151 plugins: Option<Arc<PluginRegistry>>,
152 shard_registry: Arc<dyn pylon_realtime::DynShardRegistry>,
153) -> Result<(), String> {
154 start_server(runtime, port, plugins, Some(shard_registry))
155}
156
157fn start_server(
158 runtime: Arc<Runtime>,
159 port: u16,
160 plugins: Option<Arc<PluginRegistry>>,
161 shard_registry: Option<Arc<dyn pylon_realtime::DynShardRegistry>>,
162) -> Result<(), String> {
163 pylon_observability::run_tracing_hook();
168
169 let addr = format!("0.0.0.0:{port}");
170 let server = Server::http(&addr).map_err(|e| format!("Failed to start server: {e}"))?;
171 let server = Arc::new(server);
172
173 let _ = SERVER_HANDLE.set(Arc::clone(&server));
175
176 let session_store = Arc::new(build_session_store(runtime.db_path().as_deref()));
177 let magic_codes = Arc::new(pylon_auth::MagicCodeStore::new());
178 let oauth_state = Arc::new(match std::env::var("PYLON_SESSION_DB").ok() {
182 Some(path) => match crate::oauth_backend::SqliteOAuthBackend::open(&path) {
183 Ok(backend) => pylon_auth::OAuthStateStore::with_backend(Box::new(backend)),
184 Err(e) => {
185 tracing::warn!(
186 "[auth] OAuth state SQLite backend unavailable: {e} — falling back to in-memory"
187 );
188 pylon_auth::OAuthStateStore::new()
189 }
190 },
191 None => pylon_auth::OAuthStateStore::new(),
192 });
193 let policy_engine = Arc::new(PolicyEngine::from_manifest(runtime.manifest()));
194 let change_log = Arc::new(ChangeLog::new());
195
196 for entity in runtime.manifest().entities.iter() {
204 match runtime.list(&entity.name) {
205 Ok(rows) => {
206 for row in rows {
207 if let Some(id) = row.get("id").and_then(|v| v.as_str()) {
208 change_log.append(&entity.name, id, ChangeKind::Insert, Some(row.clone()));
209 }
210 }
211 }
212 Err(_) => {
213 }
215 }
216 }
217 let ws_hub = WsHub::new();
218 let sse_hub = SseHub::new();
219 let is_dev_early = std::env::var("PYLON_DEV_MODE")
232 .map(|v| v == "1" || v == "true")
233 .unwrap_or(true);
234 let plugin_rl_max: u32 = if is_dev_early { 100_000 } else { 100 };
235 let plugin_reg: Arc<PluginRegistry> = plugins.unwrap_or_else(|| {
236 let mut reg = PluginRegistry::new(runtime.manifest().clone());
237 reg.register(Arc::new(
238 pylon_plugin::builtin::rate_limit::RateLimitPlugin::new(
239 plugin_rl_max,
240 std::time::Duration::from_secs(60),
241 ),
242 ));
243 reg.register(Arc::new(
248 pylon_plugin::builtin::tenant_scope::TenantScopePlugin::from_manifest(
249 runtime.manifest(),
250 ),
251 ));
252 Arc::new(reg)
253 });
254 let room_mgr = Arc::new(RoomManager::new(120)); let ws_port = port + 1;
256 let sse_port = port + 2;
257
258 let start_time = Instant::now();
260
261 let metrics = Arc::new(Metrics::new());
262
263 let cache = Arc::new(CachePlugin::new(100_000));
265 let pubsub_broker = Arc::new(PubSubBroker::new(100));
266
267 let job_queue = Arc::new(JobQueue::new(1000));
269
270 let jobs_in_memory = std::env::var("PYLON_JOBS_IN_MEMORY")
276 .map(|v| v == "1" || v == "true")
277 .unwrap_or(false);
278 if !jobs_in_memory {
279 let jobs_db_path = std::env::var("PYLON_JOBS_DB").ok().unwrap_or_else(|| {
280 runtime
281 .db_path()
282 .map(|p| format!("{p}.jobs.db"))
283 .unwrap_or_else(|| "pylon.jobs.db".into())
284 });
285 match crate::job_store::JobStore::open(&jobs_db_path) {
286 Ok(store) => {
287 let store = Arc::new(store);
288 let restored = job_queue.restore_from(&store);
289 if restored > 0 {
290 tracing::info!("[jobs] Restored {restored} pending job(s) from {jobs_db_path}");
291 }
292 job_queue.attach_store(store);
293 }
294 Err(e) => {
295 tracing::warn!(
296 "[jobs] Could not open job store at {jobs_db_path}: {e} — running without persistence"
297 );
298 }
299 }
300 }
301
302 {
304 let cache_ref = Arc::clone(&cache);
305 job_queue.register(
306 "pylon.cache.cleanup",
307 Arc::new(move |_job| {
308 cache_ref.cleanup_expired();
309 JobResult::Success
310 }),
311 );
312 let rooms_ref = Arc::clone(&room_mgr);
313 job_queue.register(
314 "pylon.rooms.cleanup",
315 Arc::new(move |_job| {
316 rooms_ref.cleanup_idle();
317 JobResult::Success
318 }),
319 );
320 }
321
322 let scheduler = Arc::new(Scheduler::new(Arc::clone(&job_queue)));
323 let _ = scheduler.schedule(
325 "pylon.cache.cleanup",
326 "*/10 * * * *",
327 Arc::new(|_| JobResult::Success),
328 );
329 let _ = scheduler.schedule(
330 "pylon.rooms.cleanup",
331 "*/5 * * * *",
332 Arc::new(|_| JobResult::Success),
333 );
334
335 let _worker_handles: Vec<_> = (0..2)
337 .map(|i| {
338 let w = Worker::new(Arc::clone(&job_queue), &format!("worker-{i}"));
339 w.start()
340 })
341 .collect();
342
343 let _scheduler_handle = Arc::clone(&scheduler).start();
345
346 let wf_runner_url = std::env::var("PYLON_WORKFLOW_RUNNER_URL")
348 .unwrap_or_else(|_| "http://127.0.0.1:9876/run".to_string());
349 let workflow_engine = Arc::new(WorkflowEngine::new(&wf_runner_url, 10_000));
350
351 let default_rl_max = if is_dev_early { 100_000 } else { 600 };
362 let rl_max: u32 = std::env::var("PYLON_RATE_LIMIT_MAX")
363 .ok()
364 .and_then(|v| v.parse().ok())
365 .unwrap_or(default_rl_max);
366 let rl_window: u64 = std::env::var("PYLON_RATE_LIMIT_WINDOW")
367 .ok()
368 .and_then(|v| v.parse().ok())
369 .unwrap_or(60);
370 let rate_limiter = Arc::new(RateLimiter::new(rl_max, rl_window));
371
372 let fn_rl_max: u32 = std::env::var("PYLON_FN_RATE_LIMIT_MAX")
376 .ok()
377 .and_then(|v| v.parse().ok())
378 .unwrap_or(30);
379 let fn_rl_window: u64 = std::env::var("PYLON_FN_RATE_LIMIT_WINDOW")
380 .ok()
381 .and_then(|v| v.parse().ok())
382 .unwrap_or(60);
383 let fn_rate_limiter = Arc::new(RateLimiter::new(fn_rl_max, fn_rl_window));
384
385 let fn_notifier: Arc<dyn pylon_router::ChangeNotifier> =
393 Arc::new(crate::datastore::WsSseNotifier {
394 ws: Arc::clone(&ws_hub),
395 sse: Arc::clone(&sse_hub),
396 });
397 let fn_ops_maybe = crate::datastore::try_spawn_functions(
398 Arc::clone(&runtime),
399 Arc::clone(&job_queue),
400 Arc::clone(&fn_rate_limiter),
401 Arc::clone(&change_log),
402 fn_notifier,
403 );
404
405 let is_dev = std::env::var("PYLON_DEV_MODE")
407 .map(|v| v == "1" || v == "true")
408 .unwrap_or(true);
409
410 let cors_origin = match std::env::var("PYLON_CORS_ORIGIN") {
417 Ok(v) => v,
418 Err(_) if is_dev => "*".to_string(),
419 Err(_) => {
420 return Err(
421 "PYLON_CORS_ORIGIN must be set in production (non-dev mode). \
422 Set it to your frontend's origin, or set PYLON_DEV_MODE=true \
423 for local development."
424 .into(),
425 );
426 }
427 };
428 if !is_dev && cors_origin == "*" {
429 return Err("PYLON_CORS_ORIGIN=\"*\" is refused in production mode. \
430 Set it to an explicit origin (https://app.example.com)."
431 .into());
432 }
433 if Header::from_bytes(
437 "Access-Control-Allow-Origin",
438 cors_origin.as_bytes().to_vec(),
439 )
440 .is_err()
441 {
442 return Err(format!(
443 "PYLON_CORS_ORIGIN={cors_origin:?} contains bytes that are not a valid HTTP header value"
444 ));
445 }
446
447 let admin_token: Option<String> = std::env::var("PYLON_ADMIN_TOKEN").ok();
449
450 let csrf_origins: Vec<String> = match std::env::var("PYLON_CSRF_ORIGINS") {
460 Ok(v) => v
461 .split(',')
462 .map(|s| s.trim().to_string())
463 .filter(|s| !s.is_empty())
464 .collect(),
465 Err(_) => {
466 if is_dev {
467 vec!["*".to_string()]
468 } else if cors_origin != "*" {
469 vec![cors_origin.clone()]
470 } else {
471 vec![]
473 }
474 }
475 };
476 let csrf = Arc::new(pylon_plugin::builtin::csrf::CsrfPlugin::new(csrf_origins));
477
478 {
492 let hub = Arc::clone(&ws_hub);
493 let sessions = Arc::clone(&session_store);
494 let runtime_for_fetcher = Arc::clone(&runtime);
495 let pe_for_fetcher = Arc::clone(&policy_engine);
496 let fetcher: crate::ws::SnapshotFetcher = Arc::new(move |auth_ctx, entity, row_id| {
497 use pylon_http::DataStore;
498 let row = match runtime_for_fetcher.get_by_id(entity, row_id) {
503 Ok(Some(v)) => v,
504 _ => return None,
505 };
506 if !matches!(
507 pe_for_fetcher.check_entity_read(entity, auth_ctx, Some(&row)),
508 pylon_policy::PolicyResult::Allowed
509 ) {
510 return None;
511 }
512 let snap = match runtime_for_fetcher.crdt_snapshot(entity, row_id) {
513 Ok(Some(bytes)) => bytes,
514 _ => return None,
515 };
516 pylon_router::encode_crdt_frame(
517 pylon_router::CRDT_FRAME_SNAPSHOT,
518 entity,
519 row_id,
520 &snap,
521 )
522 .ok()
523 });
524 std::thread::spawn(move || {
525 crate::ws::start_ws_server(hub, sessions, ws_port, Some(fetcher));
526 });
527 }
528
529 {
531 let hub = Arc::clone(&sse_hub);
532 std::thread::spawn(move || {
533 crate::sse::start_sse_server(hub, sse_port);
534 });
535 }
536
537 let shard_ws_port = port + 3;
539 if let Some(reg) = shard_registry.clone() {
540 let sessions = Arc::clone(&session_store);
541 std::thread::spawn(move || {
542 crate::shard_ws::start_shard_ws_server(reg, sessions, shard_ws_port);
543 });
544 }
545
546 tracing::warn!("pylon dev server listening on http://localhost:{port}");
547 tracing::info!(" WebSocket: ws://localhost:{ws_port}");
548 tracing::info!(" Studio: http://localhost:{port}/studio");
549 tracing::info!(" API: http://localhost:{port}/api/entities/<entity>");
550 tracing::info!(" Auth: http://localhost:{port}/api/auth/session");
551
552 loop {
556 if SHUTDOWN.load(Ordering::Relaxed) {
557 break;
558 }
559
560 let mut request = match server.recv() {
561 Ok(rq) => rq,
562 Err(_) => {
563 break;
565 }
566 };
567
568 if SHUTDOWN.load(Ordering::Relaxed) {
569 break;
570 }
571
572 let rt = Arc::clone(&runtime);
573 let ss = Arc::clone(&session_store);
574 let pe = Arc::clone(&policy_engine);
575 let cl = Arc::clone(&change_log);
576 let wh = Arc::clone(&ws_hub);
577 let sh = Arc::clone(&sse_hub);
578 let mc = Arc::clone(&magic_codes);
579 let pr = Arc::clone(&plugin_reg);
580 let rm = Arc::clone(&room_mgr);
581 let mt = Arc::clone(&metrics);
582 let os = Arc::clone(&oauth_state);
583 let ca = Arc::clone(&cache);
584 let ps = Arc::clone(&pubsub_broker);
585 let jq = Arc::clone(&job_queue);
586 let sc = Arc::clone(&scheduler);
587 let we = Arc::clone(&workflow_engine);
588 let fn_ops_ref = fn_ops_maybe.clone();
589 let shards_ref = shard_registry.clone();
590 let cors_origin = cors_origin.clone();
591 let is_dev = is_dev;
592
593 let method = request.method().clone();
594 let url = request.url().to_string();
595
596 if url == "/health" && method == Method::Get {
598 let uptime = start_time.elapsed().as_secs();
599 let body = serde_json::json!({
600 "status": "ok",
601 "version": "0.1.0",
602 "uptime_secs": uptime,
603 })
604 .to_string();
605
606 let response = with_security_headers(
607 Response::from_string(&body)
608 .with_status_code(200u16)
609 .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
610 .with_header(
611 Header::from_bytes(
612 "Access-Control-Allow-Origin",
613 cors_origin.as_bytes().to_vec(),
614 )
615 .unwrap(),
616 ),
617 );
618 let _ = request.respond(response);
619 continue;
620 }
621
622 if url == "/metrics" && method == Method::Get {
627 if !is_dev {
628 let admin_bytes = admin_token.as_deref().unwrap_or("").as_bytes();
629 let auth_ok = !admin_bytes.is_empty()
630 && request.headers().iter().any(|h| {
631 let name = h.field.as_str().as_str();
632 name.eq_ignore_ascii_case("Authorization")
633 && h.value
634 .as_str()
635 .strip_prefix("Bearer ")
636 .map(|t| pylon_auth::constant_time_eq(t.as_bytes(), admin_bytes))
637 .unwrap_or(false)
638 });
639 if !auth_ok {
640 let body = json_error(
641 "UNAUTHORIZED",
642 "/metrics requires admin bearer token in non-dev mode",
643 );
644 let response = with_security_headers(
645 Response::from_string(&body)
646 .with_status_code(401u16)
647 .with_header(
648 Header::from_bytes("Content-Type", "application/json").unwrap(),
649 ),
650 );
651 let _ = request.respond(response);
652 continue;
653 }
654 }
655 let prefers_prometheus = request.headers().iter().any(|h| {
656 (h.field.as_str() == "Accept" || h.field.as_str() == "accept")
657 && (h.value.as_str().contains("text/plain")
658 || h.value.as_str().contains("application/openmetrics-text"))
659 });
660 let (body, content_type) = if prefers_prometheus {
661 (mt.prometheus(), "text/plain; version=0.0.4")
662 } else {
663 (mt.snapshot().to_string(), "application/json")
664 };
665 let response = with_security_headers(
666 Response::from_string(&body)
667 .with_status_code(200u16)
668 .with_header(Header::from_bytes("Content-Type", content_type).unwrap())
669 .with_header(
670 Header::from_bytes(
671 "Access-Control-Allow-Origin",
672 cors_origin.as_bytes().to_vec(),
673 )
674 .unwrap(),
675 ),
676 );
677 let _ = request.respond(response);
678 mt.record_request("GET", 200);
679 continue;
680 }
681
682 let peer_ip = request
684 .remote_addr()
685 .map(|a| a.ip().to_string())
686 .unwrap_or_default();
687
688 let is_preflight = matches!(method, Method::Options);
694 if !is_preflight {
695 if let Err(retry_after) = rate_limiter.check(&peer_ip) {
696 let err_body = json_error(
697 "RATE_LIMITED",
698 &format!("Too many requests. Retry after {retry_after} seconds."),
699 );
700 let response = with_security_headers(
701 Response::from_string(&err_body)
702 .with_status_code(429u16)
703 .with_header(
704 Header::from_bytes("Content-Type", "application/json").unwrap(),
705 )
706 .with_header(
707 Header::from_bytes(
708 "Access-Control-Allow-Origin",
709 cors_origin.as_bytes().to_vec(),
710 )
711 .unwrap(),
712 )
713 .with_header(
714 Header::from_bytes(
715 "Access-Control-Allow-Methods",
716 "GET, POST, PATCH, DELETE, OPTIONS",
717 )
718 .unwrap(),
719 )
720 .with_header(
721 Header::from_bytes(
722 "Access-Control-Allow-Headers",
723 "Content-Type, Authorization",
724 )
725 .unwrap(),
726 )
727 .with_header(
728 Header::from_bytes(
729 "Retry-After",
730 retry_after.to_string().as_bytes().to_vec(),
731 )
732 .unwrap(),
733 ),
734 );
735 let _ = request.respond(response);
736 mt.record_request(method.as_str(), 429);
737 continue;
738 }
739 } {
755 let method_str = method.as_str();
756 let is_bearer = request.headers().iter().any(|h| {
757 (h.field.as_str() == "Authorization" || h.field.as_str() == "authorization")
758 && h.value.as_str().starts_with("Bearer ")
759 });
760 if !is_bearer && !matches!(method, Method::Get | Method::Head | Method::Options) {
765 let origin = request
766 .headers()
767 .iter()
768 .find(|h| h.field.as_str() == "Origin" || h.field.as_str() == "origin")
769 .map(|h| h.value.as_str().to_string());
770 let referer = request
771 .headers()
772 .iter()
773 .find(|h| h.field.as_str() == "Referer" || h.field.as_str() == "referer")
774 .map(|h| h.value.as_str().to_string());
775 if let Err(err) = csrf.check(method_str, origin.as_deref(), referer.as_deref()) {
776 let body = json_error(&err.code, &err.message);
777 let response = with_security_headers(
778 Response::from_string(&body)
779 .with_status_code(err.status)
780 .with_header(
781 Header::from_bytes("Content-Type", "application/json").unwrap(),
782 )
783 .with_header(
784 Header::from_bytes(
785 "Access-Control-Allow-Origin",
786 cors_origin.as_bytes().to_vec(),
787 )
788 .unwrap(),
789 ),
790 );
791 let _ = request.respond(response);
792 mt.record_request(method_str, err.status);
793 continue;
794 }
795 }
796 }
797
798 let auth_token: Option<String> = request
803 .headers()
804 .iter()
805 .find(|h| h.field.as_str() == "Authorization" || h.field.as_str() == "authorization")
806 .and_then(|h| {
807 let val = h.value.as_str();
808 val.strip_prefix("Bearer ").map(|t| t.to_string())
809 });
810 let auth_ctx = if admin_token.is_some()
811 && auth_token.is_some()
812 && pylon_auth::constant_time_eq(
813 auth_token.as_deref().unwrap_or("").as_bytes(),
814 admin_token.as_deref().unwrap_or("").as_bytes(),
815 ) {
816 pylon_auth::AuthContext::admin()
817 } else {
818 ss.resolve(auth_token.as_deref())
819 };
820
821 if url == "/api/__test__/reset" && method == Method::Post {
836 let is_loopback = peer_ip == "127.0.0.1"
837 || peer_ip == "::1"
838 || peer_ip.starts_with("127.")
839 || peer_ip == "localhost";
840 if !is_dev || !rt.is_in_memory() || !is_loopback {
841 let body = json_error(
842 "RESET_REFUSED",
843 "reset endpoint is only available in dev mode + in-memory DB + from loopback",
844 );
845 let response = with_security_headers(
846 Response::from_string(&body)
847 .with_status_code(403u16)
848 .with_header(
849 Header::from_bytes("Content-Type", "application/json").unwrap(),
850 )
851 .with_header(
852 Header::from_bytes(
853 "Access-Control-Allow-Origin",
854 cors_origin.as_bytes().to_vec(),
855 )
856 .unwrap(),
857 ),
858 );
859 let _ = request.respond(response);
860 mt.record_request("POST", 403);
861 continue;
862 }
863 let (status, body) = match rt.reset_for_tests() {
864 Ok(()) => (200u16, "{\"reset\":true}".to_string()),
865 Err(e) => (500u16, json_error(&e.code, &e.message)),
866 };
867 let response = with_security_headers(
868 Response::from_string(&body)
869 .with_status_code(status)
870 .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
871 .with_header(
872 Header::from_bytes(
873 "Access-Control-Allow-Origin",
874 cors_origin.as_bytes().to_vec(),
875 )
876 .unwrap(),
877 ),
878 );
879 let _ = request.respond(response);
880 mt.record_request("POST", status);
881 continue;
882 }
883
884 if url == "/api/files/upload" && method == Method::Post {
893 const UPLOAD_MAX: usize = 10 * 1024 * 1024;
894 if let Some(declared) = request.body_length() {
897 if declared > UPLOAD_MAX {
898 let err = json_error(
899 "PAYLOAD_TOO_LARGE",
900 &format!("Content-Length {declared} exceeds upload max of {UPLOAD_MAX}"),
901 );
902 let response = with_security_headers(
903 Response::from_string(&err)
904 .with_status_code(413u16)
905 .with_header(
906 Header::from_bytes("Content-Type", "application/json").unwrap(),
907 )
908 .with_header(
909 Header::from_bytes(
910 "Access-Control-Allow-Origin",
911 cors_origin.as_bytes().to_vec(),
912 )
913 .unwrap(),
914 ),
915 );
916 let _ = request.respond(response);
917 mt.record_request("POST", 413);
918 continue;
919 }
920 }
921 if auth_ctx.user_id.is_none() {
922 let err = json_error(
923 "AUTH_REQUIRED",
924 "/api/files/upload requires an authenticated session",
925 );
926 let response = with_security_headers(
927 Response::from_string(&err)
928 .with_status_code(401u16)
929 .with_header(
930 Header::from_bytes("Content-Type", "application/json").unwrap(),
931 )
932 .with_header(
933 Header::from_bytes(
934 "Access-Control-Allow-Origin",
935 cors_origin.as_bytes().to_vec(),
936 )
937 .unwrap(),
938 ),
939 );
940 let _ = request.respond(response);
941 mt.record_request("POST", 401);
942 continue;
943 }
944 use std::io::Read;
949 let mut bytes: Vec<u8> = Vec::with_capacity(8192);
950 let mut limited = request.as_reader().take((UPLOAD_MAX as u64) + 1);
951 let _ = limited.read_to_end(&mut bytes);
952
953 const MAX: usize = UPLOAD_MAX;
954 if bytes.len() > MAX {
955 let err = json_error("PAYLOAD_TOO_LARGE", "File exceeds 10 MB limit");
956 let response = with_security_headers(
957 Response::from_string(&err)
958 .with_status_code(413u16)
959 .with_header(
960 Header::from_bytes("Content-Type", "application/json").unwrap(),
961 )
962 .with_header(
963 Header::from_bytes(
964 "Access-Control-Allow-Origin",
965 cors_origin.as_bytes().to_vec(),
966 )
967 .unwrap(),
968 ),
969 );
970 let _ = request.respond(response);
971 mt.record_request("POST", 413);
972 continue;
973 }
974
975 let content_type = request
977 .headers()
978 .iter()
979 .find(|h| h.field.as_str() == "Content-Type" || h.field.as_str() == "content-type")
980 .map(|h| h.value.as_str().to_string())
981 .unwrap_or_else(|| "application/octet-stream".into());
982 let filename = request
983 .headers()
984 .iter()
985 .find(|h| h.field.as_str() == "X-Filename" || h.field.as_str() == "x-filename")
986 .map(|h| h.value.as_str().to_string())
987 .unwrap_or_else(|| "upload".into());
988
989 let (name, ct, payload) = if content_type.starts_with("multipart/form-data") {
991 match parse_multipart_first_file(&bytes, &content_type) {
992 Some(p) => p,
993 None => {
994 let err = json_error("INVALID_MULTIPART", "Could not parse multipart body");
995 let response = with_security_headers(
996 Response::from_string(&err)
997 .with_status_code(400u16)
998 .with_header(
999 Header::from_bytes("Content-Type", "application/json").unwrap(),
1000 )
1001 .with_header(
1002 Header::from_bytes(
1003 "Access-Control-Allow-Origin",
1004 cors_origin.as_bytes().to_vec(),
1005 )
1006 .unwrap(),
1007 ),
1008 );
1009 let _ = request.respond(response);
1010 mt.record_request("POST", 400);
1011 continue;
1012 }
1013 }
1014 } else {
1015 (filename, content_type, bytes)
1016 };
1017
1018 let storage = pylon_storage::files::LocalFileStorage::new(
1019 &std::env::var("PYLON_FILES_DIR").unwrap_or_else(|_| "uploads".into()),
1020 &std::env::var("PYLON_FILES_URL_PREFIX").unwrap_or_else(|_| "/api/files".into()),
1021 );
1022
1023 let (status, body) =
1024 match pylon_storage::files::FileStorage::store(&storage, &name, &payload, &ct) {
1025 Ok(stored) => (
1026 201u16,
1027 serde_json::to_string(&stored).unwrap_or_else(|_| "{}".into()),
1028 ),
1029 Err(e) => (500u16, json_error(&e.code, &e.message)),
1030 };
1031
1032 let response = with_security_headers(
1033 Response::from_string(&body)
1034 .with_status_code(status)
1035 .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
1036 .with_header(
1037 Header::from_bytes(
1038 "Access-Control-Allow-Origin",
1039 cors_origin.as_bytes().to_vec(),
1040 )
1041 .unwrap(),
1042 ),
1043 );
1044 let _ = request.respond(response);
1045 mt.record_request("POST", status);
1046 continue;
1047 }
1048
1049 const MAX_BODY_SIZE: usize = 10 * 1024 * 1024;
1059
1060 if let Some(declared) = request.body_length() {
1061 if declared > MAX_BODY_SIZE {
1062 let err_body = json_error(
1063 "PAYLOAD_TOO_LARGE",
1064 &format!("Content-Length {declared} exceeds max of {MAX_BODY_SIZE}"),
1065 );
1066 let response = with_security_headers(
1067 Response::from_string(&err_body)
1068 .with_status_code(413u16)
1069 .with_header(
1070 Header::from_bytes(
1071 "Access-Control-Allow-Origin",
1072 cors_origin.as_bytes().to_vec(),
1073 )
1074 .unwrap(),
1075 ),
1076 );
1077 let _ = request.respond(response);
1078 mt.record_request(method.as_str(), 413);
1079 continue;
1080 }
1081 }
1082
1083 let mut body = String::new();
1084 if !matches!(
1085 method,
1086 Method::Get | Method::Head | Method::Options | Method::Delete
1087 ) {
1088 use std::io::Read;
1089 let mut limited = request.as_reader().take((MAX_BODY_SIZE as u64) + 1);
1090 let _ = limited.read_to_string(&mut body);
1091 }
1092
1093 if body.len() > MAX_BODY_SIZE {
1094 let err_body = json_error(
1095 "PAYLOAD_TOO_LARGE",
1096 &format!(
1097 "Request body exceeds maximum size of {} bytes",
1098 MAX_BODY_SIZE,
1099 ),
1100 );
1101 let response = with_security_headers(
1102 Response::from_string(&err_body)
1103 .with_status_code(413u16)
1104 .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
1105 .with_header(
1106 Header::from_bytes(
1107 "Access-Control-Allow-Origin",
1108 cors_origin.as_bytes().to_vec(),
1109 )
1110 .unwrap(),
1111 ),
1112 );
1113 let _ = request.respond(response);
1114 mt.record_request(method.as_str(), 413);
1115 continue;
1116 }
1117
1118 if method == Method::Get {
1122 if let Some(rest) = url.strip_prefix("/api/shards/") {
1123 let rest = rest.split('?').next().unwrap_or(rest);
1124 if let Some(shard_id) = rest.strip_suffix("/connect") {
1125 if auth_ctx.user_id.is_none() {
1130 let err = json_error(
1131 "AUTH_REQUIRED",
1132 "Shard connect requires an authenticated session",
1133 );
1134 let response = with_security_headers(
1135 Response::from_string(&err)
1136 .with_status_code(401u16)
1137 .with_header(
1138 Header::from_bytes("Content-Type", "application/json").unwrap(),
1139 )
1140 .with_header(
1141 Header::from_bytes(
1142 "Access-Control-Allow-Origin",
1143 cors_origin.as_bytes().to_vec(),
1144 )
1145 .unwrap(),
1146 ),
1147 );
1148 let _ = request.respond(response);
1149 mt.record_request("GET", 401);
1150 continue;
1151 }
1152 let shards = match &shards_ref {
1153 Some(s) => Arc::clone(s),
1154 None => {
1155 let err = json_error(
1156 "SHARDS_NOT_AVAILABLE",
1157 "Shard system is not configured",
1158 );
1159 let response = with_security_headers(
1160 Response::from_string(&err)
1161 .with_status_code(503u16)
1162 .with_header(
1163 Header::from_bytes("Content-Type", "application/json")
1164 .unwrap(),
1165 )
1166 .with_header(
1167 Header::from_bytes(
1168 "Access-Control-Allow-Origin",
1169 cors_origin.as_bytes().to_vec(),
1170 )
1171 .unwrap(),
1172 ),
1173 );
1174 let _ = request.respond(response);
1175 mt.record_request("GET", 503);
1176 continue;
1177 }
1178 };
1179 let shard = match shards.get(shard_id) {
1180 Some(s) => s,
1181 None => {
1182 let err = json_error(
1183 "SHARD_NOT_FOUND",
1184 &format!("Shard \"{shard_id}\" not found"),
1185 );
1186 let response = with_security_headers(
1187 Response::from_string(&err)
1188 .with_status_code(404u16)
1189 .with_header(
1190 Header::from_bytes("Content-Type", "application/json")
1191 .unwrap(),
1192 )
1193 .with_header(
1194 Header::from_bytes(
1195 "Access-Control-Allow-Origin",
1196 cors_origin.as_bytes().to_vec(),
1197 )
1198 .unwrap(),
1199 ),
1200 );
1201 let _ = request.respond(response);
1202 mt.record_request("GET", 404);
1203 continue;
1204 }
1205 };
1206
1207 let sub_id = url
1210 .split("sid=")
1211 .nth(1)
1212 .and_then(|s| s.split('&').next())
1213 .map(|s| s.to_string())
1214 .or_else(|| auth_ctx.user_id.clone())
1215 .unwrap_or_else(|| {
1216 format!(
1217 "anon_{}",
1218 std::time::SystemTime::now()
1219 .duration_since(std::time::UNIX_EPOCH)
1220 .unwrap_or_default()
1221 .as_nanos()
1222 )
1223 });
1224 let subscriber_id = pylon_realtime::SubscriberId::new(sub_id);
1225
1226 let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1227 let streaming_body = StreamingBody::new(rx);
1228
1229 let tx_clone = tx.clone();
1230 let sink: pylon_realtime::SnapshotSink =
1231 Box::new(move |tick: u64, bytes: &[u8]| {
1232 let mut frame = format!("id: {tick}\ndata: ").into_bytes();
1235 frame.extend_from_slice(bytes);
1236 frame.extend_from_slice(b"\n\n");
1237 let _ = tx_clone.send(frame);
1238 });
1239
1240 let shard_auth = pylon_realtime::ShardAuth {
1241 user_id: auth_ctx.user_id.clone(),
1242 is_admin: auth_ctx.is_admin,
1243 };
1244 if let Err(e) = shard.add_subscriber(subscriber_id.clone(), sink, &shard_auth) {
1245 let (status, code) = match &e {
1246 pylon_realtime::ShardError::Unauthorized(_) => (403u16, "UNAUTHORIZED"),
1247 _ => (429u16, "SUBSCRIBE_FAILED"),
1248 };
1249 let err = json_error(code, &e.to_string());
1250 let response = with_security_headers(
1251 Response::from_string(&err)
1252 .with_status_code(status)
1253 .with_header(
1254 Header::from_bytes("Content-Type", "application/json").unwrap(),
1255 )
1256 .with_header(
1257 Header::from_bytes(
1258 "Access-Control-Allow-Origin",
1259 cors_origin.as_bytes().to_vec(),
1260 )
1261 .unwrap(),
1262 ),
1263 );
1264 let _ = request.respond(response);
1265 mt.record_request("GET", status);
1266 continue;
1267 }
1268
1269 {
1272 let shard_cleanup = Arc::clone(&shard);
1273 let sub_id_cleanup = subscriber_id.clone();
1274 let tx_liveness = tx.clone();
1275 std::thread::spawn(move || {
1276 loop {
1279 std::thread::sleep(std::time::Duration::from_secs(30));
1280 if tx_liveness.send(b": heartbeat\n\n".to_vec()).is_err() {
1281 shard_cleanup.remove_subscriber(&sub_id_cleanup);
1282 return;
1283 }
1284 if !shard_cleanup.is_running() {
1285 return;
1286 }
1287 }
1288 });
1289 }
1290
1291 let response = with_security_headers(Response::new(
1292 tiny_http::StatusCode(200),
1293 vec![
1294 Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1295 Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1296 Header::from_bytes("Connection", "keep-alive").unwrap(),
1297 Header::from_bytes(
1298 "Access-Control-Allow-Origin",
1299 cors_origin.as_bytes().to_vec(),
1300 )
1301 .unwrap(),
1302 ],
1303 streaming_body,
1304 None,
1305 None,
1306 ));
1307 let _ = request.respond(response);
1308 mt.record_request("GET", 200);
1309 continue;
1310 }
1311 }
1312 }
1313
1314 if method == Method::Post
1316 && url.starts_with("/api/fn/")
1317 && url != "/api/fn/traces"
1318 && request.headers().iter().any(|h| {
1319 (h.field.as_str() == "Accept" || h.field.as_str() == "accept")
1320 && h.value.as_str().contains("text/event-stream")
1321 })
1322 {
1323 let fn_name = url
1324 .strip_prefix("/api/fn/")
1325 .unwrap_or("")
1326 .split('?')
1327 .next()
1328 .unwrap_or("")
1329 .to_string();
1330
1331 if let Some(fn_ops) = &fn_ops_maybe {
1332 if pylon_router::FnOps::get_fn(fn_ops.as_ref(), &fn_name).is_none() {
1336 let err = json_error(
1337 "FN_NOT_FOUND",
1338 &format!("Function \"{fn_name}\" is not registered"),
1339 );
1340 let response = with_security_headers(
1341 Response::from_string(&err)
1342 .with_status_code(404u16)
1343 .with_header(
1344 Header::from_bytes("Content-Type", "application/json").unwrap(),
1345 )
1346 .with_header(
1347 Header::from_bytes(
1348 "Access-Control-Allow-Origin",
1349 cors_origin.as_bytes().to_vec(),
1350 )
1351 .unwrap(),
1352 ),
1353 );
1354 let _ = request.respond(response);
1355 mt.record_request("POST", 404);
1356 continue;
1357 }
1358 let identity = auth_ctx.user_id.as_deref().unwrap_or("anon");
1360 if let Err(retry_after) =
1361 pylon_router::FnOps::check_rate_limit(fn_ops.as_ref(), &fn_name, identity)
1362 {
1363 let body = format!(
1364 r#"{{"error":{{"code":"RATE_LIMITED","message":"Function \"{fn_name}\" rate limit exceeded","retry_after_secs":{retry_after}}}}}"#
1365 );
1366 let response = with_security_headers(
1367 Response::from_string(&body)
1368 .with_status_code(429u16)
1369 .with_header(
1370 Header::from_bytes("Content-Type", "application/json").unwrap(),
1371 )
1372 .with_header(
1373 Header::from_bytes(
1374 "Access-Control-Allow-Origin",
1375 cors_origin.as_bytes().to_vec(),
1376 )
1377 .unwrap(),
1378 ),
1379 );
1380 let _ = request.respond(response);
1381 mt.record_request("POST", 429);
1382 continue;
1383 }
1384
1385 let args: serde_json::Value =
1386 serde_json::from_str(&body).unwrap_or(serde_json::json!({}));
1387
1388 let auth = pylon_functions::protocol::AuthInfo {
1389 user_id: auth_ctx.user_id.clone(),
1390 is_admin: auth_ctx.is_admin,
1391 tenant_id: auth_ctx.tenant_id.clone(),
1392 };
1393
1394 let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1395 let streaming_body = StreamingBody::new(rx);
1396
1397 let fn_ops_cl = Arc::clone(fn_ops);
1398 let tx_stream = tx.clone();
1399 std::thread::spawn(move || {
1400 let tx_cb = tx_stream.clone();
1401 let on_stream: Box<dyn FnMut(&str) + Send> = Box::new(move |chunk: &str| {
1402 let sse = format!("data: {}\n\n", chunk);
1403 let _ = tx_cb.send(sse.into_bytes());
1404 });
1405
1406 let result = pylon_router::FnOps::call(
1407 fn_ops_cl.as_ref(),
1408 &fn_name,
1409 args,
1410 auth,
1411 Some(on_stream),
1412 None, );
1414 match result {
1415 Ok((value, _trace)) => {
1416 let done = format!(
1417 "event: result\ndata: {}\n\n",
1418 serde_json::to_string(&value).unwrap_or_else(|_| "null".into())
1419 );
1420 let _ = tx_stream.send(done.into_bytes());
1421 }
1422 Err(e) => {
1423 let err = format!(
1424 "event: error\ndata: {}\n\n",
1425 serde_json::json!({"code": e.code, "message": e.message})
1426 );
1427 let _ = tx_stream.send(err.into_bytes());
1428 }
1429 }
1430 });
1431
1432 let response = with_security_headers(Response::new(
1433 tiny_http::StatusCode(200),
1434 vec![
1435 Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1436 Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1437 Header::from_bytes("Connection", "keep-alive").unwrap(),
1438 Header::from_bytes(
1439 "Access-Control-Allow-Origin",
1440 cors_origin.as_bytes().to_vec(),
1441 )
1442 .unwrap(),
1443 ],
1444 streaming_body,
1445 None,
1446 None,
1447 ));
1448 let _ = request.respond(response);
1449 mt.record_request("POST", 200);
1450 continue;
1451 }
1452 }
1453
1454 if url == "/api/ai/stream" && method == Method::Post {
1456 if auth_ctx.user_id.is_none() {
1459 let err = json_error(
1460 "AUTH_REQUIRED",
1461 "/api/ai/stream requires an authenticated session",
1462 );
1463 let response = with_security_headers(
1464 Response::from_string(&err)
1465 .with_status_code(401u16)
1466 .with_header(
1467 Header::from_bytes("Content-Type", "application/json").unwrap(),
1468 )
1469 .with_header(
1470 Header::from_bytes(
1471 "Access-Control-Allow-Origin",
1472 cors_origin.as_bytes().to_vec(),
1473 )
1474 .unwrap(),
1475 ),
1476 );
1477 let _ = request.respond(response);
1478 mt.record_request("POST", 401);
1479 continue;
1480 }
1481 let ai_provider = std::env::var("PYLON_AI_PROVIDER").unwrap_or_default();
1482 let ai_key = std::env::var("PYLON_AI_API_KEY").unwrap_or_default();
1483 let ai_model = std::env::var("PYLON_AI_MODEL").unwrap_or_default();
1484 let ai_base = std::env::var("PYLON_AI_BASE_URL").unwrap_or_default();
1485
1486 if ai_key.is_empty() && ai_provider != "custom" {
1487 let err = json_error(
1488 "AI_NOT_CONFIGURED",
1489 "Set PYLON_AI_PROVIDER and PYLON_AI_API_KEY",
1490 );
1491 let response = with_security_headers(
1492 Response::from_string(&err)
1493 .with_status_code(503u16)
1494 .with_header(
1495 Header::from_bytes("Content-Type", "application/json").unwrap(),
1496 )
1497 .with_header(
1498 Header::from_bytes(
1499 "Access-Control-Allow-Origin",
1500 cors_origin.as_bytes().to_vec(),
1501 )
1502 .unwrap(),
1503 ),
1504 );
1505 let _ = request.respond(response);
1506 mt.record_request("POST", 503);
1507 continue;
1508 }
1509
1510 let parsed: serde_json::Value = match serde_json::from_str(&body) {
1511 Ok(v) => v,
1512 Err(_) => {
1513 let err = json_error("INVALID_JSON", "Invalid request body");
1514 let response = with_security_headers(
1515 Response::from_string(&err)
1516 .with_status_code(400u16)
1517 .with_header(
1518 Header::from_bytes("Content-Type", "application/json").unwrap(),
1519 )
1520 .with_header(
1521 Header::from_bytes(
1522 "Access-Control-Allow-Origin",
1523 cors_origin.as_bytes().to_vec(),
1524 )
1525 .unwrap(),
1526 ),
1527 );
1528 let _ = request.respond(response);
1529 mt.record_request("POST", 400);
1530 continue;
1531 }
1532 };
1533
1534 let messages: Vec<AiMessage> = match parsed.get("messages").and_then(|m| m.as_array()) {
1535 Some(arr) => arr
1536 .iter()
1537 .filter_map(|m| {
1538 let role = m.get("role")?.as_str()?.to_string();
1539 let content = m.get("content")?.as_str()?.to_string();
1540 Some(AiMessage { role, content })
1541 })
1542 .collect(),
1543 None => {
1544 let err = json_error("MISSING_FIELD", "\"messages\" array is required");
1545 let response = with_security_headers(
1546 Response::from_string(&err)
1547 .with_status_code(400u16)
1548 .with_header(
1549 Header::from_bytes("Content-Type", "application/json").unwrap(),
1550 )
1551 .with_header(
1552 Header::from_bytes(
1553 "Access-Control-Allow-Origin",
1554 cors_origin.as_bytes().to_vec(),
1555 )
1556 .unwrap(),
1557 ),
1558 );
1559 let _ = request.respond(response);
1560 mt.record_request("POST", 400);
1561 continue;
1562 }
1563 };
1564
1565 let model = parsed
1567 .get("model")
1568 .and_then(|m| m.as_str())
1569 .map(|s| s.to_string())
1570 .unwrap_or(ai_model);
1571
1572 let proxy = match ai_provider.as_str() {
1573 "anthropic" => AiProxyPlugin::anthropic(&ai_key, &model),
1574 "openai" => AiProxyPlugin::openai(&ai_key, &model),
1575 "custom" => AiProxyPlugin::custom_with_model(&ai_base, &ai_key, &model),
1576 _ => AiProxyPlugin::openai(&ai_key, &model),
1577 };
1578
1579 let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1582 let streaming_body = StreamingBody::new(rx);
1583
1584 std::thread::spawn(move || {
1587 let result = proxy.stream_completion(&messages, &mut |chunk| {
1588 let sse = format!(
1589 "data: {}
1590
1591",
1592 serde_json::json!({
1593 "choices": [{"index": 0, "delta": {"content": chunk}}]
1594 })
1595 );
1596 let _ = tx.send(sse.into_bytes());
1597 });
1598
1599 match result {
1601 Ok(_) => {
1602 let _ = tx.send(
1603 b"data: [DONE]
1604
1605"
1606 .to_vec(),
1607 );
1608 }
1609 Err(e) => {
1610 let err_event = format!(
1611 "data: {}
1612
1613",
1614 serde_json::json!({"error": {"message": e, "type": "stream_error"}})
1615 );
1616 let _ = tx.send(err_event.into_bytes());
1617 }
1618 }
1619 });
1621
1622 let response = with_security_headers(Response::new(
1623 tiny_http::StatusCode(200),
1624 vec![
1625 Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1626 Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1627 Header::from_bytes("Connection", "keep-alive").unwrap(),
1628 Header::from_bytes(
1629 "Access-Control-Allow-Origin",
1630 cors_origin.as_bytes().to_vec(),
1631 )
1632 .unwrap(),
1633 ],
1634 streaming_body,
1635 None, None,
1637 ));
1638 let _ = request.respond(response);
1639 mt.record_request("POST", 200);
1640 continue;
1641 }
1642
1643 let (status, response_body, content_type, is_studio) = if (url == "/studio"
1654 || url == "/studio/")
1655 && method == Method::Get
1656 {
1657 if !is_dev && !auth_ctx.is_admin {
1658 let body = json_error(
1659 "AUTH_REQUIRED",
1660 "/studio requires admin auth in production (set PYLON_ADMIN_TOKEN and pass it as Bearer)",
1661 );
1662 let response = with_security_headers(
1663 Response::from_string(&body)
1664 .with_status_code(401u16)
1665 .with_header(
1666 Header::from_bytes("Content-Type", "application/json").unwrap(),
1667 )
1668 .with_header(
1669 Header::from_bytes(
1670 "Access-Control-Allow-Origin",
1671 cors_origin.as_bytes().to_vec(),
1672 )
1673 .unwrap(),
1674 ),
1675 );
1676 let _ = request.respond(response);
1677 mt.record_request("GET", 401);
1678 continue;
1679 }
1680 let host = request
1687 .headers()
1688 .iter()
1689 .find(|h| h.field.equiv("Host"))
1690 .map(|h| h.value.as_str().to_string())
1691 .unwrap_or_else(|| format!("localhost:{port}"));
1692 let scheme = request
1693 .headers()
1694 .iter()
1695 .find(|h| h.field.equiv("X-Forwarded-Proto"))
1696 .map(|h| h.value.as_str().to_string())
1697 .unwrap_or_else(|| "http".to_string());
1698 let base = format!("{scheme}://{host}");
1699 let html = pylon_studio_api::generate_studio_html(rt.manifest(), &base);
1700 (200u16, html, "text/html", true)
1701 } else {
1702 let meta = pylon_plugin::RequestMeta {
1706 peer_ip: peer_ip.as_str(),
1707 };
1708 if let Err(e) = pr.run_on_request_with_meta(method.as_str(), &url, &auth_ctx, &meta) {
1709 (
1710 e.status,
1711 json_error(&e.code, &e.message),
1712 "application/json",
1713 false,
1714 )
1715 } else if let Some((s, b)) =
1716 pr.try_handle_route(method.as_str(), &url, &body, &auth_ctx)
1717 {
1718 (s, b, "application/json", false)
1720 } else {
1721 let notifier = WsSseNotifier {
1722 ws: Arc::clone(&wh),
1723 sse: Arc::clone(&sh),
1724 };
1725 let openapi_gen = RuntimeOpenApiGenerator {
1726 manifest: rt.manifest(),
1727 };
1728 let file_ops = LocalFileOps::new_default();
1729 let cache_adapter = CacheAdapter(Arc::clone(&ca));
1730 let pubsub_adapter = PubSubAdapter(Arc::clone(&ps));
1731 let email_adapter = EmailAdapter::from_env();
1732 let fn_ops: Option<&dyn pylon_router::FnOps> =
1733 fn_ops_ref.as_deref().map(|f| f as &dyn pylon_router::FnOps);
1734 let shard_adapter = shards_ref.as_ref().map(|reg| ShardOpsAdapter {
1735 registry: Arc::clone(reg),
1736 });
1737 let shard_ops: Option<&dyn pylon_router::ShardOps> = shard_adapter
1738 .as_ref()
1739 .map(|a| a as &dyn pylon_router::ShardOps);
1740 let plugin_hooks = PluginHooksAdapter(Arc::clone(&pr));
1741 let request_headers: Vec<(String, String)> = request
1746 .headers()
1747 .iter()
1748 .map(|h| (h.field.as_str().to_string(), h.value.as_str().to_string()))
1749 .collect();
1750 let router_ctx = pylon_router::RouterContext {
1751 store: rt.as_ref(),
1752 session_store: &ss,
1753 magic_codes: &mc,
1754 oauth_state: &os,
1755 policy_engine: &pe,
1756 change_log: &cl,
1757 notifier: ¬ifier,
1758 rooms: rm.as_ref(),
1759 cache: &cache_adapter,
1760 pubsub: &pubsub_adapter,
1761 jobs: jq.as_ref(),
1762 scheduler: sc.as_ref(),
1763 workflows: we.as_ref(),
1764 files: &file_ops,
1765 openapi: &openapi_gen,
1766 functions: fn_ops,
1767 email: &email_adapter,
1768 shards: shard_ops,
1769 plugin_hooks: &plugin_hooks,
1770 auth_ctx: &auth_ctx,
1771 is_dev,
1772 request_headers: &request_headers,
1773 };
1774 let http_method = HttpMethod::from_str(method.as_str());
1775 let (s, b, _ct) = pylon_router::route(
1776 &router_ctx,
1777 http_method,
1778 &url,
1779 &body,
1780 auth_token.as_deref(),
1781 );
1782 (s, b, "application/json", false)
1783 }
1784 };
1785
1786 let mut response = Response::from_string(&response_body)
1787 .with_status_code(status)
1788 .with_header(Header::from_bytes("Content-Type", content_type).unwrap())
1789 .with_header(
1790 Header::from_bytes(
1791 "Access-Control-Allow-Origin",
1792 cors_origin.as_bytes().to_vec(),
1793 )
1794 .unwrap(),
1795 )
1796 .with_header(
1797 Header::from_bytes(
1798 "Access-Control-Allow-Methods",
1799 "GET, POST, PATCH, DELETE, OPTIONS",
1800 )
1801 .unwrap(),
1802 )
1803 .with_header(
1804 Header::from_bytes(
1805 "Access-Control-Allow-Headers",
1806 "Content-Type, Authorization",
1807 )
1808 .unwrap(),
1809 );
1810
1811 if is_studio {
1824 response = response.with_header(
1825 Header::from_bytes(
1826 "Content-Security-Policy",
1827 "default-src 'self' 'unsafe-inline' 'unsafe-eval' https://cdn.tailwindcss.com https://unpkg.com ws: wss:",
1828 ).unwrap(),
1829 );
1830 }
1831
1832 let response = with_security_headers(response);
1833
1834 let _ = request.respond(response);
1835 mt.record_request(method.as_str(), status);
1836 }
1837
1838 tracing::warn!("Shutting down gracefully...");
1839
1840 let drain_timeout = std::time::Duration::from_secs(
1843 std::env::var("PYLON_DRAIN_SECS")
1844 .ok()
1845 .and_then(|s| s.parse().ok())
1846 .unwrap_or(10),
1847 );
1848 let start = Instant::now();
1849
1850 if let Some(reg) = &shard_registry {
1852 for id in reg.ids() {
1853 if let Some(shard) = reg.get(&id) {
1854 shard.stop();
1855 }
1856 }
1857 }
1858
1859 let _ = &scheduler; while start.elapsed() < drain_timeout {
1864 let pending_jobs = job_queue.stats().pending;
1865 if pending_jobs == 0 {
1866 break;
1867 }
1868 std::thread::sleep(std::time::Duration::from_millis(100));
1869 }
1870
1871 let elapsed = start.elapsed();
1872 tracing::warn!(
1873 "Drain complete in {:.1}s (timeout {}s)",
1874 elapsed.as_secs_f32(),
1875 drain_timeout.as_secs()
1876 );
1877 Ok(())
1878}
1879
1880fn json_error(code: &str, message: &str) -> String {
1885 pylon_router::json_error(code, message)
1886}
1887
1888fn build_session_store(app_db_path: Option<&str>) -> SessionStore {
1898 if std::env::var("PYLON_SESSION_IN_MEMORY")
1899 .map(|v| v == "1" || v == "true")
1900 .unwrap_or(false)
1901 {
1902 return SessionStore::new();
1903 }
1904 let explicit = std::env::var("PYLON_SESSION_DB").ok();
1905 let default_path = app_db_path.map(|p| format!("{p}.sessions.db"));
1906 let path = match explicit.or(default_path) {
1907 Some(p) => p,
1908 None => return SessionStore::new(),
1909 };
1910 match crate::session_backend::SqliteSessionBackend::open(&path) {
1911 Ok(backend) => {
1912 tracing::info!("[pylon] Session persistence enabled: {path}");
1913 SessionStore::with_backend(Box::new(backend))
1914 }
1915 Err(e) => {
1916 tracing::warn!(
1917 "[pylon] could not open session DB {path}: {e}. Falling back to in-memory sessions."
1918 );
1919 SessionStore::new()
1920 }
1921 }
1922}
1923
1924fn parse_multipart_first_file(
1935 body: &[u8],
1936 content_type_header: &str,
1937) -> Option<(String, String, Vec<u8>)> {
1938 let boundary_param = content_type_header
1940 .split(';')
1941 .find_map(|p| p.trim().strip_prefix("boundary="))?;
1942 let boundary = boundary_param.trim_matches('"');
1943 let delimiter = format!("--{boundary}");
1944 let delimiter_bytes = delimiter.as_bytes();
1945
1946 let mut pos = 0usize;
1948 while pos < body.len() {
1949 let next = find_subslice(&body[pos..], delimiter_bytes)?;
1951 let part_start = pos + next + delimiter_bytes.len();
1952 if part_start + 2 > body.len() {
1954 return None;
1955 }
1956 if &body[part_start..part_start + 2] == b"--" {
1957 return None; }
1959 let header_start = part_start + skip_crlf(&body[part_start..]);
1960
1961 let header_end_offset = find_subslice(&body[header_start..], b"\r\n\r\n")?;
1963 let headers = &body[header_start..header_start + header_end_offset];
1964 let data_start = header_start + header_end_offset + 4;
1965
1966 let next_delim_offset = find_subslice(&body[data_start..], delimiter_bytes)?;
1968 let mut data_end = data_start + next_delim_offset;
1970 if data_end >= 2 && &body[data_end - 2..data_end] == b"\r\n" {
1971 data_end -= 2;
1972 }
1973
1974 let headers_str = std::str::from_utf8(headers).ok()?;
1976 let mut filename: Option<String> = None;
1977 let mut part_ct = String::from("application/octet-stream");
1978 let mut has_file = false;
1979 for line in headers_str.split("\r\n") {
1980 let lower = line.to_ascii_lowercase();
1981 if let Some(rest) = lower.strip_prefix("content-disposition:") {
1982 if rest.contains("filename=") {
1983 has_file = true;
1984 if let Some(start) = line.find("filename=\"") {
1986 let from = start + 10;
1987 if let Some(end_offset) = line[from..].find('"') {
1988 filename = Some(line[from..from + end_offset].to_string());
1989 }
1990 }
1991 }
1992 } else if let Some(rest) = lower.strip_prefix("content-type:") {
1993 part_ct = rest.trim().to_string();
1994 }
1995 }
1996
1997 if has_file {
1998 let name = filename.unwrap_or_else(|| "upload".into());
1999 return Some((name, part_ct, body[data_start..data_end].to_vec()));
2000 }
2001
2002 pos = data_end;
2003 }
2004 None
2005}
2006
2007fn find_subslice(haystack: &[u8], needle: &[u8]) -> Option<usize> {
2008 if needle.is_empty() || needle.len() > haystack.len() {
2009 return None;
2010 }
2011 haystack.windows(needle.len()).position(|w| w == needle)
2012}
2013
2014fn skip_crlf(buf: &[u8]) -> usize {
2015 if buf.len() >= 2 && &buf[0..2] == b"\r\n" {
2016 2
2017 } else if !buf.is_empty() && buf[0] == b'\n' {
2018 1
2019 } else {
2020 0
2021 }
2022}
2023
2024#[cfg(test)]
2025mod multipart_tests {
2026 use super::*;
2027
2028 #[test]
2029 fn parses_single_file() {
2030 let body = b"--bnd\r\n\
2031Content-Disposition: form-data; name=\"file\"; filename=\"hello.txt\"\r\n\
2032Content-Type: text/plain\r\n\
2033\r\n\
2034Hello world\r\n\
2035--bnd--\r\n";
2036 let ct = "multipart/form-data; boundary=bnd";
2037 let (name, content_type, bytes) = parse_multipart_first_file(body, ct).unwrap();
2038 assert_eq!(name, "hello.txt");
2039 assert_eq!(content_type, "text/plain");
2040 assert_eq!(bytes, b"Hello world");
2041 }
2042
2043 #[test]
2044 fn returns_none_without_file_part() {
2045 let body = b"--bnd\r\n\
2046Content-Disposition: form-data; name=\"field\"\r\n\
2047\r\n\
2048just text\r\n\
2049--bnd--\r\n";
2050 let ct = "multipart/form-data; boundary=bnd";
2051 assert!(parse_multipart_first_file(body, ct).is_none());
2052 }
2053
2054 #[test]
2055 fn returns_none_when_no_boundary() {
2056 assert!(parse_multipart_first_file(b"anything", "application/json").is_none());
2057 }
2058}