1#[allow(unused_imports)]
2use std::io::Read;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::time::Instant;
6
7use pylon_auth::SessionStore;
8use pylon_http::HttpMethod;
9use pylon_plugin::PluginRegistry;
10use pylon_policy::PolicyEngine;
11use pylon_sync::{ChangeKind, ChangeLog};
12use tiny_http::{Header, Method, Response, Server};
13
14use crate::datastore::{
15 CacheAdapter, EmailAdapter, LocalFileOps, PluginHooksAdapter, PubSubAdapter,
16 RuntimeOpenApiGenerator, ShardOpsAdapter, WsSseNotifier,
17};
18use crate::jobs::{JobQueue, JobResult, Worker};
19use crate::metrics::Metrics;
20use crate::pubsub::PubSubBroker;
21use crate::rate_limit::RateLimiter;
22use crate::rooms::RoomManager;
23use crate::scheduler::Scheduler;
24use crate::sse::SseHub;
25use crate::workflows::WorkflowEngine;
26use crate::ws::WsHub;
27use crate::Runtime;
28use pylon_plugin::builtin::ai_proxy::{AiMessage, AiProxyPlugin};
29use pylon_plugin::builtin::cache::CachePlugin;
30
31struct StreamingBody {
41 rx: std::sync::mpsc::Receiver<Vec<u8>>,
42 buf: Vec<u8>,
43 pos: usize,
44}
45
46impl StreamingBody {
47 fn new(rx: std::sync::mpsc::Receiver<Vec<u8>>) -> Self {
48 Self {
49 rx,
50 buf: Vec::new(),
51 pos: 0,
52 }
53 }
54}
55
56impl std::io::Read for StreamingBody {
57 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
58 if self.pos < self.buf.len() {
61 let remaining = &self.buf[self.pos..];
62 let n = remaining.len().min(buf.len());
63 buf[..n].copy_from_slice(&remaining[..n]);
64 self.pos += n;
65 if self.pos >= self.buf.len() {
66 self.buf.clear();
67 self.pos = 0;
68 }
69 return Ok(n);
70 }
71
72 match self.rx.recv() {
74 Ok(data) if data.is_empty() => Ok(0),
75 Ok(data) => {
76 let n = data.len().min(buf.len());
77 buf[..n].copy_from_slice(&data[..n]);
78 if n < data.len() {
79 self.buf = data;
80 self.pos = n;
81 }
82 Ok(n)
83 }
84 Err(_) => Ok(0), }
86 }
87}
88
89static SHUTDOWN: AtomicBool = AtomicBool::new(false);
91
92pub fn request_shutdown() {
98 SHUTDOWN.store(true, Ordering::SeqCst);
99 if let Some(srv) = SERVER_HANDLE.get() {
102 srv.unblock();
103 }
104}
105
106static SERVER_HANDLE: std::sync::OnceLock<Arc<Server>> = std::sync::OnceLock::new();
109
110fn security_headers() -> Vec<Header> {
116 vec![
117 Header::from_bytes("X-Content-Type-Options", "nosniff").unwrap(),
118 Header::from_bytes("X-Frame-Options", "DENY").unwrap(),
119 Header::from_bytes("X-XSS-Protection", "1; mode=block").unwrap(),
120 ]
121}
122
123fn with_security_headers<R: std::io::Read>(response: Response<R>) -> Response<R> {
125 let mut resp = response;
126 for header in security_headers() {
127 resp = resp.with_header(header);
128 }
129 resp
130}
131
132pub fn start(runtime: Arc<Runtime>, port: u16) -> Result<(), String> {
134 start_with_plugins(runtime, port, None)
135}
136
137pub fn start_with_plugins(
139 runtime: Arc<Runtime>,
140 port: u16,
141 plugins: Option<Arc<PluginRegistry>>,
142) -> Result<(), String> {
143 start_server(runtime, port, plugins, None)
144}
145
146pub fn start_with_shards(
149 runtime: Arc<Runtime>,
150 port: u16,
151 plugins: Option<Arc<PluginRegistry>>,
152 shard_registry: Arc<dyn pylon_realtime::DynShardRegistry>,
153) -> Result<(), String> {
154 start_server(runtime, port, plugins, Some(shard_registry))
155}
156
157fn start_server(
158 runtime: Arc<Runtime>,
159 port: u16,
160 plugins: Option<Arc<PluginRegistry>>,
161 shard_registry: Option<Arc<dyn pylon_realtime::DynShardRegistry>>,
162) -> Result<(), String> {
163 pylon_observability::run_tracing_hook();
168
169 let addr = format!("0.0.0.0:{port}");
170 let server = Server::http(&addr).map_err(|e| format!("Failed to start server: {e}"))?;
171 let server = Arc::new(server);
172
173 let _ = SERVER_HANDLE.set(Arc::clone(&server));
175
176 let session_store = Arc::new(build_session_store(runtime.db_path().as_deref()));
177 let magic_codes = Arc::new(pylon_auth::MagicCodeStore::new());
178 let oauth_state = Arc::new(match std::env::var("PYLON_SESSION_DB").ok() {
182 Some(path) => match crate::oauth_backend::SqliteOAuthBackend::open(&path) {
183 Ok(backend) => pylon_auth::OAuthStateStore::with_backend(Box::new(backend)),
184 Err(e) => {
185 tracing::warn!(
186 "[auth] OAuth state SQLite backend unavailable: {e} — falling back to in-memory"
187 );
188 pylon_auth::OAuthStateStore::new()
189 }
190 },
191 None => pylon_auth::OAuthStateStore::new(),
192 });
193 let policy_engine = Arc::new(PolicyEngine::from_manifest(runtime.manifest()));
194 let change_log = Arc::new(ChangeLog::new());
195
196 for entity in runtime.manifest().entities.iter() {
204 match runtime.list(&entity.name) {
205 Ok(rows) => {
206 for row in rows {
207 if let Some(id) = row.get("id").and_then(|v| v.as_str()) {
208 change_log.append(&entity.name, id, ChangeKind::Insert, Some(row.clone()));
209 }
210 }
211 }
212 Err(_) => {
213 }
215 }
216 }
217 let ws_hub = WsHub::new();
218 let sse_hub = SseHub::new();
219 let is_dev_early = std::env::var("PYLON_DEV_MODE")
232 .map(|v| v == "1" || v == "true")
233 .unwrap_or(true);
234 let plugin_rl_max: u32 = if is_dev_early { 100_000 } else { 100 };
235 let plugin_reg: Arc<PluginRegistry> = plugins.unwrap_or_else(|| {
236 let mut reg = PluginRegistry::new(runtime.manifest().clone());
237 reg.register(Arc::new(
238 pylon_plugin::builtin::rate_limit::RateLimitPlugin::new(
239 plugin_rl_max,
240 std::time::Duration::from_secs(60),
241 ),
242 ));
243 reg.register(Arc::new(
248 pylon_plugin::builtin::tenant_scope::TenantScopePlugin::from_manifest(
249 runtime.manifest(),
250 ),
251 ));
252 Arc::new(reg)
253 });
254 let room_mgr = Arc::new(RoomManager::new(120)); let ws_port = port + 1;
256 let sse_port = port + 2;
257
258 let start_time = Instant::now();
260
261 let metrics = Arc::new(Metrics::new());
262
263 let cache = Arc::new(CachePlugin::new(100_000));
265 let pubsub_broker = Arc::new(PubSubBroker::new(100));
266
267 let job_queue = Arc::new(JobQueue::new(1000));
269
270 let jobs_in_memory = std::env::var("PYLON_JOBS_IN_MEMORY")
276 .map(|v| v == "1" || v == "true")
277 .unwrap_or(false);
278 if !jobs_in_memory {
279 let jobs_db_path = std::env::var("PYLON_JOBS_DB").ok().unwrap_or_else(|| {
280 runtime
281 .db_path()
282 .map(|p| format!("{p}.jobs.db"))
283 .unwrap_or_else(|| "pylon.jobs.db".into())
284 });
285 match crate::job_store::JobStore::open(&jobs_db_path) {
286 Ok(store) => {
287 let store = Arc::new(store);
288 let restored = job_queue.restore_from(&store);
289 if restored > 0 {
290 tracing::info!("[jobs] Restored {restored} pending job(s) from {jobs_db_path}");
291 }
292 job_queue.attach_store(store);
293 }
294 Err(e) => {
295 tracing::warn!(
296 "[jobs] Could not open job store at {jobs_db_path}: {e} — running without persistence"
297 );
298 }
299 }
300 }
301
302 {
304 let cache_ref = Arc::clone(&cache);
305 job_queue.register(
306 "pylon.cache.cleanup",
307 Arc::new(move |_job| {
308 cache_ref.cleanup_expired();
309 JobResult::Success
310 }),
311 );
312 let rooms_ref = Arc::clone(&room_mgr);
313 job_queue.register(
314 "pylon.rooms.cleanup",
315 Arc::new(move |_job| {
316 rooms_ref.cleanup_idle();
317 JobResult::Success
318 }),
319 );
320 }
321
322 let scheduler = Arc::new(Scheduler::new(Arc::clone(&job_queue)));
323 let _ = scheduler.schedule(
325 "pylon.cache.cleanup",
326 "*/10 * * * *",
327 Arc::new(|_| JobResult::Success),
328 );
329 let _ = scheduler.schedule(
330 "pylon.rooms.cleanup",
331 "*/5 * * * *",
332 Arc::new(|_| JobResult::Success),
333 );
334
335 let _worker_handles: Vec<_> = (0..2)
337 .map(|i| {
338 let w = Worker::new(Arc::clone(&job_queue), &format!("worker-{i}"));
339 w.start()
340 })
341 .collect();
342
343 let _scheduler_handle = Arc::clone(&scheduler).start();
345
346 let wf_runner_url = std::env::var("PYLON_WORKFLOW_RUNNER_URL")
348 .unwrap_or_else(|_| "http://127.0.0.1:9876/run".to_string());
349 let workflow_engine = Arc::new(WorkflowEngine::new(&wf_runner_url, 10_000));
350
351 let default_rl_max = if is_dev_early { 100_000 } else { 600 };
362 let rl_max: u32 = std::env::var("PYLON_RATE_LIMIT_MAX")
363 .ok()
364 .and_then(|v| v.parse().ok())
365 .unwrap_or(default_rl_max);
366 let rl_window: u64 = std::env::var("PYLON_RATE_LIMIT_WINDOW")
367 .ok()
368 .and_then(|v| v.parse().ok())
369 .unwrap_or(60);
370 let rate_limiter = Arc::new(RateLimiter::new(rl_max, rl_window));
371
372 let fn_rl_max: u32 = std::env::var("PYLON_FN_RATE_LIMIT_MAX")
376 .ok()
377 .and_then(|v| v.parse().ok())
378 .unwrap_or(30);
379 let fn_rl_window: u64 = std::env::var("PYLON_FN_RATE_LIMIT_WINDOW")
380 .ok()
381 .and_then(|v| v.parse().ok())
382 .unwrap_or(60);
383 let fn_rate_limiter = Arc::new(RateLimiter::new(fn_rl_max, fn_rl_window));
384
385 let fn_notifier: Arc<dyn pylon_router::ChangeNotifier> =
393 Arc::new(crate::datastore::WsSseNotifier {
394 ws: Arc::clone(&ws_hub),
395 sse: Arc::clone(&sse_hub),
396 });
397 let fn_ops_maybe = crate::datastore::try_spawn_functions(
398 Arc::clone(&runtime),
399 Arc::clone(&job_queue),
400 Arc::clone(&fn_rate_limiter),
401 Arc::clone(&change_log),
402 fn_notifier,
403 );
404
405 let is_dev = std::env::var("PYLON_DEV_MODE")
413 .map(|v| v == "1" || v == "true")
414 .unwrap_or(false);
415
416 let cors_origin = match std::env::var("PYLON_CORS_ORIGIN") {
423 Ok(v) => v,
424 Err(_) if is_dev => "*".to_string(),
425 Err(_) => {
426 return Err(
427 "PYLON_CORS_ORIGIN must be set in production (non-dev mode). \
428 Set it to your frontend's origin, or set PYLON_DEV_MODE=true \
429 for local development."
430 .into(),
431 );
432 }
433 };
434 if !is_dev && cors_origin == "*" {
435 return Err("PYLON_CORS_ORIGIN=\"*\" is refused in production mode. \
436 Set it to an explicit origin (https://app.example.com)."
437 .into());
438 }
439 let allow_credentials = cors_origin != "*";
446 if Header::from_bytes(
450 "Access-Control-Allow-Origin",
451 cors_origin.as_bytes().to_vec(),
452 )
453 .is_err()
454 {
455 return Err(format!(
456 "PYLON_CORS_ORIGIN={cors_origin:?} contains bytes that are not a valid HTTP header value"
457 ));
458 }
459
460 let admin_token: Option<String> = std::env::var("PYLON_ADMIN_TOKEN").ok();
462
463 let cookie_config = Arc::new({
470 let app_name = runtime.manifest().name.as_str();
471 pylon_auth::CookieConfig::from_env(&pylon_auth::CookieConfig::default_name_for(app_name))
472 });
473
474 let csrf_origins: Vec<String> = match std::env::var("PYLON_CSRF_ORIGINS") {
484 Ok(v) => v
485 .split(',')
486 .map(|s| s.trim().to_string())
487 .filter(|s| !s.is_empty())
488 .collect(),
489 Err(_) => {
490 if is_dev {
491 vec!["*".to_string()]
492 } else if cors_origin != "*" {
493 vec![cors_origin.clone()]
494 } else {
495 vec![]
497 }
498 }
499 };
500 let csrf = Arc::new(pylon_plugin::builtin::csrf::CsrfPlugin::new(csrf_origins));
501
502 {
516 let hub = Arc::clone(&ws_hub);
517 let sessions = Arc::clone(&session_store);
518 let runtime_for_fetcher = Arc::clone(&runtime);
519 let pe_for_fetcher = Arc::clone(&policy_engine);
520 let fetcher: crate::ws::SnapshotFetcher = Arc::new(move |auth_ctx, entity, row_id| {
521 use pylon_http::DataStore;
522 let row = match runtime_for_fetcher.get_by_id(entity, row_id) {
527 Ok(Some(v)) => v,
528 _ => return None,
529 };
530 if !matches!(
531 pe_for_fetcher.check_entity_read(entity, auth_ctx, Some(&row)),
532 pylon_policy::PolicyResult::Allowed
533 ) {
534 return None;
535 }
536 let snap = match runtime_for_fetcher.crdt_snapshot(entity, row_id) {
537 Ok(Some(bytes)) => bytes,
538 _ => return None,
539 };
540 pylon_router::encode_crdt_frame(
541 pylon_router::CRDT_FRAME_SNAPSHOT,
542 entity,
543 row_id,
544 &snap,
545 )
546 .ok()
547 });
548 std::thread::spawn(move || {
549 crate::ws::start_ws_server(hub, sessions, ws_port, Some(fetcher));
550 });
551 }
552
553 {
555 let hub = Arc::clone(&sse_hub);
556 std::thread::spawn(move || {
557 crate::sse::start_sse_server(hub, sse_port);
558 });
559 }
560
561 let shard_ws_port = port + 3;
563 if let Some(reg) = shard_registry.clone() {
564 let sessions = Arc::clone(&session_store);
565 std::thread::spawn(move || {
566 crate::shard_ws::start_shard_ws_server(reg, sessions, shard_ws_port);
567 });
568 }
569
570 tracing::warn!("pylon dev server listening on http://localhost:{port}");
571 tracing::info!(" WebSocket: ws://localhost:{ws_port}");
572 tracing::info!(" Studio: http://localhost:{port}/studio");
573 tracing::info!(" API: http://localhost:{port}/api/entities/<entity>");
574 tracing::info!(" Auth: http://localhost:{port}/api/auth/session");
575
576 loop {
580 if SHUTDOWN.load(Ordering::Relaxed) {
581 break;
582 }
583
584 let mut request = match server.recv() {
585 Ok(rq) => rq,
586 Err(_) => {
587 break;
589 }
590 };
591
592 if SHUTDOWN.load(Ordering::Relaxed) {
593 break;
594 }
595
596 let rt = Arc::clone(&runtime);
597 let ss = Arc::clone(&session_store);
598 let pe = Arc::clone(&policy_engine);
599 let cl = Arc::clone(&change_log);
600 let wh = Arc::clone(&ws_hub);
601 let sh = Arc::clone(&sse_hub);
602 let mc = Arc::clone(&magic_codes);
603 let pr = Arc::clone(&plugin_reg);
604 let rm = Arc::clone(&room_mgr);
605 let mt = Arc::clone(&metrics);
606 let os = Arc::clone(&oauth_state);
607 let ca = Arc::clone(&cache);
608 let ps = Arc::clone(&pubsub_broker);
609 let jq = Arc::clone(&job_queue);
610 let sc = Arc::clone(&scheduler);
611 let we = Arc::clone(&workflow_engine);
612 let fn_ops_ref = fn_ops_maybe.clone();
613 let shards_ref = shard_registry.clone();
614 let cors_origin = cors_origin.clone();
615 let cookie_config = Arc::clone(&cookie_config);
616 let allow_credentials = allow_credentials;
617 let is_dev = is_dev;
618
619 let method = request.method().clone();
620 let url = request.url().to_string();
621
622 if url == "/health" && method == Method::Get {
624 let uptime = start_time.elapsed().as_secs();
625 let body = serde_json::json!({
626 "status": "ok",
627 "version": "0.1.0",
628 "uptime_secs": uptime,
629 })
630 .to_string();
631
632 let response = with_security_headers(
633 Response::from_string(&body)
634 .with_status_code(200u16)
635 .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
636 .with_header(
637 Header::from_bytes(
638 "Access-Control-Allow-Origin",
639 cors_origin.as_bytes().to_vec(),
640 )
641 .unwrap(),
642 ),
643 );
644 let _ = request.respond(response);
645 continue;
646 }
647
648 if url == "/metrics" && method == Method::Get {
653 if !is_dev {
654 let admin_bytes = admin_token.as_deref().unwrap_or("").as_bytes();
655 let auth_ok = !admin_bytes.is_empty()
656 && request.headers().iter().any(|h| {
657 let name = h.field.as_str().as_str();
658 name.eq_ignore_ascii_case("Authorization")
659 && h.value
660 .as_str()
661 .strip_prefix("Bearer ")
662 .map(|t| pylon_auth::constant_time_eq(t.as_bytes(), admin_bytes))
663 .unwrap_or(false)
664 });
665 if !auth_ok {
666 let body = json_error(
667 "UNAUTHORIZED",
668 "/metrics requires admin bearer token in non-dev mode",
669 );
670 let response = with_security_headers(
671 Response::from_string(&body)
672 .with_status_code(401u16)
673 .with_header(
674 Header::from_bytes("Content-Type", "application/json").unwrap(),
675 ),
676 );
677 let _ = request.respond(response);
678 continue;
679 }
680 }
681 let prefers_prometheus = request.headers().iter().any(|h| {
682 (h.field.as_str() == "Accept" || h.field.as_str() == "accept")
683 && (h.value.as_str().contains("text/plain")
684 || h.value.as_str().contains("application/openmetrics-text"))
685 });
686 let (body, content_type) = if prefers_prometheus {
687 (mt.prometheus(), "text/plain; version=0.0.4")
688 } else {
689 (mt.snapshot().to_string(), "application/json")
690 };
691 let response = with_security_headers(
692 Response::from_string(&body)
693 .with_status_code(200u16)
694 .with_header(Header::from_bytes("Content-Type", content_type).unwrap())
695 .with_header(
696 Header::from_bytes(
697 "Access-Control-Allow-Origin",
698 cors_origin.as_bytes().to_vec(),
699 )
700 .unwrap(),
701 ),
702 );
703 let _ = request.respond(response);
704 mt.record_request("GET", 200);
705 continue;
706 }
707
708 let peer_ip = request
710 .remote_addr()
711 .map(|a| a.ip().to_string())
712 .unwrap_or_default();
713
714 let is_preflight = matches!(method, Method::Options);
720 if !is_preflight {
721 if let Err(retry_after) = rate_limiter.check(&peer_ip) {
722 let err_body = json_error(
723 "RATE_LIMITED",
724 &format!("Too many requests. Retry after {retry_after} seconds."),
725 );
726 let response = with_security_headers(
727 Response::from_string(&err_body)
728 .with_status_code(429u16)
729 .with_header(
730 Header::from_bytes("Content-Type", "application/json").unwrap(),
731 )
732 .with_header(
733 Header::from_bytes(
734 "Access-Control-Allow-Origin",
735 cors_origin.as_bytes().to_vec(),
736 )
737 .unwrap(),
738 )
739 .with_header(
740 Header::from_bytes(
741 "Access-Control-Allow-Methods",
742 "GET, POST, PATCH, DELETE, OPTIONS",
743 )
744 .unwrap(),
745 )
746 .with_header(
747 Header::from_bytes(
748 "Access-Control-Allow-Headers",
749 "Content-Type, Authorization",
750 )
751 .unwrap(),
752 )
753 .with_header(
754 Header::from_bytes(
755 "Retry-After",
756 retry_after.to_string().as_bytes().to_vec(),
757 )
758 .unwrap(),
759 ),
760 );
761 let _ = request.respond(response);
762 mt.record_request(method.as_str(), 429);
763 continue;
764 }
765 } {
781 let method_str = method.as_str();
782 let is_bearer = request.headers().iter().any(|h| {
783 (h.field.as_str() == "Authorization" || h.field.as_str() == "authorization")
784 && h.value.as_str().starts_with("Bearer ")
785 });
786 if !is_bearer && !matches!(method, Method::Get | Method::Head | Method::Options) {
791 let origin = request
792 .headers()
793 .iter()
794 .find(|h| h.field.as_str() == "Origin" || h.field.as_str() == "origin")
795 .map(|h| h.value.as_str().to_string());
796 let referer = request
797 .headers()
798 .iter()
799 .find(|h| h.field.as_str() == "Referer" || h.field.as_str() == "referer")
800 .map(|h| h.value.as_str().to_string());
801 if let Err(err) = csrf.check(method_str, origin.as_deref(), referer.as_deref()) {
802 let body = json_error(&err.code, &err.message);
803 let response = with_security_headers(
804 Response::from_string(&body)
805 .with_status_code(err.status)
806 .with_header(
807 Header::from_bytes("Content-Type", "application/json").unwrap(),
808 )
809 .with_header(
810 Header::from_bytes(
811 "Access-Control-Allow-Origin",
812 cors_origin.as_bytes().to_vec(),
813 )
814 .unwrap(),
815 ),
816 );
817 let _ = request.respond(response);
818 mt.record_request(method_str, err.status);
819 continue;
820 }
821 }
822 }
823
824 let bearer_token: Option<String> = request
834 .headers()
835 .iter()
836 .find(|h| h.field.as_str() == "Authorization" || h.field.as_str() == "authorization")
837 .and_then(|h| {
838 let val = h.value.as_str();
839 val.strip_prefix("Bearer ").map(|t| t.to_string())
840 });
841 let cookie_token: Option<String> = if bearer_token.is_some() {
842 None
843 } else {
844 request
845 .headers()
846 .iter()
847 .find(|h| h.field.as_str() == "Cookie" || h.field.as_str() == "cookie")
848 .and_then(|h| {
849 pylon_auth::extract_session_cookie(h.value.as_str(), &cookie_config.name)
850 })
851 };
852 let auth_token: Option<String> = bearer_token.or(cookie_token);
853 let auth_ctx = if admin_token.is_some()
854 && auth_token.is_some()
855 && pylon_auth::constant_time_eq(
856 auth_token.as_deref().unwrap_or("").as_bytes(),
857 admin_token.as_deref().unwrap_or("").as_bytes(),
858 ) {
859 pylon_auth::AuthContext::admin()
860 } else {
861 ss.resolve(auth_token.as_deref())
862 };
863
864 if url == "/api/__test__/reset" && method == Method::Post {
879 let is_loopback = peer_ip == "127.0.0.1"
880 || peer_ip == "::1"
881 || peer_ip.starts_with("127.")
882 || peer_ip == "localhost";
883 if !is_dev || !rt.is_in_memory() || !is_loopback {
884 let body = json_error(
885 "RESET_REFUSED",
886 "reset endpoint is only available in dev mode + in-memory DB + from loopback",
887 );
888 let response = with_security_headers(
889 Response::from_string(&body)
890 .with_status_code(403u16)
891 .with_header(
892 Header::from_bytes("Content-Type", "application/json").unwrap(),
893 )
894 .with_header(
895 Header::from_bytes(
896 "Access-Control-Allow-Origin",
897 cors_origin.as_bytes().to_vec(),
898 )
899 .unwrap(),
900 ),
901 );
902 let _ = request.respond(response);
903 mt.record_request("POST", 403);
904 continue;
905 }
906 let (status, body) = match rt.reset_for_tests() {
907 Ok(()) => (200u16, "{\"reset\":true}".to_string()),
908 Err(e) => (500u16, json_error(&e.code, &e.message)),
909 };
910 let response = with_security_headers(
911 Response::from_string(&body)
912 .with_status_code(status)
913 .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
914 .with_header(
915 Header::from_bytes(
916 "Access-Control-Allow-Origin",
917 cors_origin.as_bytes().to_vec(),
918 )
919 .unwrap(),
920 ),
921 );
922 let _ = request.respond(response);
923 mt.record_request("POST", status);
924 continue;
925 }
926
927 if url == "/api/files/upload" && method == Method::Post {
936 const UPLOAD_MAX: usize = 10 * 1024 * 1024;
937 if let Some(declared) = request.body_length() {
940 if declared > UPLOAD_MAX {
941 let err = json_error(
942 "PAYLOAD_TOO_LARGE",
943 &format!("Content-Length {declared} exceeds upload max of {UPLOAD_MAX}"),
944 );
945 let response = with_security_headers(
946 Response::from_string(&err)
947 .with_status_code(413u16)
948 .with_header(
949 Header::from_bytes("Content-Type", "application/json").unwrap(),
950 )
951 .with_header(
952 Header::from_bytes(
953 "Access-Control-Allow-Origin",
954 cors_origin.as_bytes().to_vec(),
955 )
956 .unwrap(),
957 ),
958 );
959 let _ = request.respond(response);
960 mt.record_request("POST", 413);
961 continue;
962 }
963 }
964 if auth_ctx.user_id.is_none() {
965 let err = json_error(
966 "AUTH_REQUIRED",
967 "/api/files/upload requires an authenticated session",
968 );
969 let response = with_security_headers(
970 Response::from_string(&err)
971 .with_status_code(401u16)
972 .with_header(
973 Header::from_bytes("Content-Type", "application/json").unwrap(),
974 )
975 .with_header(
976 Header::from_bytes(
977 "Access-Control-Allow-Origin",
978 cors_origin.as_bytes().to_vec(),
979 )
980 .unwrap(),
981 ),
982 );
983 let _ = request.respond(response);
984 mt.record_request("POST", 401);
985 continue;
986 }
987 use std::io::Read;
992 let mut bytes: Vec<u8> = Vec::with_capacity(8192);
993 let mut limited = request.as_reader().take((UPLOAD_MAX as u64) + 1);
994 let _ = limited.read_to_end(&mut bytes);
995
996 const MAX: usize = UPLOAD_MAX;
997 if bytes.len() > MAX {
998 let err = json_error("PAYLOAD_TOO_LARGE", "File exceeds 10 MB limit");
999 let response = with_security_headers(
1000 Response::from_string(&err)
1001 .with_status_code(413u16)
1002 .with_header(
1003 Header::from_bytes("Content-Type", "application/json").unwrap(),
1004 )
1005 .with_header(
1006 Header::from_bytes(
1007 "Access-Control-Allow-Origin",
1008 cors_origin.as_bytes().to_vec(),
1009 )
1010 .unwrap(),
1011 ),
1012 );
1013 let _ = request.respond(response);
1014 mt.record_request("POST", 413);
1015 continue;
1016 }
1017
1018 let content_type = request
1020 .headers()
1021 .iter()
1022 .find(|h| h.field.as_str() == "Content-Type" || h.field.as_str() == "content-type")
1023 .map(|h| h.value.as_str().to_string())
1024 .unwrap_or_else(|| "application/octet-stream".into());
1025 let filename = request
1026 .headers()
1027 .iter()
1028 .find(|h| h.field.as_str() == "X-Filename" || h.field.as_str() == "x-filename")
1029 .map(|h| h.value.as_str().to_string())
1030 .unwrap_or_else(|| "upload".into());
1031
1032 let (name, ct, payload) = if content_type.starts_with("multipart/form-data") {
1034 match parse_multipart_first_file(&bytes, &content_type) {
1035 Some(p) => p,
1036 None => {
1037 let err = json_error("INVALID_MULTIPART", "Could not parse multipart body");
1038 let response = with_security_headers(
1039 Response::from_string(&err)
1040 .with_status_code(400u16)
1041 .with_header(
1042 Header::from_bytes("Content-Type", "application/json").unwrap(),
1043 )
1044 .with_header(
1045 Header::from_bytes(
1046 "Access-Control-Allow-Origin",
1047 cors_origin.as_bytes().to_vec(),
1048 )
1049 .unwrap(),
1050 ),
1051 );
1052 let _ = request.respond(response);
1053 mt.record_request("POST", 400);
1054 continue;
1055 }
1056 }
1057 } else {
1058 (filename, content_type, bytes)
1059 };
1060
1061 let storage = pylon_storage::files::LocalFileStorage::new(
1062 &std::env::var("PYLON_FILES_DIR").unwrap_or_else(|_| "uploads".into()),
1063 &std::env::var("PYLON_FILES_URL_PREFIX").unwrap_or_else(|_| "/api/files".into()),
1064 );
1065
1066 let (status, body) =
1067 match pylon_storage::files::FileStorage::store(&storage, &name, &payload, &ct) {
1068 Ok(stored) => (
1069 201u16,
1070 serde_json::to_string(&stored).unwrap_or_else(|_| "{}".into()),
1071 ),
1072 Err(e) => (500u16, json_error(&e.code, &e.message)),
1073 };
1074
1075 let response = with_security_headers(
1076 Response::from_string(&body)
1077 .with_status_code(status)
1078 .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
1079 .with_header(
1080 Header::from_bytes(
1081 "Access-Control-Allow-Origin",
1082 cors_origin.as_bytes().to_vec(),
1083 )
1084 .unwrap(),
1085 ),
1086 );
1087 let _ = request.respond(response);
1088 mt.record_request("POST", status);
1089 continue;
1090 }
1091
1092 const MAX_BODY_SIZE: usize = 10 * 1024 * 1024;
1102
1103 if let Some(declared) = request.body_length() {
1104 if declared > MAX_BODY_SIZE {
1105 let err_body = json_error(
1106 "PAYLOAD_TOO_LARGE",
1107 &format!("Content-Length {declared} exceeds max of {MAX_BODY_SIZE}"),
1108 );
1109 let response = with_security_headers(
1110 Response::from_string(&err_body)
1111 .with_status_code(413u16)
1112 .with_header(
1113 Header::from_bytes(
1114 "Access-Control-Allow-Origin",
1115 cors_origin.as_bytes().to_vec(),
1116 )
1117 .unwrap(),
1118 ),
1119 );
1120 let _ = request.respond(response);
1121 mt.record_request(method.as_str(), 413);
1122 continue;
1123 }
1124 }
1125
1126 let mut body = String::new();
1127 if !matches!(
1128 method,
1129 Method::Get | Method::Head | Method::Options | Method::Delete
1130 ) {
1131 use std::io::Read;
1132 let mut limited = request.as_reader().take((MAX_BODY_SIZE as u64) + 1);
1133 let _ = limited.read_to_string(&mut body);
1134 }
1135
1136 if body.len() > MAX_BODY_SIZE {
1137 let err_body = json_error(
1138 "PAYLOAD_TOO_LARGE",
1139 &format!(
1140 "Request body exceeds maximum size of {} bytes",
1141 MAX_BODY_SIZE,
1142 ),
1143 );
1144 let response = with_security_headers(
1145 Response::from_string(&err_body)
1146 .with_status_code(413u16)
1147 .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
1148 .with_header(
1149 Header::from_bytes(
1150 "Access-Control-Allow-Origin",
1151 cors_origin.as_bytes().to_vec(),
1152 )
1153 .unwrap(),
1154 ),
1155 );
1156 let _ = request.respond(response);
1157 mt.record_request(method.as_str(), 413);
1158 continue;
1159 }
1160
1161 if method == Method::Get {
1165 if let Some(rest) = url.strip_prefix("/api/shards/") {
1166 let rest = rest.split('?').next().unwrap_or(rest);
1167 if let Some(shard_id) = rest.strip_suffix("/connect") {
1168 if auth_ctx.user_id.is_none() {
1173 let err = json_error(
1174 "AUTH_REQUIRED",
1175 "Shard connect requires an authenticated session",
1176 );
1177 let response = with_security_headers(
1178 Response::from_string(&err)
1179 .with_status_code(401u16)
1180 .with_header(
1181 Header::from_bytes("Content-Type", "application/json").unwrap(),
1182 )
1183 .with_header(
1184 Header::from_bytes(
1185 "Access-Control-Allow-Origin",
1186 cors_origin.as_bytes().to_vec(),
1187 )
1188 .unwrap(),
1189 ),
1190 );
1191 let _ = request.respond(response);
1192 mt.record_request("GET", 401);
1193 continue;
1194 }
1195 let shards = match &shards_ref {
1196 Some(s) => Arc::clone(s),
1197 None => {
1198 let err = json_error(
1199 "SHARDS_NOT_AVAILABLE",
1200 "Shard system is not configured",
1201 );
1202 let response = with_security_headers(
1203 Response::from_string(&err)
1204 .with_status_code(503u16)
1205 .with_header(
1206 Header::from_bytes("Content-Type", "application/json")
1207 .unwrap(),
1208 )
1209 .with_header(
1210 Header::from_bytes(
1211 "Access-Control-Allow-Origin",
1212 cors_origin.as_bytes().to_vec(),
1213 )
1214 .unwrap(),
1215 ),
1216 );
1217 let _ = request.respond(response);
1218 mt.record_request("GET", 503);
1219 continue;
1220 }
1221 };
1222 let shard = match shards.get(shard_id) {
1223 Some(s) => s,
1224 None => {
1225 let err = json_error(
1226 "SHARD_NOT_FOUND",
1227 &format!("Shard \"{shard_id}\" not found"),
1228 );
1229 let response = with_security_headers(
1230 Response::from_string(&err)
1231 .with_status_code(404u16)
1232 .with_header(
1233 Header::from_bytes("Content-Type", "application/json")
1234 .unwrap(),
1235 )
1236 .with_header(
1237 Header::from_bytes(
1238 "Access-Control-Allow-Origin",
1239 cors_origin.as_bytes().to_vec(),
1240 )
1241 .unwrap(),
1242 ),
1243 );
1244 let _ = request.respond(response);
1245 mt.record_request("GET", 404);
1246 continue;
1247 }
1248 };
1249
1250 let sub_id = url
1253 .split("sid=")
1254 .nth(1)
1255 .and_then(|s| s.split('&').next())
1256 .map(|s| s.to_string())
1257 .or_else(|| auth_ctx.user_id.clone())
1258 .unwrap_or_else(|| {
1259 format!(
1260 "anon_{}",
1261 std::time::SystemTime::now()
1262 .duration_since(std::time::UNIX_EPOCH)
1263 .unwrap_or_default()
1264 .as_nanos()
1265 )
1266 });
1267 let subscriber_id = pylon_realtime::SubscriberId::new(sub_id);
1268
1269 let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1270 let streaming_body = StreamingBody::new(rx);
1271
1272 let tx_clone = tx.clone();
1273 let sink: pylon_realtime::SnapshotSink =
1274 Box::new(move |tick: u64, bytes: &[u8]| {
1275 let mut frame = format!("id: {tick}\ndata: ").into_bytes();
1278 frame.extend_from_slice(bytes);
1279 frame.extend_from_slice(b"\n\n");
1280 let _ = tx_clone.send(frame);
1281 });
1282
1283 let shard_auth = pylon_realtime::ShardAuth {
1284 user_id: auth_ctx.user_id.clone(),
1285 is_admin: auth_ctx.is_admin,
1286 };
1287 if let Err(e) = shard.add_subscriber(subscriber_id.clone(), sink, &shard_auth) {
1288 let (status, code) = match &e {
1289 pylon_realtime::ShardError::Unauthorized(_) => (403u16, "UNAUTHORIZED"),
1290 _ => (429u16, "SUBSCRIBE_FAILED"),
1291 };
1292 let err = json_error(code, &e.to_string());
1293 let response = with_security_headers(
1294 Response::from_string(&err)
1295 .with_status_code(status)
1296 .with_header(
1297 Header::from_bytes("Content-Type", "application/json").unwrap(),
1298 )
1299 .with_header(
1300 Header::from_bytes(
1301 "Access-Control-Allow-Origin",
1302 cors_origin.as_bytes().to_vec(),
1303 )
1304 .unwrap(),
1305 ),
1306 );
1307 let _ = request.respond(response);
1308 mt.record_request("GET", status);
1309 continue;
1310 }
1311
1312 {
1315 let shard_cleanup = Arc::clone(&shard);
1316 let sub_id_cleanup = subscriber_id.clone();
1317 let tx_liveness = tx.clone();
1318 std::thread::spawn(move || {
1319 loop {
1322 std::thread::sleep(std::time::Duration::from_secs(30));
1323 if tx_liveness.send(b": heartbeat\n\n".to_vec()).is_err() {
1324 shard_cleanup.remove_subscriber(&sub_id_cleanup);
1325 return;
1326 }
1327 if !shard_cleanup.is_running() {
1328 return;
1329 }
1330 }
1331 });
1332 }
1333
1334 let response = with_security_headers(Response::new(
1335 tiny_http::StatusCode(200),
1336 vec![
1337 Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1338 Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1339 Header::from_bytes("Connection", "keep-alive").unwrap(),
1340 Header::from_bytes(
1341 "Access-Control-Allow-Origin",
1342 cors_origin.as_bytes().to_vec(),
1343 )
1344 .unwrap(),
1345 ],
1346 streaming_body,
1347 None,
1348 None,
1349 ));
1350 let _ = request.respond(response);
1351 mt.record_request("GET", 200);
1352 continue;
1353 }
1354 }
1355 }
1356
1357 if method == Method::Post
1359 && url.starts_with("/api/fn/")
1360 && url != "/api/fn/traces"
1361 && request.headers().iter().any(|h| {
1362 (h.field.as_str() == "Accept" || h.field.as_str() == "accept")
1363 && h.value.as_str().contains("text/event-stream")
1364 })
1365 {
1366 let fn_name = url
1367 .strip_prefix("/api/fn/")
1368 .unwrap_or("")
1369 .split('?')
1370 .next()
1371 .unwrap_or("")
1372 .to_string();
1373
1374 if let Some(fn_ops) = &fn_ops_maybe {
1375 if pylon_router::FnOps::get_fn(fn_ops.as_ref(), &fn_name).is_none() {
1379 let err = json_error(
1380 "FN_NOT_FOUND",
1381 &format!("Function \"{fn_name}\" is not registered"),
1382 );
1383 let response = with_security_headers(
1384 Response::from_string(&err)
1385 .with_status_code(404u16)
1386 .with_header(
1387 Header::from_bytes("Content-Type", "application/json").unwrap(),
1388 )
1389 .with_header(
1390 Header::from_bytes(
1391 "Access-Control-Allow-Origin",
1392 cors_origin.as_bytes().to_vec(),
1393 )
1394 .unwrap(),
1395 ),
1396 );
1397 let _ = request.respond(response);
1398 mt.record_request("POST", 404);
1399 continue;
1400 }
1401 let identity = auth_ctx.user_id.as_deref().unwrap_or("anon");
1403 if let Err(retry_after) =
1404 pylon_router::FnOps::check_rate_limit(fn_ops.as_ref(), &fn_name, identity)
1405 {
1406 let body = format!(
1407 r#"{{"error":{{"code":"RATE_LIMITED","message":"Function \"{fn_name}\" rate limit exceeded","retry_after_secs":{retry_after}}}}}"#
1408 );
1409 let response = with_security_headers(
1410 Response::from_string(&body)
1411 .with_status_code(429u16)
1412 .with_header(
1413 Header::from_bytes("Content-Type", "application/json").unwrap(),
1414 )
1415 .with_header(
1416 Header::from_bytes(
1417 "Access-Control-Allow-Origin",
1418 cors_origin.as_bytes().to_vec(),
1419 )
1420 .unwrap(),
1421 ),
1422 );
1423 let _ = request.respond(response);
1424 mt.record_request("POST", 429);
1425 continue;
1426 }
1427
1428 let args: serde_json::Value =
1429 serde_json::from_str(&body).unwrap_or(serde_json::json!({}));
1430
1431 let auth = pylon_functions::protocol::AuthInfo {
1432 user_id: auth_ctx.user_id.clone(),
1433 is_admin: auth_ctx.is_admin,
1434 tenant_id: auth_ctx.tenant_id.clone(),
1435 };
1436
1437 let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1438 let streaming_body = StreamingBody::new(rx);
1439
1440 let fn_ops_cl = Arc::clone(fn_ops);
1441 let tx_stream = tx.clone();
1442 std::thread::spawn(move || {
1443 let tx_cb = tx_stream.clone();
1444 let on_stream: Box<dyn FnMut(&str) + Send> = Box::new(move |chunk: &str| {
1445 let sse = format!("data: {}\n\n", chunk);
1446 let _ = tx_cb.send(sse.into_bytes());
1447 });
1448
1449 let result = pylon_router::FnOps::call(
1450 fn_ops_cl.as_ref(),
1451 &fn_name,
1452 args,
1453 auth,
1454 Some(on_stream),
1455 None, );
1457 match result {
1458 Ok((value, _trace)) => {
1459 let done = format!(
1460 "event: result\ndata: {}\n\n",
1461 serde_json::to_string(&value).unwrap_or_else(|_| "null".into())
1462 );
1463 let _ = tx_stream.send(done.into_bytes());
1464 }
1465 Err(e) => {
1466 let err = format!(
1467 "event: error\ndata: {}\n\n",
1468 serde_json::json!({"code": e.code, "message": e.message})
1469 );
1470 let _ = tx_stream.send(err.into_bytes());
1471 }
1472 }
1473 });
1474
1475 let response = with_security_headers(Response::new(
1476 tiny_http::StatusCode(200),
1477 vec![
1478 Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1479 Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1480 Header::from_bytes("Connection", "keep-alive").unwrap(),
1481 Header::from_bytes(
1482 "Access-Control-Allow-Origin",
1483 cors_origin.as_bytes().to_vec(),
1484 )
1485 .unwrap(),
1486 ],
1487 streaming_body,
1488 None,
1489 None,
1490 ));
1491 let _ = request.respond(response);
1492 mt.record_request("POST", 200);
1493 continue;
1494 }
1495 }
1496
1497 if url == "/api/ai/stream" && method == Method::Post {
1499 if auth_ctx.user_id.is_none() {
1502 let err = json_error(
1503 "AUTH_REQUIRED",
1504 "/api/ai/stream requires an authenticated session",
1505 );
1506 let response = with_security_headers(
1507 Response::from_string(&err)
1508 .with_status_code(401u16)
1509 .with_header(
1510 Header::from_bytes("Content-Type", "application/json").unwrap(),
1511 )
1512 .with_header(
1513 Header::from_bytes(
1514 "Access-Control-Allow-Origin",
1515 cors_origin.as_bytes().to_vec(),
1516 )
1517 .unwrap(),
1518 ),
1519 );
1520 let _ = request.respond(response);
1521 mt.record_request("POST", 401);
1522 continue;
1523 }
1524 let ai_provider = std::env::var("PYLON_AI_PROVIDER").unwrap_or_default();
1525 let ai_key = std::env::var("PYLON_AI_API_KEY").unwrap_or_default();
1526 let ai_model = std::env::var("PYLON_AI_MODEL").unwrap_or_default();
1527 let ai_base = std::env::var("PYLON_AI_BASE_URL").unwrap_or_default();
1528
1529 if ai_key.is_empty() && ai_provider != "custom" {
1530 let err = json_error(
1531 "AI_NOT_CONFIGURED",
1532 "Set PYLON_AI_PROVIDER and PYLON_AI_API_KEY",
1533 );
1534 let response = with_security_headers(
1535 Response::from_string(&err)
1536 .with_status_code(503u16)
1537 .with_header(
1538 Header::from_bytes("Content-Type", "application/json").unwrap(),
1539 )
1540 .with_header(
1541 Header::from_bytes(
1542 "Access-Control-Allow-Origin",
1543 cors_origin.as_bytes().to_vec(),
1544 )
1545 .unwrap(),
1546 ),
1547 );
1548 let _ = request.respond(response);
1549 mt.record_request("POST", 503);
1550 continue;
1551 }
1552
1553 let parsed: serde_json::Value = match serde_json::from_str(&body) {
1554 Ok(v) => v,
1555 Err(_) => {
1556 let err = json_error("INVALID_JSON", "Invalid request body");
1557 let response = with_security_headers(
1558 Response::from_string(&err)
1559 .with_status_code(400u16)
1560 .with_header(
1561 Header::from_bytes("Content-Type", "application/json").unwrap(),
1562 )
1563 .with_header(
1564 Header::from_bytes(
1565 "Access-Control-Allow-Origin",
1566 cors_origin.as_bytes().to_vec(),
1567 )
1568 .unwrap(),
1569 ),
1570 );
1571 let _ = request.respond(response);
1572 mt.record_request("POST", 400);
1573 continue;
1574 }
1575 };
1576
1577 let messages: Vec<AiMessage> = match parsed.get("messages").and_then(|m| m.as_array()) {
1578 Some(arr) => arr
1579 .iter()
1580 .filter_map(|m| {
1581 let role = m.get("role")?.as_str()?.to_string();
1582 let content = m.get("content")?.as_str()?.to_string();
1583 Some(AiMessage { role, content })
1584 })
1585 .collect(),
1586 None => {
1587 let err = json_error("MISSING_FIELD", "\"messages\" array is required");
1588 let response = with_security_headers(
1589 Response::from_string(&err)
1590 .with_status_code(400u16)
1591 .with_header(
1592 Header::from_bytes("Content-Type", "application/json").unwrap(),
1593 )
1594 .with_header(
1595 Header::from_bytes(
1596 "Access-Control-Allow-Origin",
1597 cors_origin.as_bytes().to_vec(),
1598 )
1599 .unwrap(),
1600 ),
1601 );
1602 let _ = request.respond(response);
1603 mt.record_request("POST", 400);
1604 continue;
1605 }
1606 };
1607
1608 let model = parsed
1610 .get("model")
1611 .and_then(|m| m.as_str())
1612 .map(|s| s.to_string())
1613 .unwrap_or(ai_model);
1614
1615 let proxy = match ai_provider.as_str() {
1616 "anthropic" => AiProxyPlugin::anthropic(&ai_key, &model),
1617 "openai" => AiProxyPlugin::openai(&ai_key, &model),
1618 "custom" => AiProxyPlugin::custom_with_model(&ai_base, &ai_key, &model),
1619 _ => AiProxyPlugin::openai(&ai_key, &model),
1620 };
1621
1622 let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1625 let streaming_body = StreamingBody::new(rx);
1626
1627 std::thread::spawn(move || {
1630 let result = proxy.stream_completion(&messages, &mut |chunk| {
1631 let sse = format!(
1632 "data: {}
1633
1634",
1635 serde_json::json!({
1636 "choices": [{"index": 0, "delta": {"content": chunk}}]
1637 })
1638 );
1639 let _ = tx.send(sse.into_bytes());
1640 });
1641
1642 match result {
1644 Ok(_) => {
1645 let _ = tx.send(
1646 b"data: [DONE]
1647
1648"
1649 .to_vec(),
1650 );
1651 }
1652 Err(e) => {
1653 let err_event = format!(
1654 "data: {}
1655
1656",
1657 serde_json::json!({"error": {"message": e, "type": "stream_error"}})
1658 );
1659 let _ = tx.send(err_event.into_bytes());
1660 }
1661 }
1662 });
1664
1665 let response = with_security_headers(Response::new(
1666 tiny_http::StatusCode(200),
1667 vec![
1668 Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1669 Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1670 Header::from_bytes("Connection", "keep-alive").unwrap(),
1671 Header::from_bytes(
1672 "Access-Control-Allow-Origin",
1673 cors_origin.as_bytes().to_vec(),
1674 )
1675 .unwrap(),
1676 ],
1677 streaming_body,
1678 None, None,
1680 ));
1681 let _ = request.respond(response);
1682 mt.record_request("POST", 200);
1683 continue;
1684 }
1685
1686 let (status, response_body, content_type, is_studio, extra_headers) = if (url == "/studio"
1697 || url == "/studio/")
1698 && method == Method::Get
1699 {
1700 if !is_dev && !auth_ctx.is_admin {
1701 let body = json_error(
1702 "AUTH_REQUIRED",
1703 "/studio requires admin auth in production (set PYLON_ADMIN_TOKEN and pass it as Bearer)",
1704 );
1705 let response = with_security_headers(
1706 Response::from_string(&body)
1707 .with_status_code(401u16)
1708 .with_header(
1709 Header::from_bytes("Content-Type", "application/json").unwrap(),
1710 )
1711 .with_header(
1712 Header::from_bytes(
1713 "Access-Control-Allow-Origin",
1714 cors_origin.as_bytes().to_vec(),
1715 )
1716 .unwrap(),
1717 ),
1718 );
1719 let _ = request.respond(response);
1720 mt.record_request("GET", 401);
1721 continue;
1722 }
1723 let host = request
1730 .headers()
1731 .iter()
1732 .find(|h| h.field.equiv("Host"))
1733 .map(|h| h.value.as_str().to_string())
1734 .unwrap_or_else(|| format!("localhost:{port}"));
1735 let scheme = request
1736 .headers()
1737 .iter()
1738 .find(|h| h.field.equiv("X-Forwarded-Proto"))
1739 .map(|h| h.value.as_str().to_string())
1740 .unwrap_or_else(|| "http".to_string());
1741 let base = format!("{scheme}://{host}");
1742 let html = pylon_studio_api::generate_studio_html(rt.manifest(), &base);
1743 (
1744 200u16,
1745 html,
1746 "text/html",
1747 true,
1748 Vec::<(String, String)>::new(),
1749 )
1750 } else {
1751 let meta = pylon_plugin::RequestMeta {
1755 peer_ip: peer_ip.as_str(),
1756 };
1757 if let Err(e) = pr.run_on_request_with_meta(method.as_str(), &url, &auth_ctx, &meta) {
1758 (
1759 e.status,
1760 json_error(&e.code, &e.message),
1761 "application/json",
1762 false,
1763 Vec::new(),
1764 )
1765 } else if let Some((s, b)) =
1766 pr.try_handle_route(method.as_str(), &url, &body, &auth_ctx)
1767 {
1768 (s, b, "application/json", false, Vec::new())
1770 } else {
1771 let notifier = WsSseNotifier {
1772 ws: Arc::clone(&wh),
1773 sse: Arc::clone(&sh),
1774 };
1775 let openapi_gen = RuntimeOpenApiGenerator {
1776 manifest: rt.manifest(),
1777 };
1778 let file_ops = LocalFileOps::new_default();
1779 let cache_adapter = CacheAdapter(Arc::clone(&ca));
1780 let pubsub_adapter = PubSubAdapter(Arc::clone(&ps));
1781 let email_adapter = EmailAdapter::from_env();
1782 let fn_ops: Option<&dyn pylon_router::FnOps> =
1783 fn_ops_ref.as_deref().map(|f| f as &dyn pylon_router::FnOps);
1784 let shard_adapter = shards_ref.as_ref().map(|reg| ShardOpsAdapter {
1785 registry: Arc::clone(reg),
1786 });
1787 let shard_ops: Option<&dyn pylon_router::ShardOps> = shard_adapter
1788 .as_ref()
1789 .map(|a| a as &dyn pylon_router::ShardOps);
1790 let plugin_hooks = PluginHooksAdapter(Arc::clone(&pr));
1791 let request_headers: Vec<(String, String)> = request
1796 .headers()
1797 .iter()
1798 .map(|h| (h.field.as_str().to_string(), h.value.as_str().to_string()))
1799 .collect();
1800 let router_ctx = pylon_router::RouterContext {
1801 store: rt.as_ref(),
1802 session_store: &ss,
1803 magic_codes: &mc,
1804 oauth_state: &os,
1805 policy_engine: &pe,
1806 change_log: &cl,
1807 notifier: ¬ifier,
1808 rooms: rm.as_ref(),
1809 cache: &cache_adapter,
1810 pubsub: &pubsub_adapter,
1811 jobs: jq.as_ref(),
1812 scheduler: sc.as_ref(),
1813 workflows: we.as_ref(),
1814 files: &file_ops,
1815 openapi: &openapi_gen,
1816 functions: fn_ops,
1817 email: &email_adapter,
1818 shards: shard_ops,
1819 plugin_hooks: &plugin_hooks,
1820 auth_ctx: &auth_ctx,
1821 is_dev,
1822 request_headers: &request_headers,
1823 cookie_config: cookie_config.as_ref(),
1824 response_headers: std::cell::RefCell::new(Vec::new()),
1825 };
1826 let http_method = HttpMethod::from_str(method.as_str());
1827 let (s, b, _ct) = pylon_router::route(
1828 &router_ctx,
1829 http_method,
1830 &url,
1831 &body,
1832 auth_token.as_deref(),
1833 );
1834 let extra_headers = router_ctx.take_response_headers();
1835 (s, b, "application/json", false, extra_headers)
1836 }
1837 };
1838
1839 let mut response = Response::from_string(&response_body)
1840 .with_status_code(status)
1841 .with_header(Header::from_bytes("Content-Type", content_type).unwrap())
1842 .with_header(
1843 Header::from_bytes(
1844 "Access-Control-Allow-Origin",
1845 cors_origin.as_bytes().to_vec(),
1846 )
1847 .unwrap(),
1848 )
1849 .with_header(
1850 Header::from_bytes(
1851 "Access-Control-Allow-Methods",
1852 "GET, POST, PATCH, DELETE, OPTIONS",
1853 )
1854 .unwrap(),
1855 )
1856 .with_header(
1857 Header::from_bytes(
1858 "Access-Control-Allow-Headers",
1859 "Content-Type, Authorization",
1860 )
1861 .unwrap(),
1862 );
1863 if allow_credentials {
1868 response = response
1869 .with_header(
1870 Header::from_bytes("Access-Control-Allow-Credentials", "true").unwrap(),
1871 )
1872 .with_header(Header::from_bytes("Vary", "Origin").unwrap());
1873 }
1874
1875 for (name, value) in extra_headers {
1882 if let Ok(h) = Header::from_bytes(name.as_bytes(), value.as_bytes().to_vec()) {
1883 response = response.with_header(h);
1884 }
1885 }
1886
1887 if is_studio {
1900 response = response.with_header(
1901 Header::from_bytes(
1902 "Content-Security-Policy",
1903 "default-src 'self' 'unsafe-inline' 'unsafe-eval' https://cdn.tailwindcss.com https://unpkg.com ws: wss:",
1904 ).unwrap(),
1905 );
1906 }
1907
1908 let response = with_security_headers(response);
1909
1910 let _ = request.respond(response);
1911 mt.record_request(method.as_str(), status);
1912 }
1913
1914 tracing::warn!("Shutting down gracefully...");
1915
1916 let drain_timeout = std::time::Duration::from_secs(
1919 std::env::var("PYLON_DRAIN_SECS")
1920 .ok()
1921 .and_then(|s| s.parse().ok())
1922 .unwrap_or(10),
1923 );
1924 let start = Instant::now();
1925
1926 if let Some(reg) = &shard_registry {
1928 for id in reg.ids() {
1929 if let Some(shard) = reg.get(&id) {
1930 shard.stop();
1931 }
1932 }
1933 }
1934
1935 let _ = &scheduler; while start.elapsed() < drain_timeout {
1940 let pending_jobs = job_queue.stats().pending;
1941 if pending_jobs == 0 {
1942 break;
1943 }
1944 std::thread::sleep(std::time::Duration::from_millis(100));
1945 }
1946
1947 let elapsed = start.elapsed();
1948 tracing::warn!(
1949 "Drain complete in {:.1}s (timeout {}s)",
1950 elapsed.as_secs_f32(),
1951 drain_timeout.as_secs()
1952 );
1953 Ok(())
1954}
1955
1956fn json_error(code: &str, message: &str) -> String {
1961 pylon_router::json_error(code, message)
1962}
1963
1964fn build_session_store(app_db_path: Option<&str>) -> SessionStore {
1974 if std::env::var("PYLON_SESSION_IN_MEMORY")
1975 .map(|v| v == "1" || v == "true")
1976 .unwrap_or(false)
1977 {
1978 return SessionStore::new();
1979 }
1980 let explicit = std::env::var("PYLON_SESSION_DB").ok();
1981 let default_path = app_db_path.map(|p| format!("{p}.sessions.db"));
1982 let path = match explicit.or(default_path) {
1983 Some(p) => p,
1984 None => return SessionStore::new(),
1985 };
1986 match crate::session_backend::SqliteSessionBackend::open(&path) {
1987 Ok(backend) => {
1988 tracing::info!("[pylon] Session persistence enabled: {path}");
1989 SessionStore::with_backend(Box::new(backend))
1990 }
1991 Err(e) => {
1992 tracing::warn!(
1993 "[pylon] could not open session DB {path}: {e}. Falling back to in-memory sessions."
1994 );
1995 SessionStore::new()
1996 }
1997 }
1998}
1999
2000fn parse_multipart_first_file(
2011 body: &[u8],
2012 content_type_header: &str,
2013) -> Option<(String, String, Vec<u8>)> {
2014 let boundary_param = content_type_header
2016 .split(';')
2017 .find_map(|p| p.trim().strip_prefix("boundary="))?;
2018 let boundary = boundary_param.trim_matches('"');
2019 let delimiter = format!("--{boundary}");
2020 let delimiter_bytes = delimiter.as_bytes();
2021
2022 let mut pos = 0usize;
2024 while pos < body.len() {
2025 let next = find_subslice(&body[pos..], delimiter_bytes)?;
2027 let part_start = pos + next + delimiter_bytes.len();
2028 if part_start + 2 > body.len() {
2030 return None;
2031 }
2032 if &body[part_start..part_start + 2] == b"--" {
2033 return None; }
2035 let header_start = part_start + skip_crlf(&body[part_start..]);
2036
2037 let header_end_offset = find_subslice(&body[header_start..], b"\r\n\r\n")?;
2039 let headers = &body[header_start..header_start + header_end_offset];
2040 let data_start = header_start + header_end_offset + 4;
2041
2042 let next_delim_offset = find_subslice(&body[data_start..], delimiter_bytes)?;
2044 let mut data_end = data_start + next_delim_offset;
2046 if data_end >= 2 && &body[data_end - 2..data_end] == b"\r\n" {
2047 data_end -= 2;
2048 }
2049
2050 let headers_str = std::str::from_utf8(headers).ok()?;
2052 let mut filename: Option<String> = None;
2053 let mut part_ct = String::from("application/octet-stream");
2054 let mut has_file = false;
2055 for line in headers_str.split("\r\n") {
2056 let lower = line.to_ascii_lowercase();
2057 if let Some(rest) = lower.strip_prefix("content-disposition:") {
2058 if rest.contains("filename=") {
2059 has_file = true;
2060 if let Some(start) = line.find("filename=\"") {
2062 let from = start + 10;
2063 if let Some(end_offset) = line[from..].find('"') {
2064 filename = Some(line[from..from + end_offset].to_string());
2065 }
2066 }
2067 }
2068 } else if let Some(rest) = lower.strip_prefix("content-type:") {
2069 part_ct = rest.trim().to_string();
2070 }
2071 }
2072
2073 if has_file {
2074 let name = filename.unwrap_or_else(|| "upload".into());
2075 return Some((name, part_ct, body[data_start..data_end].to_vec()));
2076 }
2077
2078 pos = data_end;
2079 }
2080 None
2081}
2082
2083fn find_subslice(haystack: &[u8], needle: &[u8]) -> Option<usize> {
2084 if needle.is_empty() || needle.len() > haystack.len() {
2085 return None;
2086 }
2087 haystack.windows(needle.len()).position(|w| w == needle)
2088}
2089
2090fn skip_crlf(buf: &[u8]) -> usize {
2091 if buf.len() >= 2 && &buf[0..2] == b"\r\n" {
2092 2
2093 } else if !buf.is_empty() && buf[0] == b'\n' {
2094 1
2095 } else {
2096 0
2097 }
2098}
2099
2100#[cfg(test)]
2101mod multipart_tests {
2102 use super::*;
2103
2104 #[test]
2105 fn parses_single_file() {
2106 let body = b"--bnd\r\n\
2107Content-Disposition: form-data; name=\"file\"; filename=\"hello.txt\"\r\n\
2108Content-Type: text/plain\r\n\
2109\r\n\
2110Hello world\r\n\
2111--bnd--\r\n";
2112 let ct = "multipart/form-data; boundary=bnd";
2113 let (name, content_type, bytes) = parse_multipart_first_file(body, ct).unwrap();
2114 assert_eq!(name, "hello.txt");
2115 assert_eq!(content_type, "text/plain");
2116 assert_eq!(bytes, b"Hello world");
2117 }
2118
2119 #[test]
2120 fn returns_none_without_file_part() {
2121 let body = b"--bnd\r\n\
2122Content-Disposition: form-data; name=\"field\"\r\n\
2123\r\n\
2124just text\r\n\
2125--bnd--\r\n";
2126 let ct = "multipart/form-data; boundary=bnd";
2127 assert!(parse_multipart_first_file(body, ct).is_none());
2128 }
2129
2130 #[test]
2131 fn returns_none_when_no_boundary() {
2132 assert!(parse_multipart_first_file(b"anything", "application/json").is_none());
2133 }
2134}