use crate::server::config::ServerConfig;
use crate::server::connection::ConnectionManager;
use crate::common::error::Result;
use crate::server::transports::{Server, ConnectionHandler};
use crate::server::transports::server_core::ServerCore;
use crate::server::handle::ServerHandle;
use crate::common::{generate_id};
use crate::transport::connection::Connection;
use crate::transport::quic::QUICTransport;
use async_trait::async_trait;
use quinn::{Endpoint, ServerConfig as QuinnServerConfig};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::debug;
/// QUIC server built on a `quinn` endpoint.
///
/// Instances are created through [`QUICServer::new`],
/// [`QUICServer::with_connection_manager`] or [`QUICServer::with_shared_core`]
/// and driven through the [`Server`] trait.
pub struct QUICServer {
    /// Server settings supplied by the caller (the bind address is read from here).
    config: ServerConfig,
    /// Server core; either created by the constructor or shared by the caller.
    core: Arc<ServerCore>,
    /// Application handler handed to the observer of every accepted connection.
    handler: Arc<dyn ConnectionHandler>,
    /// Endpoint bound by the constructor; `None` when no endpoint is available.
    endpoint: Option<Endpoint>,
    /// Running flag shared with the accept loop task.
    is_running: Arc<Mutex<bool>>,
}
impl QUICServer {
pub fn new(config: ServerConfig, handler: Arc<dyn ConnectionHandler>) -> Result<Self> {
Self::with_connection_manager(config, handler, None)
}
pub fn with_connection_manager(
config: ServerConfig,
handler: Arc<dyn ConnectionHandler>,
connection_manager: Option<Arc<ConnectionManager>>,
) -> Result<Self> {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
let _ = rustls::crypto::ring::default_provider().install_default();
});
let core = Arc::new(ServerCore::new(&config, connection_manager));
use crate::common::cert::{get_server_cert_der, get_server_key_der};
let cert_der = get_server_cert_der()
.map_err(|e| crate::common::error::FlareError::protocol_error(
format!("Failed to load server certificate: {}", e)
))?;
let key_der = get_server_key_der()
.map_err(|e| crate::common::error::FlareError::protocol_error(
format!("Failed to load server private key: {}", e)
))?;
debug!("[QUIC Server] Using certificate from certs/server.crt for localhost, 127.0.0.1, ::1");
let cert_der_bytes: quinn::rustls::pki_types::CertificateDer<'static> =
quinn::rustls::pki_types::CertificateDer::from(cert_der);
let certs = vec![cert_der_bytes];
if key_der.is_empty() {
return Err(crate::common::error::FlareError::protocol_error(
"Private key is empty".to_string()
));
}
let private_key = quinn::rustls::pki_types::PrivateKeyDer::Pkcs8(
quinn::rustls::pki_types::PrivatePkcs8KeyDer::from(key_der)
);
let server_config = QuinnServerConfig::with_single_cert(
certs,
private_key,
)
.map_err(|e| crate::common::error::FlareError::protocol_error(format!("Failed to create server config: {}", e)))?;
let addr = config.bind_address.parse::<SocketAddr>()
.map_err(|e| crate::common::error::FlareError::protocol_error(format!("Invalid address: {}", e)))?;
let endpoint = Endpoint::server(server_config, addr)
.map_err(|e| crate::common::error::FlareError::connection_failed(format!("Failed to create endpoint: {}", e)))?;
Ok(Self {
config,
core,
handler,
endpoint: Some(endpoint),
is_running: Arc::new(Mutex::new(false)),
})
}
pub fn with_shared_core(
config: ServerConfig,
handler: Arc<dyn ConnectionHandler>,
core: Arc<ServerCore>,
) -> Result<Self> {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
let _ = rustls::crypto::ring::default_provider().install_default();
});
use crate::common::cert::{get_server_cert_der, get_server_key_der};
let cert_der = get_server_cert_der()
.map_err(|e| crate::common::error::FlareError::protocol_error(
format!("Failed to load server certificate: {}", e)
))?;
let key_der = get_server_key_der()
.map_err(|e| crate::common::error::FlareError::protocol_error(
format!("Failed to load server key: {}", e)
))?;
let cert = rustls::pki_types::CertificateDer::from(cert_der);
let key = rustls::pki_types::PrivateKeyDer::Pkcs8(
rustls::pki_types::PrivatePkcs8KeyDer::from(key_der)
);
let server_config = QuinnServerConfig::with_single_cert(
vec![cert],
key,
).map_err(|e| crate::common::error::FlareError::protocol_error(
format!("Failed to create QUIC server config: {}", e)
))?;
let addr = config.bind_address.parse()
.map_err(|e| crate::common::error::FlareError::protocol_error(
format!("Invalid address: {}", e)
))?;
let endpoint = Endpoint::server(server_config, addr)
.map_err(|e| crate::common::error::FlareError::protocol_error(
format!("Failed to create QUIC endpoint: {}", e)
))?;
Ok(Self {
config,
core,
handler,
endpoint: Some(endpoint),
is_running: Arc::new(Mutex::new(false)),
})
}
}
#[async_trait]
impl Server for QUICServer {
async fn start(&mut self) -> Result<()> {
*self.is_running.lock().await = true;
self.core.start_heartbeat(&self.config);
let endpoint = self.endpoint.take().ok_or_else(|| {
crate::common::error::FlareError::connection_failed("Endpoint not initialized".to_string())
})?;
let handler = Arc::clone(&self.handler);
let manager = Arc::clone(&self.core.connection_manager);
let parser = self.core.parser.clone();
let config = self.config.clone();
let is_running = Arc::clone(&self.is_running);
let core = Arc::clone(&self.core);
let core_clone = Arc::clone(&core);
tokio::spawn(async move {
eprintln!("[QUIC Server] Started listening for connections...");
while *is_running.lock().await {
if let Some(conn) = endpoint.accept().await {
eprintln!("[QUIC Server] Incoming connection received, waiting for handshake...");
let handler_clone = Arc::clone(&handler);
let manager_clone = Arc::clone(&manager);
let parser_clone = parser.clone();
let config_clone = config.clone();
let core_clone = Arc::clone(&core_clone);
tokio::spawn(async move {
match conn.await {
Ok(connecting) => {
eprintln!("[QUIC Server] Handshake completed, handling connection...");
handle_quic_connection(
connecting,
handler_clone,
manager_clone,
parser_clone,
config_clone,
core_clone,
).await;
}
Err(e) => {
eprintln!("[QUIC Server] Failed to accept QUIC connection: {}", e);
}
}
});
} else {
eprintln!("[QUIC Server] No more connections, stopping...");
break;
}
}
});
Ok(())
}
async fn stop(&mut self) -> Result<()> {
*self.is_running.lock().await = false;
self.core.stop_heartbeat();
let connection_ids = self.core.list_connections().await;
for conn_id in connection_ids {
let manager_trait = self.core.connection_manager_trait();
if let Some((conn, _)) = manager_trait.get_connection(&conn_id).await {
let mut c = conn.lock().await;
let _ = c.close().await;
}
let _ = ServerHandle::disconnect(&*self.core, &conn_id).await;
}
Ok(())
}
fn is_running(&self) -> bool {
*self.is_running.blocking_lock()
}
}
/// Registers one established QUIC connection with the connection manager.
///
/// Steps, in order:
/// 1. Close the connection (code 0, reason `limit exceeded`) if the manager
///    already holds `config.max_connections` or more connections.
/// 2. Wait for the peer to open a bidirectional stream.
/// 3. Wrap the stream pair in a `QUICTransport` and add it to the manager
///    under a freshly generated id.
/// 4. Attach a `DefaultServerMessageObserver` to the stored connection.
///
/// Failures are logged to stderr and end the function early; nothing is
/// returned to the caller.
async fn handle_quic_connection(
    connection: quinn::Connection,
    handler: Arc<dyn ConnectionHandler>,
    manager: Arc<ConnectionManager>,
    parser: crate::common::MessageParser,
    config: ServerConfig,
    core: Arc<ServerCore>,
) {
    if manager.connection_count() >= config.max_connections {
        eprintln!("Connection limit exceeded: {}", config.max_connections);
        connection.close(0u32.into(), b"limit exceeded");
        return;
    }

    eprintln!("[QUIC Server] Waiting for client to open bidirectional stream...");
    let (send, recv) = match connection.accept_bi().await {
        Err(e) => {
            eprintln!("[QUIC Server] Failed to accept bi stream: {}", e);
            return;
        }
        Ok(streams) => {
            eprintln!("[QUIC Server] Bidirectional stream accepted");
            streams
        }
    };

    let connection_id = generate_id();
    let transport: Box<dyn Connection> = Box::new(QUICTransport::new(send, recv));
    let requires_auth = core.auth_enabled();
    if let Err(e) = manager.add_connection(connection_id.clone(), transport, None, requires_auth) {
        eprintln!("Failed to add connection: {}", e);
        return;
    }

    let device_manager = core.device_manager();
    let event_handler = core.event_handler();
    // `handler` and `parser` are not needed afterwards, so they are moved in;
    // `manager` and `core` are cloned because they are still referenced here.
    let observer = Arc::new(crate::server::events::DefaultServerMessageObserver::new(
        handler,
        Arc::clone(&manager),
        parser,
        connection_id.clone(),
        Arc::clone(&core),
        device_manager,
        event_handler,
    ));

    if let Some((conn, _)) = manager.get_connection(&connection_id) {
        let mut guard = conn.lock().await;
        guard.add_observer(observer);
    }
}