1#[allow(unused_imports)]
2use std::io::Read;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::time::Instant;
6
7use pylon_auth::SessionStore;
8use pylon_http::HttpMethod;
9use pylon_plugin::PluginRegistry;
10use pylon_policy::PolicyEngine;
11use pylon_sync::{ChangeKind, ChangeLog};
12use tiny_http::{Header, Method, Response, Server};
13
14use crate::datastore::{
15 CacheAdapter, EmailAdapter, LocalFileOps, PluginHooksAdapter, PubSubAdapter,
16 RuntimeOpenApiGenerator, ShardOpsAdapter, WsSseNotifier,
17};
18use crate::jobs::{JobQueue, JobResult, Worker};
19use crate::metrics::Metrics;
20use crate::pubsub::PubSubBroker;
21use crate::rate_limit::RateLimiter;
22use crate::rooms::RoomManager;
23use crate::scheduler::Scheduler;
24use crate::sse::SseHub;
25use crate::workflows::WorkflowEngine;
26use crate::ws::WsHub;
27use crate::Runtime;
28use pylon_plugin::builtin::ai_proxy::{AiMessage, AiProxyPlugin};
29use pylon_plugin::builtin::cache::CachePlugin;
30
31struct StreamingBody {
41 rx: std::sync::mpsc::Receiver<Vec<u8>>,
42 buf: Vec<u8>,
43 pos: usize,
44}
45
46impl StreamingBody {
47 fn new(rx: std::sync::mpsc::Receiver<Vec<u8>>) -> Self {
48 Self {
49 rx,
50 buf: Vec::new(),
51 pos: 0,
52 }
53 }
54}
55
56impl std::io::Read for StreamingBody {
57 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
58 if self.pos < self.buf.len() {
61 let remaining = &self.buf[self.pos..];
62 let n = remaining.len().min(buf.len());
63 buf[..n].copy_from_slice(&remaining[..n]);
64 self.pos += n;
65 if self.pos >= self.buf.len() {
66 self.buf.clear();
67 self.pos = 0;
68 }
69 return Ok(n);
70 }
71
72 match self.rx.recv() {
74 Ok(data) if data.is_empty() => Ok(0),
75 Ok(data) => {
76 let n = data.len().min(buf.len());
77 buf[..n].copy_from_slice(&data[..n]);
78 if n < data.len() {
79 self.buf = data;
80 self.pos = n;
81 }
82 Ok(n)
83 }
84 Err(_) => Ok(0), }
86 }
87}
88
89static SHUTDOWN: AtomicBool = AtomicBool::new(false);
91
92pub fn request_shutdown() {
98 SHUTDOWN.store(true, Ordering::SeqCst);
99 if let Some(srv) = SERVER_HANDLE.get() {
102 srv.unblock();
103 }
104}
105
106static SERVER_HANDLE: std::sync::OnceLock<Arc<Server>> = std::sync::OnceLock::new();
109
110fn security_headers() -> Vec<Header> {
116 vec![
117 Header::from_bytes("X-Content-Type-Options", "nosniff").unwrap(),
118 Header::from_bytes("X-Frame-Options", "DENY").unwrap(),
119 Header::from_bytes("X-XSS-Protection", "1; mode=block").unwrap(),
120 ]
121}
122
123fn with_security_headers<R: std::io::Read>(response: Response<R>) -> Response<R> {
125 let mut resp = response;
126 for header in security_headers() {
127 resp = resp.with_header(header);
128 }
129 resp
130}
131
132pub fn start(runtime: Arc<Runtime>, port: u16) -> Result<(), String> {
134 start_with_plugins(runtime, port, None)
135}
136
137pub fn start_with_plugins(
139 runtime: Arc<Runtime>,
140 port: u16,
141 plugins: Option<Arc<PluginRegistry>>,
142) -> Result<(), String> {
143 start_server(runtime, port, plugins, None)
144}
145
146pub fn start_with_shards(
149 runtime: Arc<Runtime>,
150 port: u16,
151 plugins: Option<Arc<PluginRegistry>>,
152 shard_registry: Arc<dyn pylon_realtime::DynShardRegistry>,
153) -> Result<(), String> {
154 start_server(runtime, port, plugins, Some(shard_registry))
155}
156
157fn start_server(
158 runtime: Arc<Runtime>,
159 port: u16,
160 plugins: Option<Arc<PluginRegistry>>,
161 shard_registry: Option<Arc<dyn pylon_realtime::DynShardRegistry>>,
162) -> Result<(), String> {
163 pylon_observability::run_tracing_hook();
168
169 let addr = format!("0.0.0.0:{port}");
170 let server = Server::http(&addr).map_err(|e| format!("Failed to start server: {e}"))?;
171 let server = Arc::new(server);
172
173 let _ = SERVER_HANDLE.set(Arc::clone(&server));
175
176 let session_store = Arc::new(build_session_store(runtime.db_path().as_deref()));
177 let magic_codes = Arc::new(pylon_auth::MagicCodeStore::new());
178 let oauth_state = Arc::new(match std::env::var("PYLON_SESSION_DB").ok() {
182 Some(path) => match crate::oauth_backend::SqliteOAuthBackend::open(&path) {
183 Ok(backend) => pylon_auth::OAuthStateStore::with_backend(Box::new(backend)),
184 Err(e) => {
185 tracing::warn!(
186 "[auth] OAuth state SQLite backend unavailable: {e} — falling back to in-memory"
187 );
188 pylon_auth::OAuthStateStore::new()
189 }
190 },
191 None => pylon_auth::OAuthStateStore::new(),
192 });
193 let policy_engine = Arc::new(PolicyEngine::from_manifest(runtime.manifest()));
194 let change_log = Arc::new(ChangeLog::new());
195
196 for entity in runtime.manifest().entities.iter() {
204 match runtime.list(&entity.name) {
205 Ok(rows) => {
206 for row in rows {
207 if let Some(id) = row.get("id").and_then(|v| v.as_str()) {
208 change_log.append(&entity.name, id, ChangeKind::Insert, Some(row.clone()));
209 }
210 }
211 }
212 Err(_) => {
213 }
215 }
216 }
217 let ws_hub = WsHub::new();
218 let sse_hub = SseHub::new();
219 let is_dev_early = std::env::var("PYLON_DEV_MODE")
232 .map(|v| v == "1" || v == "true")
233 .unwrap_or(true);
234 let plugin_rl_max: u32 = if is_dev_early { 100_000 } else { 100 };
235 let plugin_reg: Arc<PluginRegistry> = plugins.unwrap_or_else(|| {
236 let mut reg = PluginRegistry::new(runtime.manifest().clone());
237 reg.register(Arc::new(
238 pylon_plugin::builtin::rate_limit::RateLimitPlugin::new(
239 plugin_rl_max,
240 std::time::Duration::from_secs(60),
241 ),
242 ));
243 reg.register(Arc::new(
248 pylon_plugin::builtin::tenant_scope::TenantScopePlugin::from_manifest(
249 runtime.manifest(),
250 ),
251 ));
252 Arc::new(reg)
253 });
254 let room_mgr = Arc::new(RoomManager::new(120)); let ws_port = port + 1;
256 let sse_port = port + 2;
257
258 let start_time = Instant::now();
260
261 let metrics = Arc::new(Metrics::new());
262
263 let cache = Arc::new(CachePlugin::new(100_000));
265 let pubsub_broker = Arc::new(PubSubBroker::new(100));
266
267 let job_queue = Arc::new(JobQueue::new(1000));
269
270 let jobs_in_memory = std::env::var("PYLON_JOBS_IN_MEMORY")
276 .map(|v| v == "1" || v == "true")
277 .unwrap_or(false);
278 if !jobs_in_memory {
279 let jobs_db_path = std::env::var("PYLON_JOBS_DB").ok().unwrap_or_else(|| {
280 runtime
281 .db_path()
282 .map(|p| format!("{p}.jobs.db"))
283 .unwrap_or_else(|| "pylon.jobs.db".into())
284 });
285 match crate::job_store::JobStore::open(&jobs_db_path) {
286 Ok(store) => {
287 let store = Arc::new(store);
288 let restored = job_queue.restore_from(&store);
289 if restored > 0 {
290 tracing::info!("[jobs] Restored {restored} pending job(s) from {jobs_db_path}");
291 }
292 job_queue.attach_store(store);
293 }
294 Err(e) => {
295 tracing::warn!(
296 "[jobs] Could not open job store at {jobs_db_path}: {e} — running without persistence"
297 );
298 }
299 }
300 }
301
302 {
304 let cache_ref = Arc::clone(&cache);
305 job_queue.register(
306 "pylon.cache.cleanup",
307 Arc::new(move |_job| {
308 cache_ref.cleanup_expired();
309 JobResult::Success
310 }),
311 );
312 let rooms_ref = Arc::clone(&room_mgr);
313 job_queue.register(
314 "pylon.rooms.cleanup",
315 Arc::new(move |_job| {
316 rooms_ref.cleanup_idle();
317 JobResult::Success
318 }),
319 );
320 }
321
322 let scheduler = Arc::new(Scheduler::new(Arc::clone(&job_queue)));
323 let _ = scheduler.schedule(
325 "pylon.cache.cleanup",
326 "*/10 * * * *",
327 Arc::new(|_| JobResult::Success),
328 );
329 let _ = scheduler.schedule(
330 "pylon.rooms.cleanup",
331 "*/5 * * * *",
332 Arc::new(|_| JobResult::Success),
333 );
334
335 let _worker_handles: Vec<_> = (0..2)
337 .map(|i| {
338 let w = Worker::new(Arc::clone(&job_queue), &format!("worker-{i}"));
339 w.start()
340 })
341 .collect();
342
343 let _scheduler_handle = Arc::clone(&scheduler).start();
345
346 let wf_runner_url = std::env::var("PYLON_WORKFLOW_RUNNER_URL")
348 .unwrap_or_else(|_| "http://127.0.0.1:9876/run".to_string());
349 let workflow_engine = Arc::new(WorkflowEngine::new(&wf_runner_url, 10_000));
350
351 let default_rl_max = if is_dev_early { 100_000 } else { 600 };
362 let rl_max: u32 = std::env::var("PYLON_RATE_LIMIT_MAX")
363 .ok()
364 .and_then(|v| v.parse().ok())
365 .unwrap_or(default_rl_max);
366 let rl_window: u64 = std::env::var("PYLON_RATE_LIMIT_WINDOW")
367 .ok()
368 .and_then(|v| v.parse().ok())
369 .unwrap_or(60);
370 let rate_limiter = Arc::new(RateLimiter::new(rl_max, rl_window));
371
372 let fn_rl_max: u32 = std::env::var("PYLON_FN_RATE_LIMIT_MAX")
376 .ok()
377 .and_then(|v| v.parse().ok())
378 .unwrap_or(30);
379 let fn_rl_window: u64 = std::env::var("PYLON_FN_RATE_LIMIT_WINDOW")
380 .ok()
381 .and_then(|v| v.parse().ok())
382 .unwrap_or(60);
383 let fn_rate_limiter = Arc::new(RateLimiter::new(fn_rl_max, fn_rl_window));
384
385 let fn_notifier: Arc<dyn pylon_router::ChangeNotifier> =
393 Arc::new(crate::datastore::WsSseNotifier {
394 ws: Arc::clone(&ws_hub),
395 sse: Arc::clone(&sse_hub),
396 });
397 let fn_ops_maybe = crate::datastore::try_spawn_functions(
398 Arc::clone(&runtime),
399 Arc::clone(&job_queue),
400 Arc::clone(&fn_rate_limiter),
401 Arc::clone(&change_log),
402 fn_notifier,
403 );
404
405 let is_dev = std::env::var("PYLON_DEV_MODE")
407 .map(|v| v == "1" || v == "true")
408 .unwrap_or(true);
409
410 let cors_origin = match std::env::var("PYLON_CORS_ORIGIN") {
417 Ok(v) => v,
418 Err(_) if is_dev => "*".to_string(),
419 Err(_) => {
420 return Err(
421 "PYLON_CORS_ORIGIN must be set in production (non-dev mode). \
422 Set it to your frontend's origin, or set PYLON_DEV_MODE=true \
423 for local development."
424 .into(),
425 );
426 }
427 };
428 if !is_dev && cors_origin == "*" {
429 return Err("PYLON_CORS_ORIGIN=\"*\" is refused in production mode. \
430 Set it to an explicit origin (https://app.example.com)."
431 .into());
432 }
433 let allow_credentials = cors_origin != "*";
440 if Header::from_bytes(
444 "Access-Control-Allow-Origin",
445 cors_origin.as_bytes().to_vec(),
446 )
447 .is_err()
448 {
449 return Err(format!(
450 "PYLON_CORS_ORIGIN={cors_origin:?} contains bytes that are not a valid HTTP header value"
451 ));
452 }
453
454 let admin_token: Option<String> = std::env::var("PYLON_ADMIN_TOKEN").ok();
456
457 let cookie_config = Arc::new({
464 let app_name = runtime.manifest().name.as_str();
465 pylon_auth::CookieConfig::from_env(&pylon_auth::CookieConfig::default_name_for(app_name))
466 });
467
468 let csrf_origins: Vec<String> = match std::env::var("PYLON_CSRF_ORIGINS") {
478 Ok(v) => v
479 .split(',')
480 .map(|s| s.trim().to_string())
481 .filter(|s| !s.is_empty())
482 .collect(),
483 Err(_) => {
484 if is_dev {
485 vec!["*".to_string()]
486 } else if cors_origin != "*" {
487 vec![cors_origin.clone()]
488 } else {
489 vec![]
491 }
492 }
493 };
494 let csrf = Arc::new(pylon_plugin::builtin::csrf::CsrfPlugin::new(csrf_origins));
495
496 {
510 let hub = Arc::clone(&ws_hub);
511 let sessions = Arc::clone(&session_store);
512 let runtime_for_fetcher = Arc::clone(&runtime);
513 let pe_for_fetcher = Arc::clone(&policy_engine);
514 let fetcher: crate::ws::SnapshotFetcher = Arc::new(move |auth_ctx, entity, row_id| {
515 use pylon_http::DataStore;
516 let row = match runtime_for_fetcher.get_by_id(entity, row_id) {
521 Ok(Some(v)) => v,
522 _ => return None,
523 };
524 if !matches!(
525 pe_for_fetcher.check_entity_read(entity, auth_ctx, Some(&row)),
526 pylon_policy::PolicyResult::Allowed
527 ) {
528 return None;
529 }
530 let snap = match runtime_for_fetcher.crdt_snapshot(entity, row_id) {
531 Ok(Some(bytes)) => bytes,
532 _ => return None,
533 };
534 pylon_router::encode_crdt_frame(
535 pylon_router::CRDT_FRAME_SNAPSHOT,
536 entity,
537 row_id,
538 &snap,
539 )
540 .ok()
541 });
542 std::thread::spawn(move || {
543 crate::ws::start_ws_server(hub, sessions, ws_port, Some(fetcher));
544 });
545 }
546
547 {
549 let hub = Arc::clone(&sse_hub);
550 std::thread::spawn(move || {
551 crate::sse::start_sse_server(hub, sse_port);
552 });
553 }
554
555 let shard_ws_port = port + 3;
557 if let Some(reg) = shard_registry.clone() {
558 let sessions = Arc::clone(&session_store);
559 std::thread::spawn(move || {
560 crate::shard_ws::start_shard_ws_server(reg, sessions, shard_ws_port);
561 });
562 }
563
564 tracing::warn!("pylon dev server listening on http://localhost:{port}");
565 tracing::info!(" WebSocket: ws://localhost:{ws_port}");
566 tracing::info!(" Studio: http://localhost:{port}/studio");
567 tracing::info!(" API: http://localhost:{port}/api/entities/<entity>");
568 tracing::info!(" Auth: http://localhost:{port}/api/auth/session");
569
570 loop {
574 if SHUTDOWN.load(Ordering::Relaxed) {
575 break;
576 }
577
578 let mut request = match server.recv() {
579 Ok(rq) => rq,
580 Err(_) => {
581 break;
583 }
584 };
585
586 if SHUTDOWN.load(Ordering::Relaxed) {
587 break;
588 }
589
590 let rt = Arc::clone(&runtime);
591 let ss = Arc::clone(&session_store);
592 let pe = Arc::clone(&policy_engine);
593 let cl = Arc::clone(&change_log);
594 let wh = Arc::clone(&ws_hub);
595 let sh = Arc::clone(&sse_hub);
596 let mc = Arc::clone(&magic_codes);
597 let pr = Arc::clone(&plugin_reg);
598 let rm = Arc::clone(&room_mgr);
599 let mt = Arc::clone(&metrics);
600 let os = Arc::clone(&oauth_state);
601 let ca = Arc::clone(&cache);
602 let ps = Arc::clone(&pubsub_broker);
603 let jq = Arc::clone(&job_queue);
604 let sc = Arc::clone(&scheduler);
605 let we = Arc::clone(&workflow_engine);
606 let fn_ops_ref = fn_ops_maybe.clone();
607 let shards_ref = shard_registry.clone();
608 let cors_origin = cors_origin.clone();
609 let cookie_config = Arc::clone(&cookie_config);
610 let allow_credentials = allow_credentials;
611 let is_dev = is_dev;
612
613 let method = request.method().clone();
614 let url = request.url().to_string();
615
616 if url == "/health" && method == Method::Get {
618 let uptime = start_time.elapsed().as_secs();
619 let body = serde_json::json!({
620 "status": "ok",
621 "version": "0.1.0",
622 "uptime_secs": uptime,
623 })
624 .to_string();
625
626 let response = with_security_headers(
627 Response::from_string(&body)
628 .with_status_code(200u16)
629 .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
630 .with_header(
631 Header::from_bytes(
632 "Access-Control-Allow-Origin",
633 cors_origin.as_bytes().to_vec(),
634 )
635 .unwrap(),
636 ),
637 );
638 let _ = request.respond(response);
639 continue;
640 }
641
642 if url == "/metrics" && method == Method::Get {
647 if !is_dev {
648 let admin_bytes = admin_token.as_deref().unwrap_or("").as_bytes();
649 let auth_ok = !admin_bytes.is_empty()
650 && request.headers().iter().any(|h| {
651 let name = h.field.as_str().as_str();
652 name.eq_ignore_ascii_case("Authorization")
653 && h.value
654 .as_str()
655 .strip_prefix("Bearer ")
656 .map(|t| pylon_auth::constant_time_eq(t.as_bytes(), admin_bytes))
657 .unwrap_or(false)
658 });
659 if !auth_ok {
660 let body = json_error(
661 "UNAUTHORIZED",
662 "/metrics requires admin bearer token in non-dev mode",
663 );
664 let response = with_security_headers(
665 Response::from_string(&body)
666 .with_status_code(401u16)
667 .with_header(
668 Header::from_bytes("Content-Type", "application/json").unwrap(),
669 ),
670 );
671 let _ = request.respond(response);
672 continue;
673 }
674 }
675 let prefers_prometheus = request.headers().iter().any(|h| {
676 (h.field.as_str() == "Accept" || h.field.as_str() == "accept")
677 && (h.value.as_str().contains("text/plain")
678 || h.value.as_str().contains("application/openmetrics-text"))
679 });
680 let (body, content_type) = if prefers_prometheus {
681 (mt.prometheus(), "text/plain; version=0.0.4")
682 } else {
683 (mt.snapshot().to_string(), "application/json")
684 };
685 let response = with_security_headers(
686 Response::from_string(&body)
687 .with_status_code(200u16)
688 .with_header(Header::from_bytes("Content-Type", content_type).unwrap())
689 .with_header(
690 Header::from_bytes(
691 "Access-Control-Allow-Origin",
692 cors_origin.as_bytes().to_vec(),
693 )
694 .unwrap(),
695 ),
696 );
697 let _ = request.respond(response);
698 mt.record_request("GET", 200);
699 continue;
700 }
701
702 let peer_ip = request
704 .remote_addr()
705 .map(|a| a.ip().to_string())
706 .unwrap_or_default();
707
708 let is_preflight = matches!(method, Method::Options);
714 if !is_preflight {
715 if let Err(retry_after) = rate_limiter.check(&peer_ip) {
716 let err_body = json_error(
717 "RATE_LIMITED",
718 &format!("Too many requests. Retry after {retry_after} seconds."),
719 );
720 let response = with_security_headers(
721 Response::from_string(&err_body)
722 .with_status_code(429u16)
723 .with_header(
724 Header::from_bytes("Content-Type", "application/json").unwrap(),
725 )
726 .with_header(
727 Header::from_bytes(
728 "Access-Control-Allow-Origin",
729 cors_origin.as_bytes().to_vec(),
730 )
731 .unwrap(),
732 )
733 .with_header(
734 Header::from_bytes(
735 "Access-Control-Allow-Methods",
736 "GET, POST, PATCH, DELETE, OPTIONS",
737 )
738 .unwrap(),
739 )
740 .with_header(
741 Header::from_bytes(
742 "Access-Control-Allow-Headers",
743 "Content-Type, Authorization",
744 )
745 .unwrap(),
746 )
747 .with_header(
748 Header::from_bytes(
749 "Retry-After",
750 retry_after.to_string().as_bytes().to_vec(),
751 )
752 .unwrap(),
753 ),
754 );
755 let _ = request.respond(response);
756 mt.record_request(method.as_str(), 429);
757 continue;
758 }
759 } {
775 let method_str = method.as_str();
776 let is_bearer = request.headers().iter().any(|h| {
777 (h.field.as_str() == "Authorization" || h.field.as_str() == "authorization")
778 && h.value.as_str().starts_with("Bearer ")
779 });
780 if !is_bearer && !matches!(method, Method::Get | Method::Head | Method::Options) {
785 let origin = request
786 .headers()
787 .iter()
788 .find(|h| h.field.as_str() == "Origin" || h.field.as_str() == "origin")
789 .map(|h| h.value.as_str().to_string());
790 let referer = request
791 .headers()
792 .iter()
793 .find(|h| h.field.as_str() == "Referer" || h.field.as_str() == "referer")
794 .map(|h| h.value.as_str().to_string());
795 if let Err(err) = csrf.check(method_str, origin.as_deref(), referer.as_deref()) {
796 let body = json_error(&err.code, &err.message);
797 let response = with_security_headers(
798 Response::from_string(&body)
799 .with_status_code(err.status)
800 .with_header(
801 Header::from_bytes("Content-Type", "application/json").unwrap(),
802 )
803 .with_header(
804 Header::from_bytes(
805 "Access-Control-Allow-Origin",
806 cors_origin.as_bytes().to_vec(),
807 )
808 .unwrap(),
809 ),
810 );
811 let _ = request.respond(response);
812 mt.record_request(method_str, err.status);
813 continue;
814 }
815 }
816 }
817
818 let bearer_token: Option<String> = request
828 .headers()
829 .iter()
830 .find(|h| h.field.as_str() == "Authorization" || h.field.as_str() == "authorization")
831 .and_then(|h| {
832 let val = h.value.as_str();
833 val.strip_prefix("Bearer ").map(|t| t.to_string())
834 });
835 let cookie_token: Option<String> = if bearer_token.is_some() {
836 None
837 } else {
838 request
839 .headers()
840 .iter()
841 .find(|h| h.field.as_str() == "Cookie" || h.field.as_str() == "cookie")
842 .and_then(|h| {
843 pylon_auth::extract_session_cookie(h.value.as_str(), &cookie_config.name)
844 })
845 };
846 let auth_token: Option<String> = bearer_token.or(cookie_token);
847 let auth_ctx = if admin_token.is_some()
848 && auth_token.is_some()
849 && pylon_auth::constant_time_eq(
850 auth_token.as_deref().unwrap_or("").as_bytes(),
851 admin_token.as_deref().unwrap_or("").as_bytes(),
852 ) {
853 pylon_auth::AuthContext::admin()
854 } else {
855 ss.resolve(auth_token.as_deref())
856 };
857
858 if url == "/api/__test__/reset" && method == Method::Post {
873 let is_loopback = peer_ip == "127.0.0.1"
874 || peer_ip == "::1"
875 || peer_ip.starts_with("127.")
876 || peer_ip == "localhost";
877 if !is_dev || !rt.is_in_memory() || !is_loopback {
878 let body = json_error(
879 "RESET_REFUSED",
880 "reset endpoint is only available in dev mode + in-memory DB + from loopback",
881 );
882 let response = with_security_headers(
883 Response::from_string(&body)
884 .with_status_code(403u16)
885 .with_header(
886 Header::from_bytes("Content-Type", "application/json").unwrap(),
887 )
888 .with_header(
889 Header::from_bytes(
890 "Access-Control-Allow-Origin",
891 cors_origin.as_bytes().to_vec(),
892 )
893 .unwrap(),
894 ),
895 );
896 let _ = request.respond(response);
897 mt.record_request("POST", 403);
898 continue;
899 }
900 let (status, body) = match rt.reset_for_tests() {
901 Ok(()) => (200u16, "{\"reset\":true}".to_string()),
902 Err(e) => (500u16, json_error(&e.code, &e.message)),
903 };
904 let response = with_security_headers(
905 Response::from_string(&body)
906 .with_status_code(status)
907 .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
908 .with_header(
909 Header::from_bytes(
910 "Access-Control-Allow-Origin",
911 cors_origin.as_bytes().to_vec(),
912 )
913 .unwrap(),
914 ),
915 );
916 let _ = request.respond(response);
917 mt.record_request("POST", status);
918 continue;
919 }
920
921 if url == "/api/files/upload" && method == Method::Post {
930 const UPLOAD_MAX: usize = 10 * 1024 * 1024;
931 if let Some(declared) = request.body_length() {
934 if declared > UPLOAD_MAX {
935 let err = json_error(
936 "PAYLOAD_TOO_LARGE",
937 &format!("Content-Length {declared} exceeds upload max of {UPLOAD_MAX}"),
938 );
939 let response = with_security_headers(
940 Response::from_string(&err)
941 .with_status_code(413u16)
942 .with_header(
943 Header::from_bytes("Content-Type", "application/json").unwrap(),
944 )
945 .with_header(
946 Header::from_bytes(
947 "Access-Control-Allow-Origin",
948 cors_origin.as_bytes().to_vec(),
949 )
950 .unwrap(),
951 ),
952 );
953 let _ = request.respond(response);
954 mt.record_request("POST", 413);
955 continue;
956 }
957 }
958 if auth_ctx.user_id.is_none() {
959 let err = json_error(
960 "AUTH_REQUIRED",
961 "/api/files/upload requires an authenticated session",
962 );
963 let response = with_security_headers(
964 Response::from_string(&err)
965 .with_status_code(401u16)
966 .with_header(
967 Header::from_bytes("Content-Type", "application/json").unwrap(),
968 )
969 .with_header(
970 Header::from_bytes(
971 "Access-Control-Allow-Origin",
972 cors_origin.as_bytes().to_vec(),
973 )
974 .unwrap(),
975 ),
976 );
977 let _ = request.respond(response);
978 mt.record_request("POST", 401);
979 continue;
980 }
981 use std::io::Read;
986 let mut bytes: Vec<u8> = Vec::with_capacity(8192);
987 let mut limited = request.as_reader().take((UPLOAD_MAX as u64) + 1);
988 let _ = limited.read_to_end(&mut bytes);
989
990 const MAX: usize = UPLOAD_MAX;
991 if bytes.len() > MAX {
992 let err = json_error("PAYLOAD_TOO_LARGE", "File exceeds 10 MB limit");
993 let response = with_security_headers(
994 Response::from_string(&err)
995 .with_status_code(413u16)
996 .with_header(
997 Header::from_bytes("Content-Type", "application/json").unwrap(),
998 )
999 .with_header(
1000 Header::from_bytes(
1001 "Access-Control-Allow-Origin",
1002 cors_origin.as_bytes().to_vec(),
1003 )
1004 .unwrap(),
1005 ),
1006 );
1007 let _ = request.respond(response);
1008 mt.record_request("POST", 413);
1009 continue;
1010 }
1011
1012 let content_type = request
1014 .headers()
1015 .iter()
1016 .find(|h| h.field.as_str() == "Content-Type" || h.field.as_str() == "content-type")
1017 .map(|h| h.value.as_str().to_string())
1018 .unwrap_or_else(|| "application/octet-stream".into());
1019 let filename = request
1020 .headers()
1021 .iter()
1022 .find(|h| h.field.as_str() == "X-Filename" || h.field.as_str() == "x-filename")
1023 .map(|h| h.value.as_str().to_string())
1024 .unwrap_or_else(|| "upload".into());
1025
1026 let (name, ct, payload) = if content_type.starts_with("multipart/form-data") {
1028 match parse_multipart_first_file(&bytes, &content_type) {
1029 Some(p) => p,
1030 None => {
1031 let err = json_error("INVALID_MULTIPART", "Could not parse multipart body");
1032 let response = with_security_headers(
1033 Response::from_string(&err)
1034 .with_status_code(400u16)
1035 .with_header(
1036 Header::from_bytes("Content-Type", "application/json").unwrap(),
1037 )
1038 .with_header(
1039 Header::from_bytes(
1040 "Access-Control-Allow-Origin",
1041 cors_origin.as_bytes().to_vec(),
1042 )
1043 .unwrap(),
1044 ),
1045 );
1046 let _ = request.respond(response);
1047 mt.record_request("POST", 400);
1048 continue;
1049 }
1050 }
1051 } else {
1052 (filename, content_type, bytes)
1053 };
1054
1055 let storage = pylon_storage::files::LocalFileStorage::new(
1056 &std::env::var("PYLON_FILES_DIR").unwrap_or_else(|_| "uploads".into()),
1057 &std::env::var("PYLON_FILES_URL_PREFIX").unwrap_or_else(|_| "/api/files".into()),
1058 );
1059
1060 let (status, body) =
1061 match pylon_storage::files::FileStorage::store(&storage, &name, &payload, &ct) {
1062 Ok(stored) => (
1063 201u16,
1064 serde_json::to_string(&stored).unwrap_or_else(|_| "{}".into()),
1065 ),
1066 Err(e) => (500u16, json_error(&e.code, &e.message)),
1067 };
1068
1069 let response = with_security_headers(
1070 Response::from_string(&body)
1071 .with_status_code(status)
1072 .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
1073 .with_header(
1074 Header::from_bytes(
1075 "Access-Control-Allow-Origin",
1076 cors_origin.as_bytes().to_vec(),
1077 )
1078 .unwrap(),
1079 ),
1080 );
1081 let _ = request.respond(response);
1082 mt.record_request("POST", status);
1083 continue;
1084 }
1085
1086 const MAX_BODY_SIZE: usize = 10 * 1024 * 1024;
1096
1097 if let Some(declared) = request.body_length() {
1098 if declared > MAX_BODY_SIZE {
1099 let err_body = json_error(
1100 "PAYLOAD_TOO_LARGE",
1101 &format!("Content-Length {declared} exceeds max of {MAX_BODY_SIZE}"),
1102 );
1103 let response = with_security_headers(
1104 Response::from_string(&err_body)
1105 .with_status_code(413u16)
1106 .with_header(
1107 Header::from_bytes(
1108 "Access-Control-Allow-Origin",
1109 cors_origin.as_bytes().to_vec(),
1110 )
1111 .unwrap(),
1112 ),
1113 );
1114 let _ = request.respond(response);
1115 mt.record_request(method.as_str(), 413);
1116 continue;
1117 }
1118 }
1119
1120 let mut body = String::new();
1121 if !matches!(
1122 method,
1123 Method::Get | Method::Head | Method::Options | Method::Delete
1124 ) {
1125 use std::io::Read;
1126 let mut limited = request.as_reader().take((MAX_BODY_SIZE as u64) + 1);
1127 let _ = limited.read_to_string(&mut body);
1128 }
1129
1130 if body.len() > MAX_BODY_SIZE {
1131 let err_body = json_error(
1132 "PAYLOAD_TOO_LARGE",
1133 &format!(
1134 "Request body exceeds maximum size of {} bytes",
1135 MAX_BODY_SIZE,
1136 ),
1137 );
1138 let response = with_security_headers(
1139 Response::from_string(&err_body)
1140 .with_status_code(413u16)
1141 .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
1142 .with_header(
1143 Header::from_bytes(
1144 "Access-Control-Allow-Origin",
1145 cors_origin.as_bytes().to_vec(),
1146 )
1147 .unwrap(),
1148 ),
1149 );
1150 let _ = request.respond(response);
1151 mt.record_request(method.as_str(), 413);
1152 continue;
1153 }
1154
1155 if method == Method::Get {
1159 if let Some(rest) = url.strip_prefix("/api/shards/") {
1160 let rest = rest.split('?').next().unwrap_or(rest);
1161 if let Some(shard_id) = rest.strip_suffix("/connect") {
1162 if auth_ctx.user_id.is_none() {
1167 let err = json_error(
1168 "AUTH_REQUIRED",
1169 "Shard connect requires an authenticated session",
1170 );
1171 let response = with_security_headers(
1172 Response::from_string(&err)
1173 .with_status_code(401u16)
1174 .with_header(
1175 Header::from_bytes("Content-Type", "application/json").unwrap(),
1176 )
1177 .with_header(
1178 Header::from_bytes(
1179 "Access-Control-Allow-Origin",
1180 cors_origin.as_bytes().to_vec(),
1181 )
1182 .unwrap(),
1183 ),
1184 );
1185 let _ = request.respond(response);
1186 mt.record_request("GET", 401);
1187 continue;
1188 }
1189 let shards = match &shards_ref {
1190 Some(s) => Arc::clone(s),
1191 None => {
1192 let err = json_error(
1193 "SHARDS_NOT_AVAILABLE",
1194 "Shard system is not configured",
1195 );
1196 let response = with_security_headers(
1197 Response::from_string(&err)
1198 .with_status_code(503u16)
1199 .with_header(
1200 Header::from_bytes("Content-Type", "application/json")
1201 .unwrap(),
1202 )
1203 .with_header(
1204 Header::from_bytes(
1205 "Access-Control-Allow-Origin",
1206 cors_origin.as_bytes().to_vec(),
1207 )
1208 .unwrap(),
1209 ),
1210 );
1211 let _ = request.respond(response);
1212 mt.record_request("GET", 503);
1213 continue;
1214 }
1215 };
1216 let shard = match shards.get(shard_id) {
1217 Some(s) => s,
1218 None => {
1219 let err = json_error(
1220 "SHARD_NOT_FOUND",
1221 &format!("Shard \"{shard_id}\" not found"),
1222 );
1223 let response = with_security_headers(
1224 Response::from_string(&err)
1225 .with_status_code(404u16)
1226 .with_header(
1227 Header::from_bytes("Content-Type", "application/json")
1228 .unwrap(),
1229 )
1230 .with_header(
1231 Header::from_bytes(
1232 "Access-Control-Allow-Origin",
1233 cors_origin.as_bytes().to_vec(),
1234 )
1235 .unwrap(),
1236 ),
1237 );
1238 let _ = request.respond(response);
1239 mt.record_request("GET", 404);
1240 continue;
1241 }
1242 };
1243
1244 let sub_id = url
1247 .split("sid=")
1248 .nth(1)
1249 .and_then(|s| s.split('&').next())
1250 .map(|s| s.to_string())
1251 .or_else(|| auth_ctx.user_id.clone())
1252 .unwrap_or_else(|| {
1253 format!(
1254 "anon_{}",
1255 std::time::SystemTime::now()
1256 .duration_since(std::time::UNIX_EPOCH)
1257 .unwrap_or_default()
1258 .as_nanos()
1259 )
1260 });
1261 let subscriber_id = pylon_realtime::SubscriberId::new(sub_id);
1262
1263 let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1264 let streaming_body = StreamingBody::new(rx);
1265
1266 let tx_clone = tx.clone();
1267 let sink: pylon_realtime::SnapshotSink =
1268 Box::new(move |tick: u64, bytes: &[u8]| {
1269 let mut frame = format!("id: {tick}\ndata: ").into_bytes();
1272 frame.extend_from_slice(bytes);
1273 frame.extend_from_slice(b"\n\n");
1274 let _ = tx_clone.send(frame);
1275 });
1276
1277 let shard_auth = pylon_realtime::ShardAuth {
1278 user_id: auth_ctx.user_id.clone(),
1279 is_admin: auth_ctx.is_admin,
1280 };
1281 if let Err(e) = shard.add_subscriber(subscriber_id.clone(), sink, &shard_auth) {
1282 let (status, code) = match &e {
1283 pylon_realtime::ShardError::Unauthorized(_) => (403u16, "UNAUTHORIZED"),
1284 _ => (429u16, "SUBSCRIBE_FAILED"),
1285 };
1286 let err = json_error(code, &e.to_string());
1287 let response = with_security_headers(
1288 Response::from_string(&err)
1289 .with_status_code(status)
1290 .with_header(
1291 Header::from_bytes("Content-Type", "application/json").unwrap(),
1292 )
1293 .with_header(
1294 Header::from_bytes(
1295 "Access-Control-Allow-Origin",
1296 cors_origin.as_bytes().to_vec(),
1297 )
1298 .unwrap(),
1299 ),
1300 );
1301 let _ = request.respond(response);
1302 mt.record_request("GET", status);
1303 continue;
1304 }
1305
1306 {
1309 let shard_cleanup = Arc::clone(&shard);
1310 let sub_id_cleanup = subscriber_id.clone();
1311 let tx_liveness = tx.clone();
1312 std::thread::spawn(move || {
1313 loop {
1316 std::thread::sleep(std::time::Duration::from_secs(30));
1317 if tx_liveness.send(b": heartbeat\n\n".to_vec()).is_err() {
1318 shard_cleanup.remove_subscriber(&sub_id_cleanup);
1319 return;
1320 }
1321 if !shard_cleanup.is_running() {
1322 return;
1323 }
1324 }
1325 });
1326 }
1327
1328 let response = with_security_headers(Response::new(
1329 tiny_http::StatusCode(200),
1330 vec![
1331 Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1332 Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1333 Header::from_bytes("Connection", "keep-alive").unwrap(),
1334 Header::from_bytes(
1335 "Access-Control-Allow-Origin",
1336 cors_origin.as_bytes().to_vec(),
1337 )
1338 .unwrap(),
1339 ],
1340 streaming_body,
1341 None,
1342 None,
1343 ));
1344 let _ = request.respond(response);
1345 mt.record_request("GET", 200);
1346 continue;
1347 }
1348 }
1349 }
1350
1351 if method == Method::Post
1353 && url.starts_with("/api/fn/")
1354 && url != "/api/fn/traces"
1355 && request.headers().iter().any(|h| {
1356 (h.field.as_str() == "Accept" || h.field.as_str() == "accept")
1357 && h.value.as_str().contains("text/event-stream")
1358 })
1359 {
1360 let fn_name = url
1361 .strip_prefix("/api/fn/")
1362 .unwrap_or("")
1363 .split('?')
1364 .next()
1365 .unwrap_or("")
1366 .to_string();
1367
1368 if let Some(fn_ops) = &fn_ops_maybe {
1369 if pylon_router::FnOps::get_fn(fn_ops.as_ref(), &fn_name).is_none() {
1373 let err = json_error(
1374 "FN_NOT_FOUND",
1375 &format!("Function \"{fn_name}\" is not registered"),
1376 );
1377 let response = with_security_headers(
1378 Response::from_string(&err)
1379 .with_status_code(404u16)
1380 .with_header(
1381 Header::from_bytes("Content-Type", "application/json").unwrap(),
1382 )
1383 .with_header(
1384 Header::from_bytes(
1385 "Access-Control-Allow-Origin",
1386 cors_origin.as_bytes().to_vec(),
1387 )
1388 .unwrap(),
1389 ),
1390 );
1391 let _ = request.respond(response);
1392 mt.record_request("POST", 404);
1393 continue;
1394 }
1395 let identity = auth_ctx.user_id.as_deref().unwrap_or("anon");
1397 if let Err(retry_after) =
1398 pylon_router::FnOps::check_rate_limit(fn_ops.as_ref(), &fn_name, identity)
1399 {
1400 let body = format!(
1401 r#"{{"error":{{"code":"RATE_LIMITED","message":"Function \"{fn_name}\" rate limit exceeded","retry_after_secs":{retry_after}}}}}"#
1402 );
1403 let response = with_security_headers(
1404 Response::from_string(&body)
1405 .with_status_code(429u16)
1406 .with_header(
1407 Header::from_bytes("Content-Type", "application/json").unwrap(),
1408 )
1409 .with_header(
1410 Header::from_bytes(
1411 "Access-Control-Allow-Origin",
1412 cors_origin.as_bytes().to_vec(),
1413 )
1414 .unwrap(),
1415 ),
1416 );
1417 let _ = request.respond(response);
1418 mt.record_request("POST", 429);
1419 continue;
1420 }
1421
1422 let args: serde_json::Value =
1423 serde_json::from_str(&body).unwrap_or(serde_json::json!({}));
1424
1425 let auth = pylon_functions::protocol::AuthInfo {
1426 user_id: auth_ctx.user_id.clone(),
1427 is_admin: auth_ctx.is_admin,
1428 tenant_id: auth_ctx.tenant_id.clone(),
1429 };
1430
1431 let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1432 let streaming_body = StreamingBody::new(rx);
1433
1434 let fn_ops_cl = Arc::clone(fn_ops);
1435 let tx_stream = tx.clone();
1436 std::thread::spawn(move || {
1437 let tx_cb = tx_stream.clone();
1438 let on_stream: Box<dyn FnMut(&str) + Send> = Box::new(move |chunk: &str| {
1439 let sse = format!("data: {}\n\n", chunk);
1440 let _ = tx_cb.send(sse.into_bytes());
1441 });
1442
1443 let result = pylon_router::FnOps::call(
1444 fn_ops_cl.as_ref(),
1445 &fn_name,
1446 args,
1447 auth,
1448 Some(on_stream),
1449 None, );
1451 match result {
1452 Ok((value, _trace)) => {
1453 let done = format!(
1454 "event: result\ndata: {}\n\n",
1455 serde_json::to_string(&value).unwrap_or_else(|_| "null".into())
1456 );
1457 let _ = tx_stream.send(done.into_bytes());
1458 }
1459 Err(e) => {
1460 let err = format!(
1461 "event: error\ndata: {}\n\n",
1462 serde_json::json!({"code": e.code, "message": e.message})
1463 );
1464 let _ = tx_stream.send(err.into_bytes());
1465 }
1466 }
1467 });
1468
1469 let response = with_security_headers(Response::new(
1470 tiny_http::StatusCode(200),
1471 vec![
1472 Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1473 Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1474 Header::from_bytes("Connection", "keep-alive").unwrap(),
1475 Header::from_bytes(
1476 "Access-Control-Allow-Origin",
1477 cors_origin.as_bytes().to_vec(),
1478 )
1479 .unwrap(),
1480 ],
1481 streaming_body,
1482 None,
1483 None,
1484 ));
1485 let _ = request.respond(response);
1486 mt.record_request("POST", 200);
1487 continue;
1488 }
1489 }
1490
1491 if url == "/api/ai/stream" && method == Method::Post {
1493 if auth_ctx.user_id.is_none() {
1496 let err = json_error(
1497 "AUTH_REQUIRED",
1498 "/api/ai/stream requires an authenticated session",
1499 );
1500 let response = with_security_headers(
1501 Response::from_string(&err)
1502 .with_status_code(401u16)
1503 .with_header(
1504 Header::from_bytes("Content-Type", "application/json").unwrap(),
1505 )
1506 .with_header(
1507 Header::from_bytes(
1508 "Access-Control-Allow-Origin",
1509 cors_origin.as_bytes().to_vec(),
1510 )
1511 .unwrap(),
1512 ),
1513 );
1514 let _ = request.respond(response);
1515 mt.record_request("POST", 401);
1516 continue;
1517 }
1518 let ai_provider = std::env::var("PYLON_AI_PROVIDER").unwrap_or_default();
1519 let ai_key = std::env::var("PYLON_AI_API_KEY").unwrap_or_default();
1520 let ai_model = std::env::var("PYLON_AI_MODEL").unwrap_or_default();
1521 let ai_base = std::env::var("PYLON_AI_BASE_URL").unwrap_or_default();
1522
1523 if ai_key.is_empty() && ai_provider != "custom" {
1524 let err = json_error(
1525 "AI_NOT_CONFIGURED",
1526 "Set PYLON_AI_PROVIDER and PYLON_AI_API_KEY",
1527 );
1528 let response = with_security_headers(
1529 Response::from_string(&err)
1530 .with_status_code(503u16)
1531 .with_header(
1532 Header::from_bytes("Content-Type", "application/json").unwrap(),
1533 )
1534 .with_header(
1535 Header::from_bytes(
1536 "Access-Control-Allow-Origin",
1537 cors_origin.as_bytes().to_vec(),
1538 )
1539 .unwrap(),
1540 ),
1541 );
1542 let _ = request.respond(response);
1543 mt.record_request("POST", 503);
1544 continue;
1545 }
1546
1547 let parsed: serde_json::Value = match serde_json::from_str(&body) {
1548 Ok(v) => v,
1549 Err(_) => {
1550 let err = json_error("INVALID_JSON", "Invalid request body");
1551 let response = with_security_headers(
1552 Response::from_string(&err)
1553 .with_status_code(400u16)
1554 .with_header(
1555 Header::from_bytes("Content-Type", "application/json").unwrap(),
1556 )
1557 .with_header(
1558 Header::from_bytes(
1559 "Access-Control-Allow-Origin",
1560 cors_origin.as_bytes().to_vec(),
1561 )
1562 .unwrap(),
1563 ),
1564 );
1565 let _ = request.respond(response);
1566 mt.record_request("POST", 400);
1567 continue;
1568 }
1569 };
1570
1571 let messages: Vec<AiMessage> = match parsed.get("messages").and_then(|m| m.as_array()) {
1572 Some(arr) => arr
1573 .iter()
1574 .filter_map(|m| {
1575 let role = m.get("role")?.as_str()?.to_string();
1576 let content = m.get("content")?.as_str()?.to_string();
1577 Some(AiMessage { role, content })
1578 })
1579 .collect(),
1580 None => {
1581 let err = json_error("MISSING_FIELD", "\"messages\" array is required");
1582 let response = with_security_headers(
1583 Response::from_string(&err)
1584 .with_status_code(400u16)
1585 .with_header(
1586 Header::from_bytes("Content-Type", "application/json").unwrap(),
1587 )
1588 .with_header(
1589 Header::from_bytes(
1590 "Access-Control-Allow-Origin",
1591 cors_origin.as_bytes().to_vec(),
1592 )
1593 .unwrap(),
1594 ),
1595 );
1596 let _ = request.respond(response);
1597 mt.record_request("POST", 400);
1598 continue;
1599 }
1600 };
1601
1602 let model = parsed
1604 .get("model")
1605 .and_then(|m| m.as_str())
1606 .map(|s| s.to_string())
1607 .unwrap_or(ai_model);
1608
1609 let proxy = match ai_provider.as_str() {
1610 "anthropic" => AiProxyPlugin::anthropic(&ai_key, &model),
1611 "openai" => AiProxyPlugin::openai(&ai_key, &model),
1612 "custom" => AiProxyPlugin::custom_with_model(&ai_base, &ai_key, &model),
1613 _ => AiProxyPlugin::openai(&ai_key, &model),
1614 };
1615
1616 let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1619 let streaming_body = StreamingBody::new(rx);
1620
1621 std::thread::spawn(move || {
1624 let result = proxy.stream_completion(&messages, &mut |chunk| {
1625 let sse = format!(
1626 "data: {}
1627
1628",
1629 serde_json::json!({
1630 "choices": [{"index": 0, "delta": {"content": chunk}}]
1631 })
1632 );
1633 let _ = tx.send(sse.into_bytes());
1634 });
1635
1636 match result {
1638 Ok(_) => {
1639 let _ = tx.send(
1640 b"data: [DONE]
1641
1642"
1643 .to_vec(),
1644 );
1645 }
1646 Err(e) => {
1647 let err_event = format!(
1648 "data: {}
1649
1650",
1651 serde_json::json!({"error": {"message": e, "type": "stream_error"}})
1652 );
1653 let _ = tx.send(err_event.into_bytes());
1654 }
1655 }
1656 });
1658
1659 let response = with_security_headers(Response::new(
1660 tiny_http::StatusCode(200),
1661 vec![
1662 Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1663 Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1664 Header::from_bytes("Connection", "keep-alive").unwrap(),
1665 Header::from_bytes(
1666 "Access-Control-Allow-Origin",
1667 cors_origin.as_bytes().to_vec(),
1668 )
1669 .unwrap(),
1670 ],
1671 streaming_body,
1672 None, None,
1674 ));
1675 let _ = request.respond(response);
1676 mt.record_request("POST", 200);
1677 continue;
1678 }
1679
1680 let (status, response_body, content_type, is_studio, extra_headers) = if (url == "/studio"
1691 || url == "/studio/")
1692 && method == Method::Get
1693 {
1694 if !is_dev && !auth_ctx.is_admin {
1695 let body = json_error(
1696 "AUTH_REQUIRED",
1697 "/studio requires admin auth in production (set PYLON_ADMIN_TOKEN and pass it as Bearer)",
1698 );
1699 let response = with_security_headers(
1700 Response::from_string(&body)
1701 .with_status_code(401u16)
1702 .with_header(
1703 Header::from_bytes("Content-Type", "application/json").unwrap(),
1704 )
1705 .with_header(
1706 Header::from_bytes(
1707 "Access-Control-Allow-Origin",
1708 cors_origin.as_bytes().to_vec(),
1709 )
1710 .unwrap(),
1711 ),
1712 );
1713 let _ = request.respond(response);
1714 mt.record_request("GET", 401);
1715 continue;
1716 }
1717 let host = request
1724 .headers()
1725 .iter()
1726 .find(|h| h.field.equiv("Host"))
1727 .map(|h| h.value.as_str().to_string())
1728 .unwrap_or_else(|| format!("localhost:{port}"));
1729 let scheme = request
1730 .headers()
1731 .iter()
1732 .find(|h| h.field.equiv("X-Forwarded-Proto"))
1733 .map(|h| h.value.as_str().to_string())
1734 .unwrap_or_else(|| "http".to_string());
1735 let base = format!("{scheme}://{host}");
1736 let html = pylon_studio_api::generate_studio_html(rt.manifest(), &base);
1737 (
1738 200u16,
1739 html,
1740 "text/html",
1741 true,
1742 Vec::<(String, String)>::new(),
1743 )
1744 } else {
1745 let meta = pylon_plugin::RequestMeta {
1749 peer_ip: peer_ip.as_str(),
1750 };
1751 if let Err(e) = pr.run_on_request_with_meta(method.as_str(), &url, &auth_ctx, &meta) {
1752 (
1753 e.status,
1754 json_error(&e.code, &e.message),
1755 "application/json",
1756 false,
1757 Vec::new(),
1758 )
1759 } else if let Some((s, b)) =
1760 pr.try_handle_route(method.as_str(), &url, &body, &auth_ctx)
1761 {
1762 (s, b, "application/json", false, Vec::new())
1764 } else {
1765 let notifier = WsSseNotifier {
1766 ws: Arc::clone(&wh),
1767 sse: Arc::clone(&sh),
1768 };
1769 let openapi_gen = RuntimeOpenApiGenerator {
1770 manifest: rt.manifest(),
1771 };
1772 let file_ops = LocalFileOps::new_default();
1773 let cache_adapter = CacheAdapter(Arc::clone(&ca));
1774 let pubsub_adapter = PubSubAdapter(Arc::clone(&ps));
1775 let email_adapter = EmailAdapter::from_env();
1776 let fn_ops: Option<&dyn pylon_router::FnOps> =
1777 fn_ops_ref.as_deref().map(|f| f as &dyn pylon_router::FnOps);
1778 let shard_adapter = shards_ref.as_ref().map(|reg| ShardOpsAdapter {
1779 registry: Arc::clone(reg),
1780 });
1781 let shard_ops: Option<&dyn pylon_router::ShardOps> = shard_adapter
1782 .as_ref()
1783 .map(|a| a as &dyn pylon_router::ShardOps);
1784 let plugin_hooks = PluginHooksAdapter(Arc::clone(&pr));
1785 let request_headers: Vec<(String, String)> = request
1790 .headers()
1791 .iter()
1792 .map(|h| (h.field.as_str().to_string(), h.value.as_str().to_string()))
1793 .collect();
1794 let router_ctx = pylon_router::RouterContext {
1795 store: rt.as_ref(),
1796 session_store: &ss,
1797 magic_codes: &mc,
1798 oauth_state: &os,
1799 policy_engine: &pe,
1800 change_log: &cl,
1801 notifier: ¬ifier,
1802 rooms: rm.as_ref(),
1803 cache: &cache_adapter,
1804 pubsub: &pubsub_adapter,
1805 jobs: jq.as_ref(),
1806 scheduler: sc.as_ref(),
1807 workflows: we.as_ref(),
1808 files: &file_ops,
1809 openapi: &openapi_gen,
1810 functions: fn_ops,
1811 email: &email_adapter,
1812 shards: shard_ops,
1813 plugin_hooks: &plugin_hooks,
1814 auth_ctx: &auth_ctx,
1815 is_dev,
1816 request_headers: &request_headers,
1817 cookie_config: cookie_config.as_ref(),
1818 response_headers: std::cell::RefCell::new(Vec::new()),
1819 };
1820 let http_method = HttpMethod::from_str(method.as_str());
1821 let (s, b, _ct) = pylon_router::route(
1822 &router_ctx,
1823 http_method,
1824 &url,
1825 &body,
1826 auth_token.as_deref(),
1827 );
1828 let extra_headers = router_ctx.take_response_headers();
1829 (s, b, "application/json", false, extra_headers)
1830 }
1831 };
1832
1833 let mut response = Response::from_string(&response_body)
1834 .with_status_code(status)
1835 .with_header(Header::from_bytes("Content-Type", content_type).unwrap())
1836 .with_header(
1837 Header::from_bytes(
1838 "Access-Control-Allow-Origin",
1839 cors_origin.as_bytes().to_vec(),
1840 )
1841 .unwrap(),
1842 )
1843 .with_header(
1844 Header::from_bytes(
1845 "Access-Control-Allow-Methods",
1846 "GET, POST, PATCH, DELETE, OPTIONS",
1847 )
1848 .unwrap(),
1849 )
1850 .with_header(
1851 Header::from_bytes(
1852 "Access-Control-Allow-Headers",
1853 "Content-Type, Authorization",
1854 )
1855 .unwrap(),
1856 );
1857 if allow_credentials {
1862 response = response
1863 .with_header(
1864 Header::from_bytes("Access-Control-Allow-Credentials", "true").unwrap(),
1865 )
1866 .with_header(Header::from_bytes("Vary", "Origin").unwrap());
1867 }
1868
1869 for (name, value) in extra_headers {
1876 if let Ok(h) = Header::from_bytes(name.as_bytes(), value.as_bytes().to_vec()) {
1877 response = response.with_header(h);
1878 }
1879 }
1880
1881 if is_studio {
1894 response = response.with_header(
1895 Header::from_bytes(
1896 "Content-Security-Policy",
1897 "default-src 'self' 'unsafe-inline' 'unsafe-eval' https://cdn.tailwindcss.com https://unpkg.com ws: wss:",
1898 ).unwrap(),
1899 );
1900 }
1901
1902 let response = with_security_headers(response);
1903
1904 let _ = request.respond(response);
1905 mt.record_request(method.as_str(), status);
1906 }
1907
1908 tracing::warn!("Shutting down gracefully...");
1909
1910 let drain_timeout = std::time::Duration::from_secs(
1913 std::env::var("PYLON_DRAIN_SECS")
1914 .ok()
1915 .and_then(|s| s.parse().ok())
1916 .unwrap_or(10),
1917 );
1918 let start = Instant::now();
1919
1920 if let Some(reg) = &shard_registry {
1922 for id in reg.ids() {
1923 if let Some(shard) = reg.get(&id) {
1924 shard.stop();
1925 }
1926 }
1927 }
1928
1929 let _ = &scheduler; while start.elapsed() < drain_timeout {
1934 let pending_jobs = job_queue.stats().pending;
1935 if pending_jobs == 0 {
1936 break;
1937 }
1938 std::thread::sleep(std::time::Duration::from_millis(100));
1939 }
1940
1941 let elapsed = start.elapsed();
1942 tracing::warn!(
1943 "Drain complete in {:.1}s (timeout {}s)",
1944 elapsed.as_secs_f32(),
1945 drain_timeout.as_secs()
1946 );
1947 Ok(())
1948}
1949
1950fn json_error(code: &str, message: &str) -> String {
1955 pylon_router::json_error(code, message)
1956}
1957
1958fn build_session_store(app_db_path: Option<&str>) -> SessionStore {
1968 if std::env::var("PYLON_SESSION_IN_MEMORY")
1969 .map(|v| v == "1" || v == "true")
1970 .unwrap_or(false)
1971 {
1972 return SessionStore::new();
1973 }
1974 let explicit = std::env::var("PYLON_SESSION_DB").ok();
1975 let default_path = app_db_path.map(|p| format!("{p}.sessions.db"));
1976 let path = match explicit.or(default_path) {
1977 Some(p) => p,
1978 None => return SessionStore::new(),
1979 };
1980 match crate::session_backend::SqliteSessionBackend::open(&path) {
1981 Ok(backend) => {
1982 tracing::info!("[pylon] Session persistence enabled: {path}");
1983 SessionStore::with_backend(Box::new(backend))
1984 }
1985 Err(e) => {
1986 tracing::warn!(
1987 "[pylon] could not open session DB {path}: {e}. Falling back to in-memory sessions."
1988 );
1989 SessionStore::new()
1990 }
1991 }
1992}
1993
1994fn parse_multipart_first_file(
2005 body: &[u8],
2006 content_type_header: &str,
2007) -> Option<(String, String, Vec<u8>)> {
2008 let boundary_param = content_type_header
2010 .split(';')
2011 .find_map(|p| p.trim().strip_prefix("boundary="))?;
2012 let boundary = boundary_param.trim_matches('"');
2013 let delimiter = format!("--{boundary}");
2014 let delimiter_bytes = delimiter.as_bytes();
2015
2016 let mut pos = 0usize;
2018 while pos < body.len() {
2019 let next = find_subslice(&body[pos..], delimiter_bytes)?;
2021 let part_start = pos + next + delimiter_bytes.len();
2022 if part_start + 2 > body.len() {
2024 return None;
2025 }
2026 if &body[part_start..part_start + 2] == b"--" {
2027 return None; }
2029 let header_start = part_start + skip_crlf(&body[part_start..]);
2030
2031 let header_end_offset = find_subslice(&body[header_start..], b"\r\n\r\n")?;
2033 let headers = &body[header_start..header_start + header_end_offset];
2034 let data_start = header_start + header_end_offset + 4;
2035
2036 let next_delim_offset = find_subslice(&body[data_start..], delimiter_bytes)?;
2038 let mut data_end = data_start + next_delim_offset;
2040 if data_end >= 2 && &body[data_end - 2..data_end] == b"\r\n" {
2041 data_end -= 2;
2042 }
2043
2044 let headers_str = std::str::from_utf8(headers).ok()?;
2046 let mut filename: Option<String> = None;
2047 let mut part_ct = String::from("application/octet-stream");
2048 let mut has_file = false;
2049 for line in headers_str.split("\r\n") {
2050 let lower = line.to_ascii_lowercase();
2051 if let Some(rest) = lower.strip_prefix("content-disposition:") {
2052 if rest.contains("filename=") {
2053 has_file = true;
2054 if let Some(start) = line.find("filename=\"") {
2056 let from = start + 10;
2057 if let Some(end_offset) = line[from..].find('"') {
2058 filename = Some(line[from..from + end_offset].to_string());
2059 }
2060 }
2061 }
2062 } else if let Some(rest) = lower.strip_prefix("content-type:") {
2063 part_ct = rest.trim().to_string();
2064 }
2065 }
2066
2067 if has_file {
2068 let name = filename.unwrap_or_else(|| "upload".into());
2069 return Some((name, part_ct, body[data_start..data_end].to_vec()));
2070 }
2071
2072 pos = data_end;
2073 }
2074 None
2075}
2076
2077fn find_subslice(haystack: &[u8], needle: &[u8]) -> Option<usize> {
2078 if needle.is_empty() || needle.len() > haystack.len() {
2079 return None;
2080 }
2081 haystack.windows(needle.len()).position(|w| w == needle)
2082}
2083
2084fn skip_crlf(buf: &[u8]) -> usize {
2085 if buf.len() >= 2 && &buf[0..2] == b"\r\n" {
2086 2
2087 } else if !buf.is_empty() && buf[0] == b'\n' {
2088 1
2089 } else {
2090 0
2091 }
2092}
2093
2094#[cfg(test)]
2095mod multipart_tests {
2096 use super::*;
2097
2098 #[test]
2099 fn parses_single_file() {
2100 let body = b"--bnd\r\n\
2101Content-Disposition: form-data; name=\"file\"; filename=\"hello.txt\"\r\n\
2102Content-Type: text/plain\r\n\
2103\r\n\
2104Hello world\r\n\
2105--bnd--\r\n";
2106 let ct = "multipart/form-data; boundary=bnd";
2107 let (name, content_type, bytes) = parse_multipart_first_file(body, ct).unwrap();
2108 assert_eq!(name, "hello.txt");
2109 assert_eq!(content_type, "text/plain");
2110 assert_eq!(bytes, b"Hello world");
2111 }
2112
2113 #[test]
2114 fn returns_none_without_file_part() {
2115 let body = b"--bnd\r\n\
2116Content-Disposition: form-data; name=\"field\"\r\n\
2117\r\n\
2118just text\r\n\
2119--bnd--\r\n";
2120 let ct = "multipart/form-data; boundary=bnd";
2121 assert!(parse_multipart_first_file(body, ct).is_none());
2122 }
2123
2124 #[test]
2125 fn returns_none_when_no_boundary() {
2126 assert!(parse_multipart_first_file(b"anything", "application/json").is_none());
2127 }
2128}