1#[allow(unused_imports)]
2use std::io::Read;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::time::Instant;
6
7use pylon_auth::SessionStore;
8use pylon_http::HttpMethod;
9use pylon_plugin::PluginRegistry;
10use pylon_policy::PolicyEngine;
11use pylon_sync::{ChangeKind, ChangeLog};
12use tiny_http::{Header, Method, Response, Server};
13
14use crate::datastore::{
15 CacheAdapter, EmailAdapter, LocalFileOps, PluginHooksAdapter, PubSubAdapter,
16 RuntimeOpenApiGenerator, ShardOpsAdapter, WsSseNotifier,
17};
18use crate::jobs::{JobQueue, JobResult, Worker};
19use crate::metrics::Metrics;
20use crate::pubsub::PubSubBroker;
21use crate::rate_limit::RateLimiter;
22use crate::rooms::RoomManager;
23use crate::scheduler::Scheduler;
24use crate::sse::SseHub;
25use crate::workflows::WorkflowEngine;
26use crate::ws::WsHub;
27use crate::Runtime;
28use pylon_plugin::builtin::ai_proxy::{AiMessage, AiProxyPlugin};
29use pylon_plugin::builtin::cache::CachePlugin;
30
31struct StreamingBody {
41 rx: std::sync::mpsc::Receiver<Vec<u8>>,
42 buf: Vec<u8>,
43 pos: usize,
44}
45
46impl StreamingBody {
47 fn new(rx: std::sync::mpsc::Receiver<Vec<u8>>) -> Self {
48 Self {
49 rx,
50 buf: Vec::new(),
51 pos: 0,
52 }
53 }
54}
55
56impl std::io::Read for StreamingBody {
57 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
58 if self.pos < self.buf.len() {
61 let remaining = &self.buf[self.pos..];
62 let n = remaining.len().min(buf.len());
63 buf[..n].copy_from_slice(&remaining[..n]);
64 self.pos += n;
65 if self.pos >= self.buf.len() {
66 self.buf.clear();
67 self.pos = 0;
68 }
69 return Ok(n);
70 }
71
72 match self.rx.recv() {
74 Ok(data) if data.is_empty() => Ok(0),
75 Ok(data) => {
76 let n = data.len().min(buf.len());
77 buf[..n].copy_from_slice(&data[..n]);
78 if n < data.len() {
79 self.buf = data;
80 self.pos = n;
81 }
82 Ok(n)
83 }
84 Err(_) => Ok(0), }
86 }
87}
88
89static SHUTDOWN: AtomicBool = AtomicBool::new(false);
91
92pub fn request_shutdown() {
98 SHUTDOWN.store(true, Ordering::SeqCst);
99 if let Some(srv) = SERVER_HANDLE.get() {
102 srv.unblock();
103 }
104}
105
106static SERVER_HANDLE: std::sync::OnceLock<Arc<Server>> = std::sync::OnceLock::new();
109
110fn resolve_client_ip(request: &tiny_http::Request, trust_proxy_hops: usize) -> String {
126 let socket_ip = request
127 .remote_addr()
128 .map(|a| a.ip().to_string())
129 .unwrap_or_default();
130 if trust_proxy_hops == 0 {
131 return socket_ip;
132 }
133 let xff = request
136 .headers()
137 .iter()
138 .find(|h| {
139 h.field
140 .as_str()
141 .as_str()
142 .eq_ignore_ascii_case("X-Forwarded-For")
143 })
144 .map(|h| h.value.as_str().to_string());
145 let Some(xff) = xff else {
146 return socket_ip;
147 };
148 let entries: Vec<&str> = xff.split(',').map(str::trim).collect();
153 if entries.len() < trust_proxy_hops {
154 return socket_ip;
159 }
160 let candidate = entries[entries.len() - trust_proxy_hops];
161 if candidate.parse::<std::net::IpAddr>().is_ok() {
164 candidate.to_string()
165 } else {
166 socket_ip
167 }
168}
169
170fn security_headers() -> Vec<Header> {
179 vec![
180 Header::from_bytes("X-Content-Type-Options", "nosniff").unwrap(),
181 Header::from_bytes("X-Frame-Options", "DENY").unwrap(),
182 Header::from_bytes("X-XSS-Protection", "1; mode=block").unwrap(),
183 Header::from_bytes("Referrer-Policy", "strict-origin-when-cross-origin").unwrap(),
187 Header::from_bytes(
191 "Permissions-Policy",
192 "accelerometer=(), camera=(), geolocation=(), gyroscope=(), microphone=(), payment=(), usb=()",
193 )
194 .unwrap(),
195 ]
196}
197
198fn with_security_headers<R: std::io::Read>(response: Response<R>) -> Response<R> {
200 let mut resp = response;
201 for header in security_headers() {
202 resp = resp.with_header(header);
203 }
204 resp
205}
206
207pub fn start(runtime: Arc<Runtime>, port: u16) -> Result<(), String> {
209 start_with_plugins(runtime, port, None)
210}
211
212pub fn start_with_plugins(
214 runtime: Arc<Runtime>,
215 port: u16,
216 plugins: Option<Arc<PluginRegistry>>,
217) -> Result<(), String> {
218 start_server(runtime, port, plugins, None)
219}
220
221pub fn start_with_shards(
224 runtime: Arc<Runtime>,
225 port: u16,
226 plugins: Option<Arc<PluginRegistry>>,
227 shard_registry: Arc<dyn pylon_realtime::DynShardRegistry>,
228) -> Result<(), String> {
229 start_server(runtime, port, plugins, Some(shard_registry))
230}
231
232fn start_server(
233 runtime: Arc<Runtime>,
234 port: u16,
235 plugins: Option<Arc<PluginRegistry>>,
236 shard_registry: Option<Arc<dyn pylon_realtime::DynShardRegistry>>,
237) -> Result<(), String> {
238 pylon_observability::run_tracing_hook();
243
244 let addr = format!("0.0.0.0:{port}");
245 let server = Server::http(&addr).map_err(|e| format!("Failed to start server: {e}"))?;
246 let server = Arc::new(server);
247
248 let _ = SERVER_HANDLE.set(Arc::clone(&server));
250
251 let session_store = Arc::new(build_session_store(runtime.db_path().as_deref()));
252 let magic_codes = Arc::new(pylon_auth::MagicCodeStore::new());
253 let oauth_state = Arc::new(match std::env::var("PYLON_SESSION_DB").ok() {
257 Some(path) => match crate::oauth_backend::SqliteOAuthBackend::open(&path) {
258 Ok(backend) => pylon_auth::OAuthStateStore::with_backend(Box::new(backend)),
259 Err(e) => {
260 tracing::warn!(
261 "[auth] OAuth state SQLite backend unavailable: {e} — falling back to in-memory"
262 );
263 pylon_auth::OAuthStateStore::new()
264 }
265 },
266 None => pylon_auth::OAuthStateStore::new(),
267 });
268 let policy_engine = Arc::new(PolicyEngine::from_manifest(runtime.manifest()));
269 let change_log = Arc::new(ChangeLog::new());
270
271 for entity in runtime.manifest().entities.iter() {
279 match runtime.list(&entity.name) {
280 Ok(rows) => {
281 for row in rows {
282 if let Some(id) = row.get("id").and_then(|v| v.as_str()) {
283 change_log.append(&entity.name, id, ChangeKind::Insert, Some(row.clone()));
284 }
285 }
286 }
287 Err(_) => {
288 }
290 }
291 }
292 let ws_hub = WsHub::new();
293 let sse_hub = SseHub::new();
294 let is_dev_early = std::env::var("PYLON_DEV_MODE")
307 .map(|v| v == "1" || v == "true")
308 .unwrap_or(true);
309 let plugin_rl_max: u32 = if is_dev_early { 100_000 } else { 100 };
310 let plugin_reg: Arc<PluginRegistry> = plugins.unwrap_or_else(|| {
311 let mut reg = PluginRegistry::new(runtime.manifest().clone());
312 reg.register(Arc::new(
313 pylon_plugin::builtin::rate_limit::RateLimitPlugin::new(
314 plugin_rl_max,
315 std::time::Duration::from_secs(60),
316 ),
317 ));
318 reg.register(Arc::new(
323 pylon_plugin::builtin::tenant_scope::TenantScopePlugin::from_manifest(
324 runtime.manifest(),
325 ),
326 ));
327 Arc::new(reg)
328 });
329 let room_mgr = Arc::new(RoomManager::new(120)); let ws_port = port + 1;
331 let sse_port = port + 2;
332
333 let start_time = Instant::now();
335
336 let metrics = Arc::new(Metrics::new());
337
338 let cache = Arc::new(CachePlugin::new(100_000));
340 let pubsub_broker = Arc::new(PubSubBroker::new(100));
341
342 let job_queue = Arc::new(JobQueue::new(1000));
344
345 let jobs_in_memory = std::env::var("PYLON_JOBS_IN_MEMORY")
351 .map(|v| v == "1" || v == "true")
352 .unwrap_or(false);
353 if !jobs_in_memory {
354 let jobs_db_path = std::env::var("PYLON_JOBS_DB").ok().unwrap_or_else(|| {
355 runtime
356 .db_path()
357 .map(|p| format!("{p}.jobs.db"))
358 .unwrap_or_else(|| "pylon.jobs.db".into())
359 });
360 match crate::job_store::JobStore::open(&jobs_db_path) {
361 Ok(store) => {
362 let store = Arc::new(store);
363 let restored = job_queue.restore_from(&store);
364 if restored > 0 {
365 tracing::info!("[jobs] Restored {restored} pending job(s) from {jobs_db_path}");
366 }
367 job_queue.attach_store(store);
368 }
369 Err(e) => {
370 tracing::warn!(
371 "[jobs] Could not open job store at {jobs_db_path}: {e} — running without persistence"
372 );
373 }
374 }
375 }
376
377 {
379 let cache_ref = Arc::clone(&cache);
380 job_queue.register(
381 "pylon.cache.cleanup",
382 Arc::new(move |_job| {
383 cache_ref.cleanup_expired();
384 JobResult::Success
385 }),
386 );
387 let rooms_ref = Arc::clone(&room_mgr);
388 job_queue.register(
389 "pylon.rooms.cleanup",
390 Arc::new(move |_job| {
391 rooms_ref.cleanup_idle();
392 JobResult::Success
393 }),
394 );
395 }
396
397 let scheduler = Arc::new(Scheduler::new(Arc::clone(&job_queue)));
398 let _ = scheduler.schedule(
400 "pylon.cache.cleanup",
401 "*/10 * * * *",
402 Arc::new(|_| JobResult::Success),
403 );
404 let _ = scheduler.schedule(
405 "pylon.rooms.cleanup",
406 "*/5 * * * *",
407 Arc::new(|_| JobResult::Success),
408 );
409
410 let _worker_handles: Vec<_> = (0..2)
412 .map(|i| {
413 let w = Worker::new(Arc::clone(&job_queue), &format!("worker-{i}"));
414 w.start()
415 })
416 .collect();
417
418 let _scheduler_handle = Arc::clone(&scheduler).start();
420
421 let wf_runner_url = std::env::var("PYLON_WORKFLOW_RUNNER_URL")
423 .unwrap_or_else(|_| "http://127.0.0.1:9876/run".to_string());
424 let workflow_engine = Arc::new(WorkflowEngine::new(&wf_runner_url, 10_000));
425
426 let default_rl_max = if is_dev_early { 100_000 } else { 600 };
437 let rl_max: u32 = std::env::var("PYLON_RATE_LIMIT_MAX")
438 .ok()
439 .and_then(|v| v.parse().ok())
440 .unwrap_or(default_rl_max);
441 let rl_window: u64 = std::env::var("PYLON_RATE_LIMIT_WINDOW")
442 .ok()
443 .and_then(|v| v.parse().ok())
444 .unwrap_or(60);
445 let rate_limiter = Arc::new(RateLimiter::new(rl_max, rl_window));
446
447 let fn_rl_max: u32 = std::env::var("PYLON_FN_RATE_LIMIT_MAX")
451 .ok()
452 .and_then(|v| v.parse().ok())
453 .unwrap_or(30);
454 let fn_rl_window: u64 = std::env::var("PYLON_FN_RATE_LIMIT_WINDOW")
455 .ok()
456 .and_then(|v| v.parse().ok())
457 .unwrap_or(60);
458 let fn_rate_limiter = Arc::new(RateLimiter::new(fn_rl_max, fn_rl_window));
459
460 let fn_notifier: Arc<dyn pylon_router::ChangeNotifier> =
468 Arc::new(crate::datastore::WsSseNotifier {
469 ws: Arc::clone(&ws_hub),
470 sse: Arc::clone(&sse_hub),
471 });
472 let fn_ops_maybe = crate::datastore::try_spawn_functions(
473 Arc::clone(&runtime),
474 Arc::clone(&job_queue),
475 Arc::clone(&fn_rate_limiter),
476 Arc::clone(&change_log),
477 fn_notifier,
478 );
479
480 let is_dev = std::env::var("PYLON_DEV_MODE")
488 .map(|v| v == "1" || v == "true")
489 .unwrap_or(false);
490
491 let cors_origin = match std::env::var("PYLON_CORS_ORIGIN") {
498 Ok(v) => v,
499 Err(_) if is_dev => "*".to_string(),
500 Err(_) => {
501 return Err(
502 "PYLON_CORS_ORIGIN must be set in production (non-dev mode). \
503 Set it to your frontend's origin, or set PYLON_DEV_MODE=true \
504 for local development."
505 .into(),
506 );
507 }
508 };
509 if !is_dev && cors_origin == "*" {
510 return Err("PYLON_CORS_ORIGIN=\"*\" is refused in production mode. \
511 Set it to an explicit origin (https://app.example.com)."
512 .into());
513 }
514 let allow_credentials = cors_origin != "*";
521 if Header::from_bytes(
525 "Access-Control-Allow-Origin",
526 cors_origin.as_bytes().to_vec(),
527 )
528 .is_err()
529 {
530 return Err(format!(
531 "PYLON_CORS_ORIGIN={cors_origin:?} contains bytes that are not a valid HTTP header value"
532 ));
533 }
534
535 let admin_token: Option<String> = std::env::var("PYLON_ADMIN_TOKEN").ok();
537
538 let trust_proxy_hops: usize = std::env::var("PYLON_TRUST_PROXY_HOPS")
548 .ok()
549 .and_then(|v| v.parse().ok())
550 .unwrap_or(0);
551
552 let cookie_config = Arc::new({
559 let app_name = runtime.manifest().name.as_str();
560 pylon_auth::CookieConfig::from_env(&pylon_auth::CookieConfig::default_name_for(app_name))
561 });
562
563 let csrf_origins: Vec<String> = match std::env::var("PYLON_CSRF_ORIGINS") {
573 Ok(v) => v
574 .split(',')
575 .map(|s| s.trim().to_string())
576 .filter(|s| !s.is_empty())
577 .collect(),
578 Err(_) => {
579 if is_dev {
580 vec!["*".to_string()]
581 } else if cors_origin != "*" {
582 vec![cors_origin.clone()]
583 } else {
584 vec![]
586 }
587 }
588 };
589 let csrf = Arc::new(pylon_plugin::builtin::csrf::CsrfPlugin::new(csrf_origins));
590
591 {
605 let hub = Arc::clone(&ws_hub);
606 let sessions = Arc::clone(&session_store);
607 let runtime_for_fetcher = Arc::clone(&runtime);
608 let pe_for_fetcher = Arc::clone(&policy_engine);
609 let fetcher: crate::ws::SnapshotFetcher = Arc::new(move |auth_ctx, entity, row_id| {
610 use pylon_http::DataStore;
611 let row = match runtime_for_fetcher.get_by_id(entity, row_id) {
616 Ok(Some(v)) => v,
617 _ => return None,
618 };
619 if !matches!(
620 pe_for_fetcher.check_entity_read(entity, auth_ctx, Some(&row)),
621 pylon_policy::PolicyResult::Allowed
622 ) {
623 return None;
624 }
625 let snap = match runtime_for_fetcher.crdt_snapshot(entity, row_id) {
626 Ok(Some(bytes)) => bytes,
627 _ => return None,
628 };
629 pylon_router::encode_crdt_frame(
630 pylon_router::CRDT_FRAME_SNAPSHOT,
631 entity,
632 row_id,
633 &snap,
634 )
635 .ok()
636 });
637 std::thread::spawn(move || {
638 crate::ws::start_ws_server(hub, sessions, ws_port, Some(fetcher));
639 });
640 }
641
642 {
644 let hub = Arc::clone(&sse_hub);
645 std::thread::spawn(move || {
646 crate::sse::start_sse_server(hub, sse_port);
647 });
648 }
649
650 let shard_ws_port = port + 3;
652 if let Some(reg) = shard_registry.clone() {
653 let sessions = Arc::clone(&session_store);
654 std::thread::spawn(move || {
655 crate::shard_ws::start_shard_ws_server(reg, sessions, shard_ws_port);
656 });
657 }
658
659 tracing::warn!("pylon dev server listening on http://localhost:{port}");
660 tracing::info!(" WebSocket: ws://localhost:{ws_port}");
661 tracing::info!(" Studio: http://localhost:{port}/studio");
662 tracing::info!(" API: http://localhost:{port}/api/entities/<entity>");
663 tracing::info!(" Auth: http://localhost:{port}/api/auth/session");
664
665 loop {
669 if SHUTDOWN.load(Ordering::Relaxed) {
670 break;
671 }
672
673 let mut request = match server.recv() {
674 Ok(rq) => rq,
675 Err(_) => {
676 break;
678 }
679 };
680
681 if SHUTDOWN.load(Ordering::Relaxed) {
682 break;
683 }
684
685 let rt = Arc::clone(&runtime);
686 let ss = Arc::clone(&session_store);
687 let pe = Arc::clone(&policy_engine);
688 let cl = Arc::clone(&change_log);
689 let wh = Arc::clone(&ws_hub);
690 let sh = Arc::clone(&sse_hub);
691 let mc = Arc::clone(&magic_codes);
692 let pr = Arc::clone(&plugin_reg);
693 let rm = Arc::clone(&room_mgr);
694 let mt = Arc::clone(&metrics);
695 let os = Arc::clone(&oauth_state);
696 let ca = Arc::clone(&cache);
697 let ps = Arc::clone(&pubsub_broker);
698 let jq = Arc::clone(&job_queue);
699 let sc = Arc::clone(&scheduler);
700 let we = Arc::clone(&workflow_engine);
701 let fn_ops_ref = fn_ops_maybe.clone();
702 let shards_ref = shard_registry.clone();
703 let cors_origin = cors_origin.clone();
704 let cookie_config = Arc::clone(&cookie_config);
705 let allow_credentials = allow_credentials;
706 let is_dev = is_dev;
707
708 let method = request.method().clone();
709 let url = request.url().to_string();
710
711 if url == "/health" && method == Method::Get {
713 let uptime = start_time.elapsed().as_secs();
714 let body = serde_json::json!({
715 "status": "ok",
716 "version": "0.1.0",
717 "uptime_secs": uptime,
718 })
719 .to_string();
720
721 let response = with_security_headers(
722 Response::from_string(&body)
723 .with_status_code(200u16)
724 .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
725 .with_header(
726 Header::from_bytes(
727 "Access-Control-Allow-Origin",
728 cors_origin.as_bytes().to_vec(),
729 )
730 .unwrap(),
731 ),
732 );
733 let _ = request.respond(response);
734 continue;
735 }
736
737 if url == "/metrics" && method == Method::Get {
742 if !is_dev {
743 let admin_bytes = admin_token.as_deref().unwrap_or("").as_bytes();
744 let auth_ok = !admin_bytes.is_empty()
745 && request.headers().iter().any(|h| {
746 let name = h.field.as_str().as_str();
747 name.eq_ignore_ascii_case("Authorization")
748 && h.value
749 .as_str()
750 .strip_prefix("Bearer ")
751 .map(|t| pylon_auth::constant_time_eq(t.as_bytes(), admin_bytes))
752 .unwrap_or(false)
753 });
754 if !auth_ok {
755 let body = json_error(
756 "UNAUTHORIZED",
757 "/metrics requires admin bearer token in non-dev mode",
758 );
759 let response = with_security_headers(
760 Response::from_string(&body)
761 .with_status_code(401u16)
762 .with_header(
763 Header::from_bytes("Content-Type", "application/json").unwrap(),
764 ),
765 );
766 let _ = request.respond(response);
767 continue;
768 }
769 }
770 let prefers_prometheus = request.headers().iter().any(|h| {
771 (h.field.as_str() == "Accept" || h.field.as_str() == "accept")
772 && (h.value.as_str().contains("text/plain")
773 || h.value.as_str().contains("application/openmetrics-text"))
774 });
775 let (body, content_type) = if prefers_prometheus {
776 (mt.prometheus(), "text/plain; version=0.0.4")
777 } else {
778 (mt.snapshot().to_string(), "application/json")
779 };
780 let response = with_security_headers(
781 Response::from_string(&body)
782 .with_status_code(200u16)
783 .with_header(Header::from_bytes("Content-Type", content_type).unwrap())
784 .with_header(
785 Header::from_bytes(
786 "Access-Control-Allow-Origin",
787 cors_origin.as_bytes().to_vec(),
788 )
789 .unwrap(),
790 ),
791 );
792 let _ = request.respond(response);
793 mt.record_request("GET", 200);
794 continue;
795 }
796
797 let peer_ip = resolve_client_ip(&request, trust_proxy_hops);
803
804 let is_preflight = matches!(method, Method::Options);
810 if !is_preflight {
811 if let Err(retry_after) = rate_limiter.check(&peer_ip) {
812 let err_body = json_error(
813 "RATE_LIMITED",
814 &format!("Too many requests. Retry after {retry_after} seconds."),
815 );
816 let response = with_security_headers(
817 Response::from_string(&err_body)
818 .with_status_code(429u16)
819 .with_header(
820 Header::from_bytes("Content-Type", "application/json").unwrap(),
821 )
822 .with_header(
823 Header::from_bytes(
824 "Access-Control-Allow-Origin",
825 cors_origin.as_bytes().to_vec(),
826 )
827 .unwrap(),
828 )
829 .with_header(
830 Header::from_bytes(
831 "Access-Control-Allow-Methods",
832 "GET, POST, PATCH, DELETE, OPTIONS",
833 )
834 .unwrap(),
835 )
836 .with_header(
837 Header::from_bytes(
838 "Access-Control-Allow-Headers",
839 "Content-Type, Authorization",
840 )
841 .unwrap(),
842 )
843 .with_header(
844 Header::from_bytes(
845 "Retry-After",
846 retry_after.to_string().as_bytes().to_vec(),
847 )
848 .unwrap(),
849 ),
850 );
851 let _ = request.respond(response);
852 mt.record_request(method.as_str(), 429);
853 continue;
854 }
855 } {
871 let method_str = method.as_str();
872 let is_bearer = request.headers().iter().any(|h| {
873 (h.field.as_str() == "Authorization" || h.field.as_str() == "authorization")
874 && h.value.as_str().starts_with("Bearer ")
875 });
876 if !is_bearer && !matches!(method, Method::Get | Method::Head | Method::Options) {
881 let origin = request
882 .headers()
883 .iter()
884 .find(|h| h.field.as_str() == "Origin" || h.field.as_str() == "origin")
885 .map(|h| h.value.as_str().to_string());
886 let referer = request
887 .headers()
888 .iter()
889 .find(|h| h.field.as_str() == "Referer" || h.field.as_str() == "referer")
890 .map(|h| h.value.as_str().to_string());
891 if let Err(err) = csrf.check(method_str, origin.as_deref(), referer.as_deref()) {
892 let body = json_error(&err.code, &err.message);
893 let response = with_security_headers(
894 Response::from_string(&body)
895 .with_status_code(err.status)
896 .with_header(
897 Header::from_bytes("Content-Type", "application/json").unwrap(),
898 )
899 .with_header(
900 Header::from_bytes(
901 "Access-Control-Allow-Origin",
902 cors_origin.as_bytes().to_vec(),
903 )
904 .unwrap(),
905 ),
906 );
907 let _ = request.respond(response);
908 mt.record_request(method_str, err.status);
909 continue;
910 }
911 }
912 }
913
914 let bearer_token: Option<String> = request
924 .headers()
925 .iter()
926 .find(|h| h.field.as_str() == "Authorization" || h.field.as_str() == "authorization")
927 .and_then(|h| {
928 let val = h.value.as_str();
929 val.strip_prefix("Bearer ").map(|t| t.to_string())
930 });
931 let cookie_token: Option<String> = if bearer_token.is_some() {
932 None
933 } else {
934 request
935 .headers()
936 .iter()
937 .find(|h| h.field.as_str() == "Cookie" || h.field.as_str() == "cookie")
938 .and_then(|h| {
939 pylon_auth::extract_session_cookie(h.value.as_str(), &cookie_config.name)
940 })
941 };
942 let auth_token: Option<String> = bearer_token.or(cookie_token);
943 let auth_ctx = if admin_token.is_some()
944 && auth_token.is_some()
945 && pylon_auth::constant_time_eq(
946 auth_token.as_deref().unwrap_or("").as_bytes(),
947 admin_token.as_deref().unwrap_or("").as_bytes(),
948 ) {
949 pylon_auth::AuthContext::admin()
950 } else {
951 ss.resolve(auth_token.as_deref())
952 };
953
954 if url == "/api/__test__/reset" && method == Method::Post {
969 let is_loopback = peer_ip == "127.0.0.1"
970 || peer_ip == "::1"
971 || peer_ip.starts_with("127.")
972 || peer_ip == "localhost";
973 if !is_dev || !rt.is_in_memory() || !is_loopback {
974 let body = json_error(
975 "RESET_REFUSED",
976 "reset endpoint is only available in dev mode + in-memory DB + from loopback",
977 );
978 let response = with_security_headers(
979 Response::from_string(&body)
980 .with_status_code(403u16)
981 .with_header(
982 Header::from_bytes("Content-Type", "application/json").unwrap(),
983 )
984 .with_header(
985 Header::from_bytes(
986 "Access-Control-Allow-Origin",
987 cors_origin.as_bytes().to_vec(),
988 )
989 .unwrap(),
990 ),
991 );
992 let _ = request.respond(response);
993 mt.record_request("POST", 403);
994 continue;
995 }
996 let (status, body) = match rt.reset_for_tests() {
997 Ok(()) => (200u16, "{\"reset\":true}".to_string()),
998 Err(e) => (500u16, json_error(&e.code, &e.message)),
999 };
1000 let response = with_security_headers(
1001 Response::from_string(&body)
1002 .with_status_code(status)
1003 .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
1004 .with_header(
1005 Header::from_bytes(
1006 "Access-Control-Allow-Origin",
1007 cors_origin.as_bytes().to_vec(),
1008 )
1009 .unwrap(),
1010 ),
1011 );
1012 let _ = request.respond(response);
1013 mt.record_request("POST", status);
1014 continue;
1015 }
1016
1017 if url == "/api/files/upload" && method == Method::Post {
1026 const UPLOAD_MAX: usize = 10 * 1024 * 1024;
1027 if let Some(declared) = request.body_length() {
1030 if declared > UPLOAD_MAX {
1031 let err = json_error(
1032 "PAYLOAD_TOO_LARGE",
1033 &format!("Content-Length {declared} exceeds upload max of {UPLOAD_MAX}"),
1034 );
1035 let response = with_security_headers(
1036 Response::from_string(&err)
1037 .with_status_code(413u16)
1038 .with_header(
1039 Header::from_bytes("Content-Type", "application/json").unwrap(),
1040 )
1041 .with_header(
1042 Header::from_bytes(
1043 "Access-Control-Allow-Origin",
1044 cors_origin.as_bytes().to_vec(),
1045 )
1046 .unwrap(),
1047 ),
1048 );
1049 let _ = request.respond(response);
1050 mt.record_request("POST", 413);
1051 continue;
1052 }
1053 }
1054 if auth_ctx.user_id.is_none() {
1055 let err = json_error(
1056 "AUTH_REQUIRED",
1057 "/api/files/upload requires an authenticated session",
1058 );
1059 let response = with_security_headers(
1060 Response::from_string(&err)
1061 .with_status_code(401u16)
1062 .with_header(
1063 Header::from_bytes("Content-Type", "application/json").unwrap(),
1064 )
1065 .with_header(
1066 Header::from_bytes(
1067 "Access-Control-Allow-Origin",
1068 cors_origin.as_bytes().to_vec(),
1069 )
1070 .unwrap(),
1071 ),
1072 );
1073 let _ = request.respond(response);
1074 mt.record_request("POST", 401);
1075 continue;
1076 }
1077 use std::io::Read;
1082 let mut bytes: Vec<u8> = Vec::with_capacity(8192);
1083 let mut limited = request.as_reader().take((UPLOAD_MAX as u64) + 1);
1084 let _ = limited.read_to_end(&mut bytes);
1085
1086 const MAX: usize = UPLOAD_MAX;
1087 if bytes.len() > MAX {
1088 let err = json_error("PAYLOAD_TOO_LARGE", "File exceeds 10 MB limit");
1089 let response = with_security_headers(
1090 Response::from_string(&err)
1091 .with_status_code(413u16)
1092 .with_header(
1093 Header::from_bytes("Content-Type", "application/json").unwrap(),
1094 )
1095 .with_header(
1096 Header::from_bytes(
1097 "Access-Control-Allow-Origin",
1098 cors_origin.as_bytes().to_vec(),
1099 )
1100 .unwrap(),
1101 ),
1102 );
1103 let _ = request.respond(response);
1104 mt.record_request("POST", 413);
1105 continue;
1106 }
1107
1108 let content_type = request
1110 .headers()
1111 .iter()
1112 .find(|h| h.field.as_str() == "Content-Type" || h.field.as_str() == "content-type")
1113 .map(|h| h.value.as_str().to_string())
1114 .unwrap_or_else(|| "application/octet-stream".into());
1115 let filename = request
1116 .headers()
1117 .iter()
1118 .find(|h| h.field.as_str() == "X-Filename" || h.field.as_str() == "x-filename")
1119 .map(|h| h.value.as_str().to_string())
1120 .unwrap_or_else(|| "upload".into());
1121
1122 let (name, ct, payload) = if content_type.starts_with("multipart/form-data") {
1124 match parse_multipart_first_file(&bytes, &content_type) {
1125 Some(p) => p,
1126 None => {
1127 let err = json_error("INVALID_MULTIPART", "Could not parse multipart body");
1128 let response = with_security_headers(
1129 Response::from_string(&err)
1130 .with_status_code(400u16)
1131 .with_header(
1132 Header::from_bytes("Content-Type", "application/json").unwrap(),
1133 )
1134 .with_header(
1135 Header::from_bytes(
1136 "Access-Control-Allow-Origin",
1137 cors_origin.as_bytes().to_vec(),
1138 )
1139 .unwrap(),
1140 ),
1141 );
1142 let _ = request.respond(response);
1143 mt.record_request("POST", 400);
1144 continue;
1145 }
1146 }
1147 } else {
1148 (filename, content_type, bytes)
1149 };
1150
1151 let storage = pylon_storage::files::LocalFileStorage::new(
1152 &std::env::var("PYLON_FILES_DIR").unwrap_or_else(|_| "uploads".into()),
1153 &std::env::var("PYLON_FILES_URL_PREFIX").unwrap_or_else(|_| "/api/files".into()),
1154 );
1155
1156 let (status, body) =
1157 match pylon_storage::files::FileStorage::store(&storage, &name, &payload, &ct) {
1158 Ok(stored) => (
1159 201u16,
1160 serde_json::to_string(&stored).unwrap_or_else(|_| "{}".into()),
1161 ),
1162 Err(e) => (500u16, json_error(&e.code, &e.message)),
1163 };
1164
1165 let response = with_security_headers(
1166 Response::from_string(&body)
1167 .with_status_code(status)
1168 .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
1169 .with_header(
1170 Header::from_bytes(
1171 "Access-Control-Allow-Origin",
1172 cors_origin.as_bytes().to_vec(),
1173 )
1174 .unwrap(),
1175 ),
1176 );
1177 let _ = request.respond(response);
1178 mt.record_request("POST", status);
1179 continue;
1180 }
1181
1182 const MAX_BODY_SIZE: usize = 10 * 1024 * 1024;
1192
1193 if let Some(declared) = request.body_length() {
1194 if declared > MAX_BODY_SIZE {
1195 let err_body = json_error(
1196 "PAYLOAD_TOO_LARGE",
1197 &format!("Content-Length {declared} exceeds max of {MAX_BODY_SIZE}"),
1198 );
1199 let response = with_security_headers(
1200 Response::from_string(&err_body)
1201 .with_status_code(413u16)
1202 .with_header(
1203 Header::from_bytes(
1204 "Access-Control-Allow-Origin",
1205 cors_origin.as_bytes().to_vec(),
1206 )
1207 .unwrap(),
1208 ),
1209 );
1210 let _ = request.respond(response);
1211 mt.record_request(method.as_str(), 413);
1212 continue;
1213 }
1214 }
1215
1216 let mut body = String::new();
1217 if !matches!(
1218 method,
1219 Method::Get | Method::Head | Method::Options | Method::Delete
1220 ) {
1221 use std::io::Read;
1222 let mut limited = request.as_reader().take((MAX_BODY_SIZE as u64) + 1);
1223 let _ = limited.read_to_string(&mut body);
1224 }
1225
1226 if body.len() > MAX_BODY_SIZE {
1227 let err_body = json_error(
1228 "PAYLOAD_TOO_LARGE",
1229 &format!(
1230 "Request body exceeds maximum size of {} bytes",
1231 MAX_BODY_SIZE,
1232 ),
1233 );
1234 let response = with_security_headers(
1235 Response::from_string(&err_body)
1236 .with_status_code(413u16)
1237 .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
1238 .with_header(
1239 Header::from_bytes(
1240 "Access-Control-Allow-Origin",
1241 cors_origin.as_bytes().to_vec(),
1242 )
1243 .unwrap(),
1244 ),
1245 );
1246 let _ = request.respond(response);
1247 mt.record_request(method.as_str(), 413);
1248 continue;
1249 }
1250
1251 if method == Method::Get {
1255 if let Some(rest) = url.strip_prefix("/api/shards/") {
1256 let rest = rest.split('?').next().unwrap_or(rest);
1257 if let Some(shard_id) = rest.strip_suffix("/connect") {
1258 if auth_ctx.user_id.is_none() {
1263 let err = json_error(
1264 "AUTH_REQUIRED",
1265 "Shard connect requires an authenticated session",
1266 );
1267 let response = with_security_headers(
1268 Response::from_string(&err)
1269 .with_status_code(401u16)
1270 .with_header(
1271 Header::from_bytes("Content-Type", "application/json").unwrap(),
1272 )
1273 .with_header(
1274 Header::from_bytes(
1275 "Access-Control-Allow-Origin",
1276 cors_origin.as_bytes().to_vec(),
1277 )
1278 .unwrap(),
1279 ),
1280 );
1281 let _ = request.respond(response);
1282 mt.record_request("GET", 401);
1283 continue;
1284 }
1285 let shards = match &shards_ref {
1286 Some(s) => Arc::clone(s),
1287 None => {
1288 let err = json_error(
1289 "SHARDS_NOT_AVAILABLE",
1290 "Shard system is not configured",
1291 );
1292 let response = with_security_headers(
1293 Response::from_string(&err)
1294 .with_status_code(503u16)
1295 .with_header(
1296 Header::from_bytes("Content-Type", "application/json")
1297 .unwrap(),
1298 )
1299 .with_header(
1300 Header::from_bytes(
1301 "Access-Control-Allow-Origin",
1302 cors_origin.as_bytes().to_vec(),
1303 )
1304 .unwrap(),
1305 ),
1306 );
1307 let _ = request.respond(response);
1308 mt.record_request("GET", 503);
1309 continue;
1310 }
1311 };
1312 let shard = match shards.get(shard_id) {
1313 Some(s) => s,
1314 None => {
1315 let err = json_error(
1316 "SHARD_NOT_FOUND",
1317 &format!("Shard \"{shard_id}\" not found"),
1318 );
1319 let response = with_security_headers(
1320 Response::from_string(&err)
1321 .with_status_code(404u16)
1322 .with_header(
1323 Header::from_bytes("Content-Type", "application/json")
1324 .unwrap(),
1325 )
1326 .with_header(
1327 Header::from_bytes(
1328 "Access-Control-Allow-Origin",
1329 cors_origin.as_bytes().to_vec(),
1330 )
1331 .unwrap(),
1332 ),
1333 );
1334 let _ = request.respond(response);
1335 mt.record_request("GET", 404);
1336 continue;
1337 }
1338 };
1339
1340 let sub_id = url
1343 .split("sid=")
1344 .nth(1)
1345 .and_then(|s| s.split('&').next())
1346 .map(|s| s.to_string())
1347 .or_else(|| auth_ctx.user_id.clone())
1348 .unwrap_or_else(|| {
1349 format!(
1350 "anon_{}",
1351 std::time::SystemTime::now()
1352 .duration_since(std::time::UNIX_EPOCH)
1353 .unwrap_or_default()
1354 .as_nanos()
1355 )
1356 });
1357 let subscriber_id = pylon_realtime::SubscriberId::new(sub_id);
1358
1359 let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1360 let streaming_body = StreamingBody::new(rx);
1361
1362 let tx_clone = tx.clone();
1363 let sink: pylon_realtime::SnapshotSink =
1364 Box::new(move |tick: u64, bytes: &[u8]| {
1365 let mut frame = format!("id: {tick}\ndata: ").into_bytes();
1368 frame.extend_from_slice(bytes);
1369 frame.extend_from_slice(b"\n\n");
1370 let _ = tx_clone.send(frame);
1371 });
1372
1373 let shard_auth = pylon_realtime::ShardAuth {
1374 user_id: auth_ctx.user_id.clone(),
1375 is_admin: auth_ctx.is_admin,
1376 };
1377 if let Err(e) = shard.add_subscriber(subscriber_id.clone(), sink, &shard_auth) {
1378 let (status, code) = match &e {
1379 pylon_realtime::ShardError::Unauthorized(_) => (403u16, "UNAUTHORIZED"),
1380 _ => (429u16, "SUBSCRIBE_FAILED"),
1381 };
1382 let err = json_error(code, &e.to_string());
1383 let response = with_security_headers(
1384 Response::from_string(&err)
1385 .with_status_code(status)
1386 .with_header(
1387 Header::from_bytes("Content-Type", "application/json").unwrap(),
1388 )
1389 .with_header(
1390 Header::from_bytes(
1391 "Access-Control-Allow-Origin",
1392 cors_origin.as_bytes().to_vec(),
1393 )
1394 .unwrap(),
1395 ),
1396 );
1397 let _ = request.respond(response);
1398 mt.record_request("GET", status);
1399 continue;
1400 }
1401
1402 {
1405 let shard_cleanup = Arc::clone(&shard);
1406 let sub_id_cleanup = subscriber_id.clone();
1407 let tx_liveness = tx.clone();
1408 std::thread::spawn(move || {
1409 loop {
1412 std::thread::sleep(std::time::Duration::from_secs(30));
1413 if tx_liveness.send(b": heartbeat\n\n".to_vec()).is_err() {
1414 shard_cleanup.remove_subscriber(&sub_id_cleanup);
1415 return;
1416 }
1417 if !shard_cleanup.is_running() {
1418 return;
1419 }
1420 }
1421 });
1422 }
1423
1424 let response = with_security_headers(Response::new(
1425 tiny_http::StatusCode(200),
1426 vec![
1427 Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1428 Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1429 Header::from_bytes("Connection", "keep-alive").unwrap(),
1430 Header::from_bytes(
1431 "Access-Control-Allow-Origin",
1432 cors_origin.as_bytes().to_vec(),
1433 )
1434 .unwrap(),
1435 ],
1436 streaming_body,
1437 None,
1438 None,
1439 ));
1440 let _ = request.respond(response);
1441 mt.record_request("GET", 200);
1442 continue;
1443 }
1444 }
1445 }
1446
1447 if method == Method::Post
1449 && url.starts_with("/api/fn/")
1450 && url != "/api/fn/traces"
1451 && request.headers().iter().any(|h| {
1452 (h.field.as_str() == "Accept" || h.field.as_str() == "accept")
1453 && h.value.as_str().contains("text/event-stream")
1454 })
1455 {
1456 let fn_name = url
1457 .strip_prefix("/api/fn/")
1458 .unwrap_or("")
1459 .split('?')
1460 .next()
1461 .unwrap_or("")
1462 .to_string();
1463
1464 if let Some(fn_ops) = &fn_ops_maybe {
1465 if pylon_router::FnOps::get_fn(fn_ops.as_ref(), &fn_name).is_none() {
1469 let err = json_error(
1470 "FN_NOT_FOUND",
1471 &format!("Function \"{fn_name}\" is not registered"),
1472 );
1473 let response = with_security_headers(
1474 Response::from_string(&err)
1475 .with_status_code(404u16)
1476 .with_header(
1477 Header::from_bytes("Content-Type", "application/json").unwrap(),
1478 )
1479 .with_header(
1480 Header::from_bytes(
1481 "Access-Control-Allow-Origin",
1482 cors_origin.as_bytes().to_vec(),
1483 )
1484 .unwrap(),
1485 ),
1486 );
1487 let _ = request.respond(response);
1488 mt.record_request("POST", 404);
1489 continue;
1490 }
1491 let identity = auth_ctx.user_id.as_deref().unwrap_or("anon");
1493 if let Err(retry_after) =
1494 pylon_router::FnOps::check_rate_limit(fn_ops.as_ref(), &fn_name, identity)
1495 {
1496 let body = format!(
1497 r#"{{"error":{{"code":"RATE_LIMITED","message":"Function \"{fn_name}\" rate limit exceeded","retry_after_secs":{retry_after}}}}}"#
1498 );
1499 let response = with_security_headers(
1500 Response::from_string(&body)
1501 .with_status_code(429u16)
1502 .with_header(
1503 Header::from_bytes("Content-Type", "application/json").unwrap(),
1504 )
1505 .with_header(
1506 Header::from_bytes(
1507 "Access-Control-Allow-Origin",
1508 cors_origin.as_bytes().to_vec(),
1509 )
1510 .unwrap(),
1511 ),
1512 );
1513 let _ = request.respond(response);
1514 mt.record_request("POST", 429);
1515 continue;
1516 }
1517
1518 let args: serde_json::Value =
1519 serde_json::from_str(&body).unwrap_or(serde_json::json!({}));
1520
1521 let auth = pylon_functions::protocol::AuthInfo {
1522 user_id: auth_ctx.user_id.clone(),
1523 is_admin: auth_ctx.is_admin,
1524 tenant_id: auth_ctx.tenant_id.clone(),
1525 };
1526
1527 let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1528 let streaming_body = StreamingBody::new(rx);
1529
1530 let fn_ops_cl = Arc::clone(fn_ops);
1531 let tx_stream = tx.clone();
1532 std::thread::spawn(move || {
1533 let tx_cb = tx_stream.clone();
1534 let on_stream: Box<dyn FnMut(&str) + Send> = Box::new(move |chunk: &str| {
1535 let sse = format!("data: {}\n\n", chunk);
1536 let _ = tx_cb.send(sse.into_bytes());
1537 });
1538
1539 let result = pylon_router::FnOps::call(
1540 fn_ops_cl.as_ref(),
1541 &fn_name,
1542 args,
1543 auth,
1544 Some(on_stream),
1545 None, );
1547 match result {
1548 Ok((value, _trace)) => {
1549 let done = format!(
1550 "event: result\ndata: {}\n\n",
1551 serde_json::to_string(&value).unwrap_or_else(|_| "null".into())
1552 );
1553 let _ = tx_stream.send(done.into_bytes());
1554 }
1555 Err(e) => {
1556 let err = format!(
1557 "event: error\ndata: {}\n\n",
1558 serde_json::json!({"code": e.code, "message": e.message})
1559 );
1560 let _ = tx_stream.send(err.into_bytes());
1561 }
1562 }
1563 });
1564
1565 let response = with_security_headers(Response::new(
1566 tiny_http::StatusCode(200),
1567 vec![
1568 Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1569 Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1570 Header::from_bytes("Connection", "keep-alive").unwrap(),
1571 Header::from_bytes(
1572 "Access-Control-Allow-Origin",
1573 cors_origin.as_bytes().to_vec(),
1574 )
1575 .unwrap(),
1576 ],
1577 streaming_body,
1578 None,
1579 None,
1580 ));
1581 let _ = request.respond(response);
1582 mt.record_request("POST", 200);
1583 continue;
1584 }
1585 }
1586
1587 if url == "/api/ai/stream" && method == Method::Post {
1589 if auth_ctx.user_id.is_none() {
1592 let err = json_error(
1593 "AUTH_REQUIRED",
1594 "/api/ai/stream requires an authenticated session",
1595 );
1596 let response = with_security_headers(
1597 Response::from_string(&err)
1598 .with_status_code(401u16)
1599 .with_header(
1600 Header::from_bytes("Content-Type", "application/json").unwrap(),
1601 )
1602 .with_header(
1603 Header::from_bytes(
1604 "Access-Control-Allow-Origin",
1605 cors_origin.as_bytes().to_vec(),
1606 )
1607 .unwrap(),
1608 ),
1609 );
1610 let _ = request.respond(response);
1611 mt.record_request("POST", 401);
1612 continue;
1613 }
1614 let ai_provider = std::env::var("PYLON_AI_PROVIDER").unwrap_or_default();
1615 let ai_key = std::env::var("PYLON_AI_API_KEY").unwrap_or_default();
1616 let ai_model = std::env::var("PYLON_AI_MODEL").unwrap_or_default();
1617 let ai_base = std::env::var("PYLON_AI_BASE_URL").unwrap_or_default();
1618
1619 if ai_key.is_empty() && ai_provider != "custom" {
1620 let err = json_error(
1621 "AI_NOT_CONFIGURED",
1622 "Set PYLON_AI_PROVIDER and PYLON_AI_API_KEY",
1623 );
1624 let response = with_security_headers(
1625 Response::from_string(&err)
1626 .with_status_code(503u16)
1627 .with_header(
1628 Header::from_bytes("Content-Type", "application/json").unwrap(),
1629 )
1630 .with_header(
1631 Header::from_bytes(
1632 "Access-Control-Allow-Origin",
1633 cors_origin.as_bytes().to_vec(),
1634 )
1635 .unwrap(),
1636 ),
1637 );
1638 let _ = request.respond(response);
1639 mt.record_request("POST", 503);
1640 continue;
1641 }
1642
1643 let parsed: serde_json::Value = match serde_json::from_str(&body) {
1644 Ok(v) => v,
1645 Err(_) => {
1646 let err = json_error("INVALID_JSON", "Invalid request body");
1647 let response = with_security_headers(
1648 Response::from_string(&err)
1649 .with_status_code(400u16)
1650 .with_header(
1651 Header::from_bytes("Content-Type", "application/json").unwrap(),
1652 )
1653 .with_header(
1654 Header::from_bytes(
1655 "Access-Control-Allow-Origin",
1656 cors_origin.as_bytes().to_vec(),
1657 )
1658 .unwrap(),
1659 ),
1660 );
1661 let _ = request.respond(response);
1662 mt.record_request("POST", 400);
1663 continue;
1664 }
1665 };
1666
1667 let messages: Vec<AiMessage> = match parsed.get("messages").and_then(|m| m.as_array()) {
1668 Some(arr) => arr
1669 .iter()
1670 .filter_map(|m| {
1671 let role = m.get("role")?.as_str()?.to_string();
1672 let content = m.get("content")?.as_str()?.to_string();
1673 Some(AiMessage { role, content })
1674 })
1675 .collect(),
1676 None => {
1677 let err = json_error("MISSING_FIELD", "\"messages\" array is required");
1678 let response = with_security_headers(
1679 Response::from_string(&err)
1680 .with_status_code(400u16)
1681 .with_header(
1682 Header::from_bytes("Content-Type", "application/json").unwrap(),
1683 )
1684 .with_header(
1685 Header::from_bytes(
1686 "Access-Control-Allow-Origin",
1687 cors_origin.as_bytes().to_vec(),
1688 )
1689 .unwrap(),
1690 ),
1691 );
1692 let _ = request.respond(response);
1693 mt.record_request("POST", 400);
1694 continue;
1695 }
1696 };
1697
1698 let model = parsed
1700 .get("model")
1701 .and_then(|m| m.as_str())
1702 .map(|s| s.to_string())
1703 .unwrap_or(ai_model);
1704
1705 let proxy = match ai_provider.as_str() {
1706 "anthropic" => AiProxyPlugin::anthropic(&ai_key, &model),
1707 "openai" => AiProxyPlugin::openai(&ai_key, &model),
1708 "custom" => AiProxyPlugin::custom_with_model(&ai_base, &ai_key, &model),
1709 _ => AiProxyPlugin::openai(&ai_key, &model),
1710 };
1711
1712 let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
1715 let streaming_body = StreamingBody::new(rx);
1716
1717 std::thread::spawn(move || {
1720 let result = proxy.stream_completion(&messages, &mut |chunk| {
1721 let sse = format!(
1722 "data: {}
1723
1724",
1725 serde_json::json!({
1726 "choices": [{"index": 0, "delta": {"content": chunk}}]
1727 })
1728 );
1729 let _ = tx.send(sse.into_bytes());
1730 });
1731
1732 match result {
1734 Ok(_) => {
1735 let _ = tx.send(
1736 b"data: [DONE]
1737
1738"
1739 .to_vec(),
1740 );
1741 }
1742 Err(e) => {
1743 let err_event = format!(
1744 "data: {}
1745
1746",
1747 serde_json::json!({"error": {"message": e, "type": "stream_error"}})
1748 );
1749 let _ = tx.send(err_event.into_bytes());
1750 }
1751 }
1752 });
1754
1755 let response = with_security_headers(Response::new(
1756 tiny_http::StatusCode(200),
1757 vec![
1758 Header::from_bytes("Content-Type", "text/event-stream").unwrap(),
1759 Header::from_bytes("Cache-Control", "no-cache").unwrap(),
1760 Header::from_bytes("Connection", "keep-alive").unwrap(),
1761 Header::from_bytes(
1762 "Access-Control-Allow-Origin",
1763 cors_origin.as_bytes().to_vec(),
1764 )
1765 .unwrap(),
1766 ],
1767 streaming_body,
1768 None, None,
1770 ));
1771 let _ = request.respond(response);
1772 mt.record_request("POST", 200);
1773 continue;
1774 }
1775
1776 let (status, response_body, content_type, is_studio, extra_headers) = if (url == "/studio"
1787 || url == "/studio/")
1788 && method == Method::Get
1789 {
1790 if !is_dev && !auth_ctx.is_admin {
1791 let body = json_error(
1792 "AUTH_REQUIRED",
1793 "/studio requires admin auth in production (set PYLON_ADMIN_TOKEN and pass it as Bearer)",
1794 );
1795 let response = with_security_headers(
1796 Response::from_string(&body)
1797 .with_status_code(401u16)
1798 .with_header(
1799 Header::from_bytes("Content-Type", "application/json").unwrap(),
1800 )
1801 .with_header(
1802 Header::from_bytes(
1803 "Access-Control-Allow-Origin",
1804 cors_origin.as_bytes().to_vec(),
1805 )
1806 .unwrap(),
1807 ),
1808 );
1809 let _ = request.respond(response);
1810 mt.record_request("GET", 401);
1811 continue;
1812 }
1813 let host = request
1820 .headers()
1821 .iter()
1822 .find(|h| h.field.equiv("Host"))
1823 .map(|h| h.value.as_str().to_string())
1824 .unwrap_or_else(|| format!("localhost:{port}"));
1825 let scheme = request
1826 .headers()
1827 .iter()
1828 .find(|h| h.field.equiv("X-Forwarded-Proto"))
1829 .map(|h| h.value.as_str().to_string())
1830 .unwrap_or_else(|| "http".to_string());
1831 let base = format!("{scheme}://{host}");
1832 let html = pylon_studio_api::generate_studio_html(rt.manifest(), &base);
1833 (
1834 200u16,
1835 html,
1836 "text/html",
1837 true,
1838 Vec::<(String, String)>::new(),
1839 )
1840 } else {
1841 let meta = pylon_plugin::RequestMeta {
1845 peer_ip: peer_ip.as_str(),
1846 };
1847 if let Err(e) = pr.run_on_request_with_meta(method.as_str(), &url, &auth_ctx, &meta) {
1848 (
1849 e.status,
1850 json_error(&e.code, &e.message),
1851 "application/json",
1852 false,
1853 Vec::new(),
1854 )
1855 } else if let Some((s, b)) =
1856 pr.try_handle_route(method.as_str(), &url, &body, &auth_ctx)
1857 {
1858 (s, b, "application/json", false, Vec::new())
1860 } else {
1861 let notifier = WsSseNotifier {
1862 ws: Arc::clone(&wh),
1863 sse: Arc::clone(&sh),
1864 };
1865 let openapi_gen = RuntimeOpenApiGenerator {
1866 manifest: rt.manifest(),
1867 };
1868 let file_ops = LocalFileOps::new_default();
1869 let cache_adapter = CacheAdapter(Arc::clone(&ca));
1870 let pubsub_adapter = PubSubAdapter(Arc::clone(&ps));
1871 let email_adapter = EmailAdapter::from_env();
1872 let fn_ops: Option<&dyn pylon_router::FnOps> =
1873 fn_ops_ref.as_deref().map(|f| f as &dyn pylon_router::FnOps);
1874 let shard_adapter = shards_ref.as_ref().map(|reg| ShardOpsAdapter {
1875 registry: Arc::clone(reg),
1876 });
1877 let shard_ops: Option<&dyn pylon_router::ShardOps> = shard_adapter
1878 .as_ref()
1879 .map(|a| a as &dyn pylon_router::ShardOps);
1880 let plugin_hooks = PluginHooksAdapter(Arc::clone(&pr));
1881 let request_headers: Vec<(String, String)> = request
1886 .headers()
1887 .iter()
1888 .map(|h| (h.field.as_str().to_string(), h.value.as_str().to_string()))
1889 .collect();
1890 let router_ctx = pylon_router::RouterContext {
1891 store: rt.as_ref(),
1892 session_store: &ss,
1893 magic_codes: &mc,
1894 oauth_state: &os,
1895 policy_engine: &pe,
1896 change_log: &cl,
1897 notifier: ¬ifier,
1898 rooms: rm.as_ref(),
1899 cache: &cache_adapter,
1900 pubsub: &pubsub_adapter,
1901 jobs: jq.as_ref(),
1902 scheduler: sc.as_ref(),
1903 workflows: we.as_ref(),
1904 files: &file_ops,
1905 openapi: &openapi_gen,
1906 functions: fn_ops,
1907 email: &email_adapter,
1908 shards: shard_ops,
1909 plugin_hooks: &plugin_hooks,
1910 auth_ctx: &auth_ctx,
1911 is_dev,
1912 request_headers: &request_headers,
1913 peer_ip: peer_ip.as_str(),
1914 cookie_config: cookie_config.as_ref(),
1915 response_headers: std::cell::RefCell::new(Vec::new()),
1916 };
1917 let http_method = HttpMethod::from_str(method.as_str());
1918 let (s, b, _ct) = pylon_router::route(
1919 &router_ctx,
1920 http_method,
1921 &url,
1922 &body,
1923 auth_token.as_deref(),
1924 );
1925 let extra_headers = router_ctx.take_response_headers();
1926 (s, b, "application/json", false, extra_headers)
1927 }
1928 };
1929
1930 let mut response = Response::from_string(&response_body)
1931 .with_status_code(status)
1932 .with_header(Header::from_bytes("Content-Type", content_type).unwrap())
1933 .with_header(
1934 Header::from_bytes(
1935 "Access-Control-Allow-Origin",
1936 cors_origin.as_bytes().to_vec(),
1937 )
1938 .unwrap(),
1939 )
1940 .with_header(
1941 Header::from_bytes(
1942 "Access-Control-Allow-Methods",
1943 "GET, POST, PATCH, DELETE, OPTIONS",
1944 )
1945 .unwrap(),
1946 )
1947 .with_header(
1948 Header::from_bytes(
1949 "Access-Control-Allow-Headers",
1950 "Content-Type, Authorization",
1951 )
1952 .unwrap(),
1953 );
1954 if allow_credentials {
1959 response = response
1960 .with_header(
1961 Header::from_bytes("Access-Control-Allow-Credentials", "true").unwrap(),
1962 )
1963 .with_header(Header::from_bytes("Vary", "Origin").unwrap());
1964 }
1965
1966 for (name, value) in extra_headers {
1973 if let Ok(h) = Header::from_bytes(name.as_bytes(), value.as_bytes().to_vec()) {
1974 response = response.with_header(h);
1975 }
1976 }
1977
1978 if is_studio {
1991 response = response.with_header(
1992 Header::from_bytes(
1993 "Content-Security-Policy",
1994 "default-src 'self' 'unsafe-inline' 'unsafe-eval' https://cdn.tailwindcss.com https://unpkg.com ws: wss:",
1995 ).unwrap(),
1996 );
1997 }
1998
1999 let response = with_security_headers(response);
2000
2001 let _ = request.respond(response);
2002 mt.record_request(method.as_str(), status);
2003 }
2004
2005 tracing::warn!("Shutting down gracefully...");
2006
2007 let drain_timeout = std::time::Duration::from_secs(
2010 std::env::var("PYLON_DRAIN_SECS")
2011 .ok()
2012 .and_then(|s| s.parse().ok())
2013 .unwrap_or(10),
2014 );
2015 let start = Instant::now();
2016
2017 if let Some(reg) = &shard_registry {
2019 for id in reg.ids() {
2020 if let Some(shard) = reg.get(&id) {
2021 shard.stop();
2022 }
2023 }
2024 }
2025
2026 let _ = &scheduler; while start.elapsed() < drain_timeout {
2031 let pending_jobs = job_queue.stats().pending;
2032 if pending_jobs == 0 {
2033 break;
2034 }
2035 std::thread::sleep(std::time::Duration::from_millis(100));
2036 }
2037
2038 let elapsed = start.elapsed();
2039 tracing::warn!(
2040 "Drain complete in {:.1}s (timeout {}s)",
2041 elapsed.as_secs_f32(),
2042 drain_timeout.as_secs()
2043 );
2044 Ok(())
2045}
2046
2047fn json_error(code: &str, message: &str) -> String {
2052 pylon_router::json_error(code, message)
2053}
2054
2055fn build_session_store(app_db_path: Option<&str>) -> SessionStore {
2065 if std::env::var("PYLON_SESSION_IN_MEMORY")
2066 .map(|v| v == "1" || v == "true")
2067 .unwrap_or(false)
2068 {
2069 return SessionStore::new();
2070 }
2071 let explicit = std::env::var("PYLON_SESSION_DB").ok();
2072 let default_path = app_db_path.map(|p| format!("{p}.sessions.db"));
2073 let path = match explicit.or(default_path) {
2074 Some(p) => p,
2075 None => return SessionStore::new(),
2076 };
2077 match crate::session_backend::SqliteSessionBackend::open(&path) {
2078 Ok(backend) => {
2079 tracing::info!("[pylon] Session persistence enabled: {path}");
2080 SessionStore::with_backend(Box::new(backend))
2081 }
2082 Err(e) => {
2083 tracing::warn!(
2084 "[pylon] could not open session DB {path}: {e}. Falling back to in-memory sessions."
2085 );
2086 SessionStore::new()
2087 }
2088 }
2089}
2090
2091fn parse_multipart_first_file(
2102 body: &[u8],
2103 content_type_header: &str,
2104) -> Option<(String, String, Vec<u8>)> {
2105 let boundary_param = content_type_header
2107 .split(';')
2108 .find_map(|p| p.trim().strip_prefix("boundary="))?;
2109 let boundary = boundary_param.trim_matches('"');
2110 let delimiter = format!("--{boundary}");
2111 let delimiter_bytes = delimiter.as_bytes();
2112
2113 let mut pos = 0usize;
2115 while pos < body.len() {
2116 let next = find_subslice(&body[pos..], delimiter_bytes)?;
2118 let part_start = pos + next + delimiter_bytes.len();
2119 if part_start + 2 > body.len() {
2121 return None;
2122 }
2123 if &body[part_start..part_start + 2] == b"--" {
2124 return None; }
2126 let header_start = part_start + skip_crlf(&body[part_start..]);
2127
2128 let header_end_offset = find_subslice(&body[header_start..], b"\r\n\r\n")?;
2130 let headers = &body[header_start..header_start + header_end_offset];
2131 let data_start = header_start + header_end_offset + 4;
2132
2133 let next_delim_offset = find_subslice(&body[data_start..], delimiter_bytes)?;
2135 let mut data_end = data_start + next_delim_offset;
2137 if data_end >= 2 && &body[data_end - 2..data_end] == b"\r\n" {
2138 data_end -= 2;
2139 }
2140
2141 let headers_str = std::str::from_utf8(headers).ok()?;
2143 let mut filename: Option<String> = None;
2144 let mut part_ct = String::from("application/octet-stream");
2145 let mut has_file = false;
2146 for line in headers_str.split("\r\n") {
2147 let lower = line.to_ascii_lowercase();
2148 if let Some(rest) = lower.strip_prefix("content-disposition:") {
2149 if rest.contains("filename=") {
2150 has_file = true;
2151 if let Some(start) = line.find("filename=\"") {
2153 let from = start + 10;
2154 if let Some(end_offset) = line[from..].find('"') {
2155 filename = Some(line[from..from + end_offset].to_string());
2156 }
2157 }
2158 }
2159 } else if let Some(rest) = lower.strip_prefix("content-type:") {
2160 part_ct = rest.trim().to_string();
2161 }
2162 }
2163
2164 if has_file {
2165 let name = filename.unwrap_or_else(|| "upload".into());
2166 return Some((name, part_ct, body[data_start..data_end].to_vec()));
2167 }
2168
2169 pos = data_end;
2170 }
2171 None
2172}
2173
2174fn find_subslice(haystack: &[u8], needle: &[u8]) -> Option<usize> {
2175 if needle.is_empty() || needle.len() > haystack.len() {
2176 return None;
2177 }
2178 haystack.windows(needle.len()).position(|w| w == needle)
2179}
2180
2181fn skip_crlf(buf: &[u8]) -> usize {
2182 if buf.len() >= 2 && &buf[0..2] == b"\r\n" {
2183 2
2184 } else if !buf.is_empty() && buf[0] == b'\n' {
2185 1
2186 } else {
2187 0
2188 }
2189}
2190
2191#[cfg(test)]
2192mod multipart_tests {
2193 use super::*;
2194
2195 #[test]
2196 fn parses_single_file() {
2197 let body = b"--bnd\r\n\
2198Content-Disposition: form-data; name=\"file\"; filename=\"hello.txt\"\r\n\
2199Content-Type: text/plain\r\n\
2200\r\n\
2201Hello world\r\n\
2202--bnd--\r\n";
2203 let ct = "multipart/form-data; boundary=bnd";
2204 let (name, content_type, bytes) = parse_multipart_first_file(body, ct).unwrap();
2205 assert_eq!(name, "hello.txt");
2206 assert_eq!(content_type, "text/plain");
2207 assert_eq!(bytes, b"Hello world");
2208 }
2209
2210 #[test]
2211 fn returns_none_without_file_part() {
2212 let body = b"--bnd\r\n\
2213Content-Disposition: form-data; name=\"field\"\r\n\
2214\r\n\
2215just text\r\n\
2216--bnd--\r\n";
2217 let ct = "multipart/form-data; boundary=bnd";
2218 assert!(parse_multipart_first_file(body, ct).is_none());
2219 }
2220
2221 #[test]
2222 fn returns_none_when_no_boundary() {
2223 assert!(parse_multipart_first_file(b"anything", "application/json").is_none());
2224 }
2225}