1pub mod client;
53pub mod config;
54pub mod error;
55pub mod events;
56pub mod favorites;
57pub mod history;
58pub mod protocol;
59pub mod server;
60pub mod types;
61
62pub use protocol::{
64 EngineEvent, PeerInfo, PendingTransfer, TransferApprovalStatus, TransferDecision,
65 TransferDirection, TransferFile, TransferProgress, TransferRequest, TransferResponse,
66 TransferStatus,
67};
68
69pub use events::{
71 callback_handler, channel_handler, noop_handler, CallbackEventHandler, ChannelEventHandler,
72 EventHandler, NoopEventHandler,
73};
74
75pub use client::{get_network_interfaces, TransferClient};
77pub use config::{EngineConfig, EngineConfigBuilder};
78pub use error::{EngineError, EngineResult};
79pub use favorites::{FavoritesPersistence, InMemoryFavorites};
80pub use history::{HistoryPersistence, InMemoryHistory};
81pub use server::{ServerHandle, ServerState};
82
83pub use types::{Favorite, NetworkInterface, ResolveResult, TransferRecord};
85
86use std::path::PathBuf;
87use std::sync::Arc;
88
89pub struct GoshTransferEngine {
96 config: EngineConfig,
97 client: TransferClient,
98 server_state: Arc<ServerState>,
99 server_handle: Option<ServerHandle>,
100 event_handler: Arc<dyn EventHandler>,
101 history: Option<Arc<dyn HistoryPersistence>>,
102}
103
104impl GoshTransferEngine {
105 pub fn new(config: EngineConfig, event_handler: Arc<dyn EventHandler>) -> Self {
107 let server_state = Arc::new(ServerState::new(config.clone(), event_handler.clone()));
108 let client = TransferClient::new_with_config(event_handler.clone(), &config);
109
110 Self {
111 config,
112 client,
113 server_state,
114 server_handle: None,
115 event_handler,
116 history: None,
117 }
118 }
119
120 pub fn with_history(
124 config: EngineConfig,
125 event_handler: Arc<dyn EventHandler>,
126 history: Arc<dyn HistoryPersistence>,
127 ) -> Self {
128 let server_state = Arc::new(ServerState::new_with_history(
129 config.clone(),
130 event_handler.clone(),
131 history.clone(),
132 ));
133 let client = TransferClient::new_with_history_and_config(
134 event_handler.clone(),
135 history.clone(),
136 &config,
137 );
138
139 Self {
140 config,
141 client,
142 server_state,
143 server_handle: None,
144 event_handler,
145 history: Some(history),
146 }
147 }
148
149 pub fn with_channel_events(
154 config: EngineConfig,
155 ) -> (Self, tokio::sync::broadcast::Receiver<EngineEvent>) {
156 let (handler, receiver) = channel_handler(100);
157 (Self::new(config, handler), receiver)
158 }
159
160 pub fn with_channel_events_and_history(
165 config: EngineConfig,
166 history: Arc<dyn HistoryPersistence>,
167 ) -> (Self, tokio::sync::broadcast::Receiver<EngineEvent>) {
168 let (handler, receiver) = channel_handler(100);
169 (Self::with_history(config, handler, history), receiver)
170 }
171
172 pub fn history(&self) -> Option<&Arc<dyn HistoryPersistence>> {
174 self.history.as_ref()
175 }
176
177 pub async fn start_server(&mut self) -> EngineResult<()> {
184 if self.server_handle.is_some() {
185 return Err(EngineError::ServerAlreadyRunning);
186 }
187
188 let handle = server::start_server(self.server_state.clone(), self.config.port).await?;
189 self.server_handle = Some(handle);
190
191 Ok(())
192 }
193
194 pub async fn stop_server(&mut self) -> EngineResult<()> {
196 if let Some(handle) = self.server_handle.take() {
197 handle.shutdown();
198 self.event_handler.on_event(EngineEvent::ServerStopped);
199 }
200 Ok(())
201 }
202
203 pub fn is_server_running(&self) -> bool {
205 self.server_handle.is_some()
206 }
207
208 pub fn server_state(&self) -> &Arc<ServerState> {
210 &self.server_state
211 }
212
213 pub fn port(&self) -> u16 {
215 self.config.port
216 }
217
218 pub async fn change_port(&mut self, new_port: u16) -> EngineResult<()> {
231 self.change_port_with_options(new_port, true).await
232 }
233
234 pub async fn change_port_with_options(
248 &mut self,
249 new_port: u16,
250 rollback_on_failure: bool,
251 ) -> EngineResult<()> {
252 Self::validate_port(new_port)?;
254
255 let old_port = self.config.port;
256
257 if old_port == new_port {
259 tracing::debug!("Port unchanged ({}), skipping restart", new_port);
260 return Ok(());
261 }
262
263 let was_running = self.is_server_running();
264
265 if was_running {
267 self.stop_server().await?;
268 }
269
270 self.config.port = new_port;
272 self.server_state.update_config(self.config.clone()).await;
273
274 if was_running {
276 match self.start_server().await {
277 Ok(()) => {
278 self.event_handler
280 .on_event(EngineEvent::PortChanged { old_port, new_port });
281 tracing::info!("Server port changed from {} to {}", old_port, new_port);
282 Ok(())
283 }
284 Err(e) => {
285 tracing::error!("Failed to bind to new port {}: {}", new_port, e);
286
287 if rollback_on_failure {
288 tracing::info!("Attempting to restore previous port {}", old_port);
289
290 self.config.port = old_port;
292 self.server_state.update_config(self.config.clone()).await;
293
294 if let Err(restore_err) = self.start_server().await {
296 tracing::error!(
297 "Failed to restore old port {}: {}",
298 old_port,
299 restore_err
300 );
301 return Err(EngineError::Network(format!(
303 "Port change failed and rollback failed: original error: {}, rollback error: {}",
304 e, restore_err
305 )));
306 }
307
308 Err(EngineError::Network(format!(
310 "Failed to bind to port {}: {} (restored to port {})",
311 new_port, e, old_port
312 )))
313 } else {
314 Err(e)
316 }
317 }
318 }
319 } else {
320 self.event_handler
322 .on_event(EngineEvent::PortChanged { old_port, new_port });
323 Ok(())
324 }
325 }
326
327 fn validate_port(port: u16) -> EngineResult<()> {
329 if port == 0 {
330 return Err(EngineError::InvalidConfig(
331 "Port 0 (auto-assign) is not supported; specify an explicit port".to_string(),
332 ));
333 }
334
335 if port < 1024 {
336 tracing::warn!(
337 "Port {} is privileged and may require elevated permissions",
338 port
339 );
340 }
341
342 Ok(())
343 }
344
345 pub async fn send_files(
355 &self,
356 address: &str,
357 port: u16,
358 file_paths: Vec<PathBuf>,
359 ) -> EngineResult<()> {
360 if self.config.receive_only {
361 return Err(EngineError::InvalidConfig(
362 "Sending is disabled in receive-only mode".to_string(),
363 ));
364 }
365
366 self.client
367 .send_files(
368 address,
369 port,
370 file_paths,
371 Some(self.config.device_name.clone()),
372 )
373 .await
374 }
375
376 pub async fn send_directory(
381 &self,
382 address: &str,
383 port: u16,
384 dir_path: impl AsRef<std::path::Path>,
385 ) -> EngineResult<()> {
386 if self.config.receive_only {
387 return Err(EngineError::InvalidConfig(
388 "Sending is disabled in receive-only mode".to_string(),
389 ));
390 }
391
392 self.client
393 .send_directory(
394 address,
395 port,
396 dir_path,
397 Some(self.config.device_name.clone()),
398 )
399 .await
400 }
401
402 pub async fn accept_transfer(&self, transfer_id: &str) -> EngineResult<String> {
407 if !self.is_server_running() {
408 return Err(EngineError::ServerNotRunning);
409 }
410 self.server_state.accept_transfer(transfer_id).await
411 }
412
413 pub async fn reject_transfer(&self, transfer_id: &str) -> EngineResult<()> {
417 if !self.is_server_running() {
418 return Err(EngineError::ServerNotRunning);
419 }
420 self.server_state.reject_transfer(transfer_id).await
421 }
422
423 pub async fn get_pending_transfers(&self) -> Vec<PendingTransfer> {
425 self.server_state.get_pending_transfers().await
426 }
427
428 pub async fn cancel_transfer(&self, transfer_id: &str) -> EngineResult<()> {
433 self.server_state.cancel_transfer(transfer_id).await
434 }
435
436 pub async fn accept_all_transfers(&self) -> Vec<(String, EngineResult<String>)> {
441 if !self.is_server_running() {
442 let pending = self.get_pending_transfers().await;
443 return pending
444 .into_iter()
445 .map(|t| (t.id, Err(EngineError::ServerNotRunning)))
446 .collect();
447 }
448
449 let pending = self.get_pending_transfers().await;
450 let mut results = Vec::with_capacity(pending.len());
451
452 for transfer in pending {
453 let result = self.server_state.accept_transfer(&transfer.id).await;
454 results.push((transfer.id, result));
455 }
456
457 results
458 }
459
460 pub async fn reject_all_transfers(&self) -> Vec<(String, EngineResult<()>)> {
464 if !self.is_server_running() {
465 let pending = self.get_pending_transfers().await;
466 return pending
467 .into_iter()
468 .map(|t| (t.id, Err(EngineError::ServerNotRunning)))
469 .collect();
470 }
471
472 let pending = self.get_pending_transfers().await;
473 let mut results = Vec::with_capacity(pending.len());
474
475 for transfer in pending {
476 let result = self.server_state.reject_transfer(&transfer.id).await;
477 results.push((transfer.id, result));
478 }
479
480 results
481 }
482
483 pub fn resolve_address(address: &str) -> ResolveResult {
487 TransferClient::resolve_address(address)
488 }
489
490 pub fn resolve_address_or_err(address: &str) -> EngineResult<Vec<String>> {
492 TransferClient::resolve_address_or_err(address)
493 }
494
495 pub fn get_network_interfaces() -> Vec<NetworkInterface> {
497 get_network_interfaces()
498 }
499
500 pub async fn check_peer(&self, address: &str, port: u16) -> EngineResult<bool> {
502 self.client.check_peer(address, port).await
503 }
504
505 pub async fn get_peer_info(&self, address: &str, port: u16) -> EngineResult<serde_json::Value> {
507 self.client.get_peer_info(address, port).await
508 }
509
510 pub async fn update_config(&mut self, config: EngineConfig) {
516 self.client.update_config(&config);
517 self.config = config.clone();
518 self.server_state.update_config(config).await;
519 }
520
521 pub fn config(&self) -> &EngineConfig {
523 &self.config
524 }
525
526 pub async fn add_trusted_host(&mut self, host: String) {
528 if !self.config.trusted_hosts.contains(&host) {
529 self.config.trusted_hosts.push(host);
530 self.server_state.update_config(self.config.clone()).await;
531 }
532 }
533
534 pub async fn remove_trusted_host(&mut self, host: &str) {
536 self.config.trusted_hosts.retain(|h| h != host);
537 self.server_state.update_config(self.config.clone()).await;
538 }
539
540 pub fn trusted_hosts(&self) -> &[String] {
542 &self.config.trusted_hosts
543 }
544}
545
546impl Drop for GoshTransferEngine {
547 fn drop(&mut self) {
548 if let Some(handle) = self.server_handle.take() {
550 handle.shutdown();
551 }
552 }
553}
554
#[cfg(test)]
mod tests {
    use super::*;

    /// Port the fixed-port tests start from.
    const INITIAL_PORT: u16 = 53317;
    /// Port the fixed-port tests switch to.
    const CHANGED_PORT: u16 = 53318;

    /// Builds the configuration shared by every test, varying only the port.
    fn test_config(port: u16) -> EngineConfig {
        EngineConfig::builder()
            .port(port)
            .device_name("Test")
            .download_dir("/tmp")
            .build()
    }

    /// Creates an engine with channel events configured for `port`.
    fn engine_on_port(
        port: u16,
    ) -> (
        GoshTransferEngine,
        tokio::sync::broadcast::Receiver<EngineEvent>,
    ) {
        GoshTransferEngine::with_channel_events(test_config(port))
    }

    /// Asks the OS for a free loopback port by binding to port 0 and reading the result.
    ///
    /// The probe listener is released on return, so the port is only *likely* to be free.
    async fn find_available_port() -> u16 {
        let probe = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = probe.local_addr().unwrap();
        addr.port()
    }

    #[tokio::test]
    async fn test_change_port_not_running() {
        let (mut engine, mut rx) = engine_on_port(INITIAL_PORT);

        engine.change_port(CHANGED_PORT).await.unwrap();

        assert_eq!(engine.port(), 53318);

        // With the server stopped, the only expected event is the port change itself.
        match rx.try_recv().unwrap() {
            EngineEvent::PortChanged { old_port, new_port } => {
                assert_eq!(old_port, 53317);
                assert_eq!(new_port, 53318);
            }
            other => panic!("Expected PortChanged event, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_change_port_while_running() {
        let first_port = find_available_port().await;
        let second_port = find_available_port().await;

        let (mut engine, _rx) = engine_on_port(first_port);

        engine.start_server().await.unwrap();
        assert!(engine.is_server_running());

        engine.change_port(second_port).await.unwrap();

        assert_eq!(engine.port(), second_port);
        assert!(engine.is_server_running());

        engine.stop_server().await.unwrap();
    }

    #[tokio::test]
    async fn test_change_port_same_port_noop() {
        let (mut engine, mut rx) = engine_on_port(INITIAL_PORT);

        engine.change_port(INITIAL_PORT).await.unwrap();
        assert_eq!(engine.port(), 53317);

        // A no-op change must not emit any event.
        assert!(rx.try_recv().is_err());
    }

    #[tokio::test]
    async fn test_change_port_invalid_zero() {
        let (mut engine, _rx) = engine_on_port(INITIAL_PORT);

        let outcome = engine.change_port(0).await;
        assert!(matches!(outcome, Err(EngineError::InvalidConfig(_))));
    }

    #[tokio::test]
    async fn test_change_port_rollback_on_failure() {
        let original_port = find_available_port().await;
        let (mut engine, _rx) = engine_on_port(original_port);

        engine.start_server().await.unwrap();

        // Occupy the target port on both address families so the engine cannot bind it.
        // The IPv6 bind is allowed to fail, so its result is kept but not unwrapped.
        let blocked_port = find_available_port().await;
        let _hold_v4 = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", blocked_port))
            .await
            .unwrap();
        let _hold_v6 = tokio::net::TcpListener::bind(format!("[::]:{}", blocked_port)).await;

        let outcome = engine.change_port(blocked_port).await;
        assert!(outcome.is_err());

        // The rollback must have restored the original port.
        assert_eq!(engine.port(), original_port);

        engine.stop_server().await.ok();
    }

    #[tokio::test]
    async fn test_port_getter() {
        let (engine, _rx) = engine_on_port(12345);
        assert_eq!(engine.port(), 12345);
    }
}