1use axum::{
8 body::Body,
9 extract::{ConnectInfo, Query, State},
10 http::StatusCode,
11 response::{IntoResponse, Sse},
12 routing::{get, post},
13 Json, Router,
14};
15use futures_util::StreamExt;
16use serde::Deserialize;
17use std::{
18 collections::{HashMap, HashSet},
19 io::ErrorKind,
20 net::SocketAddr,
21 path::{Path, PathBuf},
22 sync::Arc,
23};
24use tokio::{
25 fs::{File, OpenOptions},
26 io::AsyncWriteExt,
27 sync::{broadcast, oneshot, RwLock},
28};
29use tokio_stream::wrappers::BroadcastStream;
30use uuid::Uuid;
31
32use crate::config::EngineConfig;
33use crate::error::{EngineError, EngineResult};
34use crate::events::EventHandler;
35use crate::history::HistoryPersistence;
36use crate::protocol::{
37 EngineEvent, PendingTransfer, TransferApprovalStatus, TransferDecision, TransferProgress,
38 TransferRequest, TransferResponse,
39};
40use crate::types::{TransferDirection, TransferRecord, TransferStatus};
41
/// Shared state behind every HTTP handler of the receive server.
///
/// Each map is keyed by transfer id and guarded by its own async `RwLock`.
pub struct ServerState {
    /// Engine configuration; handlers read `device_name`, `trusted_hosts` and
    /// `download_dir` from it, and `update_config` replaces it wholesale.
    pub config: RwLock<EngineConfig>,
    /// Transfers announced through `POST /transfer` that have neither completed
    /// nor been cancelled. Entries stay here after approval, while files upload.
    pub pending_transfers: RwLock<HashMap<String, PendingTransfer>>,
    /// Upload token per approved transfer; `/chunk` requires a matching token.
    pub approved_tokens: RwLock<HashMap<String, String>>,
    /// Rejection reason per rejected transfer, reported by `/transfer/status`.
    pub rejected_transfers: RwLock<HashMap<String, String>>,
    /// Ids of cancelled transfers; `/chunk` refuses uploads for these.
    pub cancelled_transfers: RwLock<HashSet<String>>,
    /// File ids received in full, per transfer; used to detect completion.
    pub received_files: RwLock<HashMap<String, HashSet<String>>>,
    /// Running count of bytes written per transfer, across all of its files.
    pub transfer_bytes: RwLock<HashMap<String, u64>>,
    /// Instant at which the first chunk upload of a transfer started; the
    /// progress events derive their speed figure from it.
    pub transfer_start_times: RwLock<HashMap<String, std::time::Instant>>,
    /// Broadcast channel feeding the `/events` server-sent-events stream.
    internal_event_tx: broadcast::Sender<InternalEvent>,
    /// Callback that receives every `EngineEvent` the server emits.
    event_handler: Arc<dyn EventHandler>,
    /// Optional sink for finished (completed or failed) receive records.
    history: Option<Arc<dyn HistoryPersistence>>,
}
67
/// Subset of `EngineEvent` that is forwarded to `/events` subscribers.
///
/// Serialized as JSON with the variant name, in camelCase, under a `type` key.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(tag = "type", rename_all = "camelCase")]
enum InternalEvent {
    /// A new transfer request is waiting for a decision.
    TransferRequest {
        transfer: PendingTransfer,
    },
    /// Upload progress for a file of a transfer.
    Progress {
        progress: TransferProgress,
    },
    /// Every file of the transfer has been received.
    TransferComplete {
        // The container-level `rename_all` renames variants only, so the
        // multi-word fields below are renamed individually.
        #[serde(rename = "transferId")]
        transfer_id: String,
    },
    /// The transfer ended without completing (this includes cancellation).
    TransferFailed {
        #[serde(rename = "transferId")]
        transfer_id: String,
        error: String,
    },
    /// The server moved from `old_port` to `new_port`.
    PortChanged {
        #[serde(rename = "oldPort")]
        old_port: u16,
        #[serde(rename = "newPort")]
        new_port: u16,
    },
}
94
95impl ServerState {
96 pub fn new(config: EngineConfig, event_handler: Arc<dyn EventHandler>) -> Self {
98 let (internal_event_tx, _) = broadcast::channel(100);
99
100 Self {
101 config: RwLock::new(config),
102 pending_transfers: RwLock::new(HashMap::new()),
103 approved_tokens: RwLock::new(HashMap::new()),
104 rejected_transfers: RwLock::new(HashMap::new()),
105 cancelled_transfers: RwLock::new(HashSet::new()),
106 received_files: RwLock::new(HashMap::new()),
107 transfer_bytes: RwLock::new(HashMap::new()),
108 transfer_start_times: RwLock::new(HashMap::new()),
109 internal_event_tx,
110 event_handler,
111 history: None,
112 }
113 }
114
115 pub fn new_with_history(
117 config: EngineConfig,
118 event_handler: Arc<dyn EventHandler>,
119 history: Arc<dyn HistoryPersistence>,
120 ) -> Self {
121 let (internal_event_tx, _) = broadcast::channel(100);
122
123 Self {
124 config: RwLock::new(config),
125 pending_transfers: RwLock::new(HashMap::new()),
126 approved_tokens: RwLock::new(HashMap::new()),
127 rejected_transfers: RwLock::new(HashMap::new()),
128 cancelled_transfers: RwLock::new(HashSet::new()),
129 received_files: RwLock::new(HashMap::new()),
130 transfer_bytes: RwLock::new(HashMap::new()),
131 transfer_start_times: RwLock::new(HashMap::new()),
132 internal_event_tx,
133 event_handler,
134 history: Some(history),
135 }
136 }
137
138 fn emit_event(&self, event: EngineEvent) {
140 self.event_handler.on_event(event.clone());
142
143 let internal = match event {
145 EngineEvent::TransferRequest(transfer) => InternalEvent::TransferRequest { transfer },
146 EngineEvent::TransferProgress(progress) => InternalEvent::Progress { progress },
147 EngineEvent::TransferComplete { transfer_id } => {
148 InternalEvent::TransferComplete { transfer_id }
149 }
150 EngineEvent::TransferFailed { transfer_id, error } => {
151 InternalEvent::TransferFailed { transfer_id, error }
152 }
153 EngineEvent::PortChanged { old_port, new_port } => {
154 InternalEvent::PortChanged { old_port, new_port }
155 }
156 _ => return, };
158 let _ = self.internal_event_tx.send(internal);
159 }
160
161 fn record_receive_history(
163 &self,
164 transfer: &PendingTransfer,
165 status: TransferStatus,
166 bytes_transferred: u64,
167 error: Option<String>,
168 ) {
169 if let Some(ref history) = self.history {
170 let record = TransferRecord {
171 id: transfer.id.clone(),
172 direction: TransferDirection::Received,
173 status,
174 peer_address: transfer.source_ip.clone(),
175 files: transfer.files.clone(),
176 total_size: transfer.total_size,
177 bytes_transferred,
178 started_at: transfer.received_at,
179 completed_at: Some(chrono::Utc::now()),
180 error,
181 };
182 if let Err(e) = history.add(record) {
183 tracing::warn!("Failed to record transfer history: {}", e);
184 }
185 }
186 }
187
188 pub async fn accept_transfer(&self, transfer_id: &str) -> EngineResult<String> {
190 let pending = self.pending_transfers.read().await;
192 if !pending.contains_key(transfer_id) {
193 return Err(EngineError::TransferNotFound(transfer_id.to_string()));
194 }
195 drop(pending);
196
197 let token = Uuid::new_v4().to_string();
199 self.approved_tokens
200 .write()
201 .await
202 .insert(transfer_id.to_string(), token.clone());
203 self.rejected_transfers.write().await.remove(transfer_id);
204
205 Ok(token)
206 }
207
208 pub async fn reject_transfer(&self, transfer_id: &str) -> EngineResult<()> {
210 let pending = self.pending_transfers.read().await;
212 if !pending.contains_key(transfer_id) {
213 return Err(EngineError::TransferNotFound(transfer_id.to_string()));
214 }
215 drop(pending);
216
217 self.rejected_transfers
219 .write()
220 .await
221 .insert(transfer_id.to_string(), "Rejected by user".to_string());
222 self.approved_tokens.write().await.remove(transfer_id);
223
224 Ok(())
225 }
226
227 pub async fn cancel_transfer(&self, transfer_id: &str) -> EngineResult<()> {
231 let pending = self.pending_transfers.read().await;
233 let approved = self.approved_tokens.read().await;
234 if !pending.contains_key(transfer_id) && !approved.contains_key(transfer_id) {
235 return Err(EngineError::TransferNotFound(transfer_id.to_string()));
236 }
237 let transfer_info = pending.get(transfer_id).cloned();
238 drop(pending);
239 drop(approved);
240
241 let bytes_transferred = *self
243 .transfer_bytes
244 .read()
245 .await
246 .get(transfer_id)
247 .unwrap_or(&0);
248
249 self.cancelled_transfers
251 .write()
252 .await
253 .insert(transfer_id.to_string());
254
255 self.pending_transfers.write().await.remove(transfer_id);
257 self.approved_tokens.write().await.remove(transfer_id);
258 self.received_files.write().await.remove(transfer_id);
259 self.transfer_bytes.write().await.remove(transfer_id);
260 self.transfer_start_times.write().await.remove(transfer_id);
261
262 self.emit_event(EngineEvent::TransferFailed {
264 transfer_id: transfer_id.to_string(),
265 error: "Transfer cancelled".to_string(),
266 });
267
268 if let Some(transfer) = transfer_info {
270 self.record_receive_history(
271 &transfer,
272 TransferStatus::Failed,
273 bytes_transferred,
274 Some("Transfer cancelled".to_string()),
275 );
276 }
277
278 Ok(())
279 }
280
281 pub async fn is_transfer_cancelled(&self, transfer_id: &str) -> bool {
283 self.cancelled_transfers.read().await.contains(transfer_id)
284 }
285
286 pub async fn get_pending_transfers(&self) -> Vec<PendingTransfer> {
288 self.pending_transfers
289 .read()
290 .await
291 .values()
292 .cloned()
293 .collect()
294 }
295
296 pub async fn update_config(&self, config: EngineConfig) {
298 *self.config.write().await = config;
299 }
300}
301
/// Handle returned by `start_server`; used to stop the running server.
pub struct ServerHandle {
    /// Completing this channel triggers the server's graceful shutdown.
    shutdown_tx: oneshot::Sender<()>,
}
306
307impl ServerHandle {
308 pub fn shutdown(self) {
310 let _ = self.shutdown_tx.send(());
311 }
312}
313
/// Query-string parameters of `POST /chunk`.
#[derive(Debug, Deserialize)]
pub struct ChunkParams {
    /// Transfer the uploaded file belongs to.
    transfer_id: String,
    /// Id of the file within that transfer.
    file_id: String,
    /// Upload token issued when the transfer was approved.
    token: String,
}
321
/// Query-string parameters of `GET /transfer/status`.
#[derive(Debug, Deserialize)]
pub struct TransferStatusParams {
    /// Transfer whose approval state is being queried.
    transfer_id: String,
}
326
327pub fn create_router(state: Arc<ServerState>) -> Router {
329 Router::new()
330 .route("/health", get(health_handler))
332 .route("/info", get(info_handler))
334 .route("/transfer", post(transfer_request_handler))
336 .route("/transfer/status", get(transfer_status_handler))
338 .route("/chunk", post(chunk_upload_handler))
340 .route("/events", get(events_handler))
342 .with_state(state)
343}
344
/// Reduces an untrusted file name to a single safe path component.
///
/// Only the final path component of `name` is kept, surrounding whitespace is
/// trimmed, and empty names, `.` and `..` are rejected. When `name` yields
/// nothing usable, `fallback` is put through the same filter; callers pass
/// sender-controlled values there too, so it must not be trusted verbatim.
/// If both are unusable the literal `"file"` is returned.
fn sanitize_file_name(name: &str, fallback: &str) -> String {
    /// Returns the trimmed last component of `raw` if it is a usable file name.
    fn safe_component(raw: &str) -> Option<&str> {
        Path::new(raw.trim())
            .file_name()
            .and_then(|n| n.to_str())
            .map(str::trim)
            .filter(|n| !n.is_empty() && *n != "." && *n != "..")
    }

    safe_component(name)
        .or_else(|| safe_component(fallback))
        .unwrap_or("file")
        .to_string()
}
357
/// Splits `name` at its last dot into `(stem, extension)`.
///
/// The extension excludes the dot. A name without a dot, or whose only dot is
/// the leading character (such as `.bashrc`), is returned whole with an empty
/// extension.
fn split_file_name(name: &str) -> (&str, &str) {
    match name.rfind('.') {
        // `.` is one byte wide, so `dot + 1` is always a character boundary.
        Some(dot) if dot > 0 => (&name[..dot], &name[dot + 1..]),
        _ => (name, ""),
    }
}
366
367fn sanitize_relative_path(path: &str) -> PathBuf {
369 let mut result = PathBuf::new();
370
371 for component in Path::new(path).components() {
372 if let std::path::Component::Normal(name) = component {
375 if let Some(name_str) = name.to_str() {
376 let safe_name = sanitize_file_name(name_str, "file");
377 result.push(safe_name);
378 }
379 }
380 }
381
382 result
383}
384
385async fn open_unique_file(
386 download_dir: &Path,
387 base_name: &str,
388) -> Result<(PathBuf, File), std::io::Error> {
389 let (stem, ext) = split_file_name(base_name);
390
391 for index in 0..1000 {
392 let candidate = if index == 0 {
393 base_name.to_string()
394 } else if ext.is_empty() {
395 format!("{} ({})", stem, index)
396 } else {
397 format!("{} ({}).{}", stem, index, ext)
398 };
399
400 let path = download_dir.join(&candidate);
401 match OpenOptions::new()
402 .write(true)
403 .create_new(true)
404 .open(&path)
405 .await
406 {
407 Ok(file) => return Ok((path, file)),
408 Err(e) if e.kind() == ErrorKind::AlreadyExists => continue,
409 Err(e) => return Err(e),
410 }
411 }
412
413 Err(std::io::Error::new(
414 ErrorKind::AlreadyExists,
415 "Too many filename conflicts",
416 ))
417}
418
419async fn health_handler() -> impl IntoResponse {
421 Json(serde_json::json!({
422 "status": "ok",
423 "app": "gosh-lan-transfer",
424 "version": env!("CARGO_PKG_VERSION")
425 }))
426}
427
428async fn info_handler(State(state): State<Arc<ServerState>>) -> impl IntoResponse {
430 let config = state.config.read().await;
431
432 Json(serde_json::json!({
433 "name": config.device_name,
434 "version": env!("CARGO_PKG_VERSION"),
435 "app": "gosh-lan-transfer"
436 }))
437}
438
439async fn transfer_request_handler(
441 State(state): State<Arc<ServerState>>,
442 ConnectInfo(addr): ConnectInfo<SocketAddr>,
443 Json(request): Json<TransferRequest>,
444) -> impl IntoResponse {
445 let computed_total: u64 = request.files.iter().map(|f| f.size).sum();
446
447 if computed_total != request.total_size {
448 tracing::warn!(
449 "Transfer total mismatch for {}: client {}, computed {}",
450 request.transfer_id,
451 request.total_size,
452 computed_total
453 );
454 }
455
456 tracing::info!(
457 "Received transfer request: {} files, {} bytes",
458 request.files.len(),
459 computed_total
460 );
461
462 let source_ip = addr.ip().to_string();
463
464 let pending = PendingTransfer {
466 id: request.transfer_id.clone(),
467 source_ip: source_ip.clone(),
468 sender_name: request.sender_name.clone(),
469 files: request.files.clone(),
470 total_size: computed_total,
471 received_at: chrono::Utc::now(),
472 };
473
474 let config = state.config.read().await;
476 let is_trusted = config.trusted_hosts.iter().any(|host| host == &source_ip);
477
478 state
479 .pending_transfers
480 .write()
481 .await
482 .insert(request.transfer_id.clone(), pending.clone());
483 state
484 .rejected_transfers
485 .write()
486 .await
487 .remove(&request.transfer_id);
488
489 if is_trusted {
490 let token = Uuid::new_v4().to_string();
492 state
493 .approved_tokens
494 .write()
495 .await
496 .insert(request.transfer_id.clone(), token.clone());
497
498 state
499 .rejected_transfers
500 .write()
501 .await
502 .remove(&request.transfer_id);
503
504 return Json(TransferResponse {
505 accepted: true,
506 message: Some("Auto-accepted from trusted host".to_string()),
507 token: Some(token),
508 });
509 }
510
511 state.emit_event(EngineEvent::TransferRequest(pending));
513
514 Json(TransferResponse {
516 accepted: false,
517 message: Some("Awaiting user approval".to_string()),
518 token: None,
519 })
520}
521
522async fn transfer_status_handler(
524 State(state): State<Arc<ServerState>>,
525 Query(params): Query<TransferStatusParams>,
526) -> impl IntoResponse {
527 let approved = state.approved_tokens.read().await;
528 if let Some(token) = approved.get(¶ms.transfer_id) {
529 return Json(TransferApprovalStatus {
530 status: TransferDecision::Accepted,
531 token: Some(token.clone()),
532 message: Some("Accepted".to_string()),
533 });
534 }
535 drop(approved);
536
537 let rejected = state.rejected_transfers.read().await;
538 if let Some(reason) = rejected.get(¶ms.transfer_id) {
539 return Json(TransferApprovalStatus {
540 status: TransferDecision::Rejected,
541 token: None,
542 message: Some(reason.clone()),
543 });
544 }
545 drop(rejected);
546
547 let pending = state.pending_transfers.read().await;
548 if pending.contains_key(¶ms.transfer_id) {
549 return Json(TransferApprovalStatus {
550 status: TransferDecision::Pending,
551 token: None,
552 message: Some("Awaiting user approval".to_string()),
553 });
554 }
555
556 Json(TransferApprovalStatus {
557 status: TransferDecision::NotFound,
558 token: None,
559 message: Some("Transfer not found".to_string()),
560 })
561}
562
563async fn chunk_upload_handler(
565 State(state): State<Arc<ServerState>>,
566 Query(params): Query<ChunkParams>,
567 body: Body,
568) -> impl IntoResponse {
569 if state.is_transfer_cancelled(¶ms.transfer_id).await {
571 return (
572 StatusCode::GONE,
573 Json(serde_json::json!({"error": "Transfer was cancelled"})),
574 );
575 }
576
577 let approved = state.approved_tokens.read().await;
579 let expected_token = approved.get(¶ms.transfer_id);
580
581 if expected_token != Some(¶ms.token) {
582 return (
583 StatusCode::UNAUTHORIZED,
584 Json(serde_json::json!({"error": "Invalid or expired token"})),
585 );
586 }
587 drop(approved);
588
589 let download_dir = state.config.read().await.download_dir.clone();
591 if let Err(e) = tokio::fs::create_dir_all(&download_dir).await {
592 tracing::error!("Failed to create download directory: {}", e);
593 return (
594 StatusCode::INTERNAL_SERVER_ERROR,
595 Json(
596 serde_json::json!({"error": format!("Failed to create download directory: {}", e)}),
597 ),
598 );
599 }
600
601 let pending = state.pending_transfers.read().await;
603 let transfer = match pending.get(¶ms.transfer_id) {
604 Some(t) => t.clone(),
605 None => {
606 return (
607 StatusCode::NOT_FOUND,
608 Json(serde_json::json!({"error": "Transfer not found"})),
609 );
610 }
611 };
612 drop(pending);
613
614 let file_info = match transfer.files.iter().find(|f| f.id == params.file_id) {
615 Some(f) => f.clone(),
616 None => {
617 return (
618 StatusCode::NOT_FOUND,
619 Json(serde_json::json!({"error": "File not found in transfer"})),
620 );
621 }
622 };
623
624 let (file_path, mut file) = if let Some(ref relative_path) = file_info.relative_path {
626 let sanitized_relative = sanitize_relative_path(relative_path);
628 let target_path = download_dir.join(&sanitized_relative);
629
630 if let Some(parent) = target_path.parent() {
632 if let Err(e) = tokio::fs::create_dir_all(parent).await {
633 tracing::error!("Failed to create directory structure: {}", e);
634 return (
635 StatusCode::INTERNAL_SERVER_ERROR,
636 Json(
637 serde_json::json!({"error": format!("Failed to create directories: {}", e)}),
638 ),
639 );
640 }
641 }
642
643 let base_name = target_path
645 .file_name()
646 .and_then(|n| n.to_str())
647 .unwrap_or(&file_info.id);
648 let parent_dir = target_path.parent().unwrap_or(&download_dir);
649
650 match open_unique_file(parent_dir, base_name).await {
651 Ok(result) => result,
652 Err(e) => {
653 tracing::error!("Failed to create file: {}", e);
654 return (
655 StatusCode::INTERNAL_SERVER_ERROR,
656 Json(serde_json::json!({"error": format!("Failed to create file: {}", e)})),
657 );
658 }
659 }
660 } else {
661 let safe_name = sanitize_file_name(&file_info.name, &file_info.id);
663 match open_unique_file(&download_dir, &safe_name).await {
664 Ok(result) => result,
665 Err(e) => {
666 tracing::error!("Failed to create file: {}", e);
667 return (
668 StatusCode::INTERNAL_SERVER_ERROR,
669 Json(serde_json::json!({"error": format!("Failed to create file: {}", e)})),
670 );
671 }
672 }
673 };
674
675 let stored_name = file_path
676 .file_name()
677 .and_then(|n| n.to_str())
678 .unwrap_or(&file_info.id)
679 .to_string();
680
681 {
683 let mut start_times = state.transfer_start_times.write().await;
684 start_times
685 .entry(params.transfer_id.clone())
686 .or_insert_with(std::time::Instant::now);
687 }
688
689 let mut bytes_received: u64 = 0;
691 let mut stream = body.into_data_stream();
692 let mut last_progress_bytes: u64 = 0;
693
694 while let Some(chunk) = stream.next().await {
695 match chunk {
696 Ok(data) => {
697 let next_size = bytes_received + data.len() as u64;
698 if next_size > file_info.size {
699 tracing::error!("Received more data than expected for {}", file_info.name);
700 drop(file);
701 let _ = tokio::fs::remove_file(&file_path).await;
702 return (
703 StatusCode::PAYLOAD_TOO_LARGE,
704 Json(serde_json::json!({"error": "Received more data than expected"})),
705 );
706 }
707
708 if let Err(e) = file.write_all(&data).await {
709 tracing::error!("Failed to write chunk: {}", e);
710 return (
711 StatusCode::INTERNAL_SERVER_ERROR,
712 Json(serde_json::json!({"error": format!("Failed to write: {}", e)})),
713 );
714 }
715
716 bytes_received = next_size;
717
718 {
720 let mut transfer_bytes = state.transfer_bytes.write().await;
721 let total = transfer_bytes
722 .entry(params.transfer_id.clone())
723 .or_insert(0);
724 *total += data.len() as u64;
725 }
726
727 if bytes_received - last_progress_bytes >= 32768 || bytes_received == file_info.size
729 {
730 last_progress_bytes = bytes_received;
731
732 let speed_bps = {
734 let start_times = state.transfer_start_times.read().await;
735 let transfer_bytes = state.transfer_bytes.read().await;
736 if let (Some(start_time), Some(&total_bytes)) = (
737 start_times.get(¶ms.transfer_id),
738 transfer_bytes.get(¶ms.transfer_id),
739 ) {
740 let elapsed_secs = start_time.elapsed().as_secs_f64();
741 if elapsed_secs > 0.0 {
742 (total_bytes as f64 / elapsed_secs) as u64
743 } else {
744 0
745 }
746 } else {
747 0
748 }
749 };
750
751 state.emit_event(EngineEvent::TransferProgress(TransferProgress {
753 transfer_id: params.transfer_id.clone(),
754 current_file: Some(stored_name.clone()),
755 bytes_transferred: bytes_received,
756 total_bytes: file_info.size,
757 speed_bps,
758 }));
759 }
760 }
761 Err(e) => {
762 tracing::error!("Error reading chunk: {}", e);
763 return (
764 StatusCode::INTERNAL_SERVER_ERROR,
765 Json(serde_json::json!({"error": format!("Stream error: {}", e)})),
766 );
767 }
768 }
769 }
770
771 if let Err(e) = file.flush().await {
773 tracing::error!("Failed to flush file: {}", e);
774 }
775
776 if bytes_received != file_info.size {
777 tracing::warn!(
778 "Size mismatch for {}: expected {}, received {}",
779 file_info.name,
780 file_info.size,
781 bytes_received
782 );
783 let _ = tokio::fs::remove_file(&file_path).await;
784 return (
785 StatusCode::BAD_REQUEST,
786 Json(serde_json::json!({"error": "Incomplete file received"})),
787 );
788 }
789
790 tracing::info!(
791 "File received: {} ({} bytes)",
792 file_path.display(),
793 bytes_received
794 );
795
796 let transfer_id = params.transfer_id.clone();
798 let file_id = params.file_id.clone();
799
800 {
802 let mut received = state.received_files.write().await;
803 received
804 .entry(transfer_id.clone())
805 .or_insert_with(HashSet::new)
806 .insert(file_id);
807 }
808
809 let pending = state.pending_transfers.read().await;
811 if let Some(transfer) = pending.get(&transfer_id) {
812 let expected_count = transfer.files.len();
813 let received = state.received_files.read().await;
814 let received_count = received.get(&transfer_id).map(|s| s.len()).unwrap_or(0);
815
816 if received_count >= expected_count {
817 tracing::info!(
818 "Transfer {} complete: all {} files received",
819 transfer_id,
820 expected_count
821 );
822
823 let transfer_clone = transfer.clone();
825 let total_bytes = *state
826 .transfer_bytes
827 .read()
828 .await
829 .get(&transfer_id)
830 .unwrap_or(&transfer.total_size);
831
832 state.emit_event(EngineEvent::TransferComplete {
834 transfer_id: transfer_id.clone(),
835 });
836
837 state.record_receive_history(
839 &transfer_clone,
840 TransferStatus::Completed,
841 total_bytes,
842 None,
843 );
844
845 drop(pending);
847 drop(received);
848
849 state.pending_transfers.write().await.remove(&transfer_id);
851 state.approved_tokens.write().await.remove(&transfer_id);
852 state.received_files.write().await.remove(&transfer_id);
853 state.transfer_bytes.write().await.remove(&transfer_id);
854 state
855 .transfer_start_times
856 .write()
857 .await
858 .remove(&transfer_id);
859 }
860 }
861
862 (
863 StatusCode::OK,
864 Json(serde_json::json!({
865 "status": "ok",
866 "file": stored_name,
867 "bytes_received": bytes_received
868 })),
869 )
870}
871
872async fn events_handler(
874 State(state): State<Arc<ServerState>>,
875) -> Sse<
876 impl futures_util::Stream<Item = Result<axum::response::sse::Event, std::convert::Infallible>>,
877> {
878 let rx = state.internal_event_tx.subscribe();
879
880 let stream = BroadcastStream::new(rx).map(|result: Result<InternalEvent, _>| {
881 let event: InternalEvent = match result {
882 Ok(event) => event,
883 Err(_) => {
884 return Ok::<_, std::convert::Infallible>(
885 axum::response::sse::Event::default().data("heartbeat"),
886 )
887 }
888 };
889
890 let data = serde_json::to_string(&event).unwrap_or_default();
891 Ok(axum::response::sse::Event::default().data(data))
892 });
893
894 Sse::new(stream)
895}
896
897pub async fn start_server(state: Arc<ServerState>, port: u16) -> EngineResult<ServerHandle> {
899 let app = create_router(state.clone());
900
901 tracing::info!("Starting server on port {}", port);
902
903 let addr_v6 = SocketAddr::from((std::net::Ipv6Addr::UNSPECIFIED, port));
906 let addr_v4 = SocketAddr::from(([0, 0, 0, 0], port));
907
908 let listener = match tokio::net::TcpListener::bind(addr_v6).await {
909 Ok(l) => {
910 tracing::info!("Bound to IPv6 wildcard [::]:{} (dual-stack)", port);
911 l
912 }
913 Err(e) => {
914 tracing::debug!("IPv6 bind failed ({}), falling back to IPv4", e);
915 tokio::net::TcpListener::bind(addr_v4).await.map_err(|e| {
916 EngineError::Network(format!("Failed to bind to port {}: {}", port, e))
917 })?
918 }
919 };
920
921 let (shutdown_tx, shutdown_rx) = oneshot::channel();
922
923 tokio::spawn(async move {
925 axum::serve(
926 listener,
927 app.into_make_service_with_connect_info::<SocketAddr>(),
928 )
929 .with_graceful_shutdown(async {
930 let _ = shutdown_rx.await;
931 })
932 .await
933 .ok();
934 });
935
936 state
938 .event_handler
939 .on_event(EngineEvent::ServerStarted { port });
940
941 Ok(ServerHandle { shutdown_tx })
942}