use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{broadcast, Mutex, RwLock};
use aerox_core::Result;
use aerox_ecs::{EcsWorld, PlayerConnection, Position, PlayerName};
use aerox_network::ConnectionId;
use prost::Message;
#[derive(Clone, prost::Message)]
pub struct LoginRequest {
#[prost(string, tag = "1")]
pub username: String,
}
#[derive(Clone, prost::Message)]
pub struct LoginResponse {
#[prost(uint64, tag = "1")]
pub player_id: u64,
#[prost(string, tag = "2")]
pub message: String,
}
#[derive(Clone, prost::Message)]
pub struct MoveRequest {
#[prost(float, tag = "1")]
pub x: f32,
#[prost(float, tag = "2")]
pub y: f32,
#[prost(float, tag = "3")]
pub z: f32,
}
#[derive(Clone, prost::Message)]
pub struct PlayerMoveBroadcast {
#[prost(uint64, tag = "1")]
pub player_id: u64,
#[prost(string, tag = "2")]
pub username: String,
#[prost(float, tag = "3")]
pub x: f32,
#[prost(float, tag = "4")]
pub y: f32,
#[prost(float, tag = "5")]
pub z: f32,
}
#[derive(Clone, prost::Message)]
pub struct ChatMessage {
#[prost(string, tag = "1")]
pub content: String,
}
#[derive(Clone, prost::Message)]
pub struct ChatBroadcast {
#[prost(uint64, tag = "1")]
pub player_id: u64,
#[prost(string, tag = "2")]
pub username: String,
#[prost(string, tag = "3")]
pub content: String,
#[prost(uint64, tag = "4")]
pub timestamp: u64,
}
#[derive(Clone, prost::Message)]
pub struct PlayerJoinBroadcast {
#[prost(uint64, tag = "1")]
pub player_id: u64,
#[prost(string, tag = "2")]
pub username: String,
}
#[derive(Clone, prost::Message)]
pub struct PlayerLeaveBroadcast {
#[prost(uint64, tag = "1")]
pub player_id: u64,
}
#[derive(Clone, prost::Message)]
pub struct Heartbeat {}
#[derive(Clone, prost::Message)]
pub struct HeartbeatAck {}
const MSG_ID_LOGIN: u16 = 1001;
const MSG_ID_LOGIN_RESP: u16 = 1002;
const MSG_ID_MOVE: u16 = 2001;
const MSG_ID_MOVE_BROADCAST: u16 = 2002;
const MSG_ID_CHAT: u16 = 3001;
const MSG_ID_CHAT_BROADCAST: u16 = 3002;
const MSG_ID_PLAYER_JOIN: u16 = 4001;
const MSG_ID_PLAYER_LEAVE: u16 = 4002;
const MSG_ID_HEARTBEAT: u16 = 5001;
const MSG_ID_HEARTBEAT_ACK: u16 = 5002;
#[derive(Clone)]
pub struct ServerState {
pub world: Arc<Mutex<EcsWorld>>,
pub connections: Arc<Mutex<HashMap<ConnectionId, ClientInfo>>>,
pub broadcast_tx: broadcast::Sender<BroadcastMessage>,
pub next_player_id: Arc<Mutex<u64>>,
}
#[derive(Clone, Debug)]
pub struct ClientInfo {
pub connection_id: ConnectionId,
pub player_id: u64,
pub addr: SocketAddr,
pub socket: Arc<Mutex<TcpStream>>,
pub last_heartbeat: Arc<Mutex<Instant>>,
}
#[derive(Clone, Debug)]
pub enum BroadcastMessage {
PlayerJoin { player_id: u64, username: String },
PlayerLeave { player_id: u64 },
PlayerMove {
player_id: u64,
username: String,
x: f32,
y: f32,
z: f32,
},
Chat {
player_id: u64,
username: String,
content: String,
},
}
impl ServerState {
pub fn new() -> Self {
let (broadcast_tx, _) = broadcast::channel(1000);
Self {
world: Arc::new(Mutex::new(EcsWorld::new())),
connections: Arc::new(Mutex::new(HashMap::new())),
broadcast_tx,
next_player_id: Arc::new(Mutex::new(1)),
}
}
pub async fn allocate_player_id(&self) -> u64 {
let mut id = self.next_player_id.lock().await;
let player_id = *id;
*id += 1;
player_id
}
pub async fn broadcast(&self, msg: BroadcastMessage) {
let _ = self.broadcast_tx.send(msg);
}
pub async fn get_all_players(&self) -> Vec<(u64, String, (f32, f32, f32))> {
let mut world = self.world.lock().await;
let world_ref = world.world_mut();
let mut query = world_ref.query::<(&PlayerConnection, &Position, &PlayerName)>();
let mut players = Vec::new();
let conn_map = self.connections.lock().await;
for (conn, pos, name) in query.iter(world_ref) {
if let Some(info) = conn_map.get(&conn.connection_id) {
players.push((info.player_id, name.name.clone(), (pos.x, pos.y, pos.z)));
}
}
players
}
}
pub async fn run_server() -> Result<()> {
println!("╔════════════════════════════════════════╗");
println!("║ AeroX 完整游戏服务器 ║");
println!("╚════════════════════════════════════════╝\n");
let bind_addr: SocketAddr = "127.0.0.1:8082"
.parse()
.map_err(|e| aerox_core::AeroXError::validation(format!("Invalid address: {}", e)))?;
println!("🚀 启动服务器...");
println!(" 地址: {}\n", bind_addr);
let state = ServerState::new();
{
let mut world = state.world.lock().await;
world.initialize().map_err(|e| {
aerox_core::AeroXError::config(format!("Failed to initialize ECS world: {:?}", e))
})?;
}
println!("✓ ECS 世界已初始化");
let state_clone = state.clone();
tokio::spawn(async move {
broadcast_task(state_clone).await;
});
let listener = TcpListener::bind(bind_addr).await?;
println!("✓ 服务器启动成功,等待连接...\n");
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
println!("支持的消息:");
println!(" 登录、移动、聊天、心跳");
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
let mut connection_count = 0;
loop {
match listener.accept().await {
Ok((socket, addr)) => {
connection_count += 1;
println!("📥 新连接 #{} 来自: {}", connection_count, addr);
let state_clone = state.clone();
tokio::spawn(async move {
if let Err(e) = handle_client(socket, addr, connection_count, state_clone).await {
eprintln!("❌ 连接 #{} 错误: {}", connection_count, e);
}
});
}
Err(e) => {
eprintln!("❌ 接受连接失败: {}", e);
}
}
}
}
async fn broadcast_task(state: ServerState) {
let mut rx = state.broadcast_tx.subscribe();
loop {
match rx.recv().await {
Ok(msg) => {
let connections = state.connections.lock().await;
for (conn_id, info) in connections.iter() {
if let Ok(mut socket) = info.socket.try_lock() {
let result = match &msg {
BroadcastMessage::PlayerJoin { player_id, username } => {
let broadcast = PlayerJoinBroadcast {
player_id: *player_id,
username: username.clone(),
};
send_message(&mut *socket, MSG_ID_PLAYER_JOIN, &broadcast).await
}
BroadcastMessage::PlayerLeave { player_id } => {
let broadcast = PlayerLeaveBroadcast {
player_id: *player_id,
};
send_message(&mut *socket, MSG_ID_PLAYER_LEAVE, &broadcast).await
}
BroadcastMessage::PlayerMove { player_id, username, x, y, z } => {
let broadcast = PlayerMoveBroadcast {
player_id: *player_id,
username: username.clone(),
x: *x,
y: *y,
z: *z,
};
send_message(&mut *socket, MSG_ID_MOVE_BROADCAST, &broadcast).await
}
BroadcastMessage::Chat { player_id, username, content } => {
let timestamp = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
let broadcast = ChatBroadcast {
player_id: *player_id,
username: username.clone(),
content: content.clone(),
timestamp,
};
send_message(&mut *socket, MSG_ID_CHAT_BROADCAST, &broadcast).await
}
};
if let Err(e) = result {
eprintln!("广播到 {:?} 失败: {}", conn_id, e);
}
}
}
}
Err(e) => {
eprintln!("广播通道错误: {:?}", e);
break;
}
}
}
}
async fn handle_client(
mut socket: TcpStream,
addr: SocketAddr,
conn_id: usize,
state: ServerState,
) -> Result<()> {
println!(" ↳ 连接 #{} 已建立", conn_id);
let connection_id = ConnectionId::new(conn_id as u64);
let mut buffer = [0u8; 8192];
let mut messages_received = 0u64;
loop {
match socket.read_exact(&mut buffer[..10]).await {
Ok(_) => {}
Err(e) => {
println!(" ↳ 连接 #{} 已关闭 (接收 {} 条消息)", conn_id, messages_received);
cleanup_connection(&state, connection_id).await;
break;
}
}
let frame_len = u32::from_le_bytes([buffer[0], buffer[1], buffer[2], buffer[3]]) as usize;
let msg_id = u16::from_le_bytes([buffer[4], buffer[5]]);
let _seq_id = u32::from_le_bytes([buffer[6], buffer[7], buffer[8], buffer[9]]);
let payload_len = frame_len.saturating_sub(6);
if payload_len > 0 {
if payload_len > buffer.len() {
eprintln!(" ↳ 连接 #{} 消息体过大: {}", conn_id, payload_len);
break;
}
socket.read_exact(&mut buffer[..payload_len]).await?;
let payload = &buffer[..payload_len];
messages_received += 1;
match msg_id {
MSG_ID_LOGIN => handle_login(&state, connection_id, addr, payload).await?,
MSG_ID_MOVE => handle_move(&state, connection_id, payload).await?,
MSG_ID_CHAT => handle_chat(&state, connection_id, payload).await?,
MSG_ID_HEARTBEAT => {
if let Some(conn_info) = state.connections.lock().await.get(&connection_id) {
*conn_info.last_heartbeat.lock().await = Instant::now();
}
}
_ => {
println!(" ↳ 连接 #{} 未知消息类型: {}", conn_id, msg_id);
}
}
}
}
Ok(())
}
async fn handle_login(
state: &ServerState,
connection_id: ConnectionId,
addr: SocketAddr,
payload: &[u8],
) -> Result<()> {
if let Ok(req) = LoginRequest::decode(payload) {
println!(" ↳ [LOGIN] 用户登录: {}", req.username);
let player_id = state.allocate_player_id().await;
{
let mut world = state.world.lock().await;
let _entity = world.spawn_bundle((
PlayerConnection::new(connection_id, addr),
Position::origin(),
PlayerName::new(req.username.clone()),
));
}
println!(" ↳ [LOGIN] 玩家 {} (ID: {}) 登录成功", req.username, player_id);
state
.broadcast(BroadcastMessage::PlayerJoin {
player_id,
username: req.username,
})
.await;
let players = state.get_all_players().await;
println!(" ↳ 当前在线玩家: {} 人", players.len());
}
Ok(())
}
async fn handle_move(state: &ServerState, connection_id: ConnectionId, payload: &[u8]) -> Result<()> {
if let Ok(req) = MoveRequest::decode(payload) {
let player_id = {
let connections = state.connections.lock().await;
connections.get(&connection_id).map(|info| info.player_id)
};
if let Some(pid) = player_id {
{
let mut world = state.world.lock().await;
let world_mut = world.world_mut();
}
println!(" ↳ [MOVE] 玩家 {} 移动到 ({}, {}, {})", pid, req.x, req.y, req.z);
}
}
Ok(())
}
async fn handle_chat(state: &ServerState, connection_id: ConnectionId, payload: &[u8]) -> Result<()> {
if let Ok(req) = ChatMessage::decode(payload) {
let (player_id, username) = {
let connections = state.connections.lock().await;
if let Some(info) = connections.get(&connection_id) {
let mut world = state.world.lock().await;
let world_ref = world.world_mut();
let mut query = world_ref.query::<(&PlayerConnection, &PlayerName)>();
let mut found_username = None;
for (conn, name) in query.iter(world_ref) {
if conn.connection_id == connection_id {
found_username = Some(name.name.clone());
break;
}
}
(info.player_id, found_username.unwrap_or_else(|| "".to_string()))
} else {
return Ok(());
}
};
if !username.is_empty() {
println!(" ↳ [CHAT] {}: {}", username, req.content);
state
.broadcast(BroadcastMessage::Chat {
player_id,
username,
content: req.content,
})
.await;
}
}
Ok(())
}
async fn cleanup_connection(state: &ServerState, connection_id: ConnectionId) {
let player_id = {
let connections = state.connections.lock().await;
connections
.get(&connection_id)
.map(|info| info.player_id)
};
if let Some(pid) = player_id {
println!(" ↳ [CLEANUP] 清理玩家 ID: {}", pid);
let mut connections = state.connections.lock().await;
connections.remove(&connection_id);
state
.broadcast(BroadcastMessage::PlayerLeave { player_id: pid })
.await;
}
}
async fn send_message<M: prost::Message>(
socket: &mut TcpStream,
msg_id: u16,
message: &M,
) -> Result<()> {
let mut buf = Vec::new();
message
.encode(&mut buf)
.map_err(|e| aerox_core::AeroXError::protocol(format!("Encode error: {:?}", e)))?;
let payload_len = buf.len();
let frame_len = 6 + payload_len;
socket.write_all(&(frame_len as u32).to_le_bytes()).await?;
socket.write_all(&msg_id.to_le_bytes()).await?;
socket.write_all(&0u32.to_le_bytes()).await?;
socket.write_all(&buf).await?;
Ok(())
}
pub async fn run_client() -> aerox_client::Result<()> {
println!("╔════════════════════════════════════════╗");
println!("║ AeroX 游戏客户端 ║");
println!("╚════════════════════════════════════════╝\n");
use aerox_client::StreamClient;
let addr: SocketAddr = "127.0.0.1:8082".parse().unwrap();
println!("🔗 连接到服务器: {}...\n", addr);
let mut client = StreamClient::connect(addr).await?;
println!("✓ 连接成功!\n");
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
println!("游戏操作:\n");
let username = format!("Player{}", std::process::id() % 1000);
println!("1️⃣ 登录为: {}", username);
let login_req = LoginRequest { username: username.clone() };
client.send_message(MSG_ID_LOGIN, &login_req).await?;
tokio::time::sleep(Duration::from_secs(1)).await;
println!("2️⃣ 移动到随机位置");
let x = (std::process::id() as f32 * 17.0) % 100.0;
let y = (std::process::id() as f32 * 23.0) % 100.0;
let z = (std::process::id() as f32 * 29.0) % 100.0;
let move_req = MoveRequest { x, y, z };
client.send_message(MSG_ID_MOVE, &move_req).await?;
tokio::time::sleep(Duration::from_secs(1)).await;
println!("3️⃣ 发送聊天消息");
let chat_msg = ChatMessage {
content: format!("Hello from {}!", username),
};
client.send_message(MSG_ID_CHAT, &chat_msg).await?;
tokio::time::sleep(Duration::from_secs(1)).await;
println!("4️⃣ 发送心跳");
client.send_message(MSG_ID_HEARTBEAT, &Heartbeat {}).await?;
tokio::time::sleep(Duration::from_secs(1)).await;
println!("\n5️⃣ 持续游戏循环(每 2 秒移动一次)");
for i in 1..=5 {
let x = (i as f32 * 10.0) % 100.0;
let y = (i as f32 * 15.0) % 100.0;
let z = (i as f32 * 20.0) % 100.0;
client.send_message(MSG_ID_MOVE, &MoveRequest { x, y, z }).await?;
client.send_message(MSG_ID_HEARTBEAT, &Heartbeat {}).await?;
println!(" 循环 {}/5 - 移动到 ({}, {}, {})", i, x, y, z);
tokio::time::sleep(Duration::from_secs(2)).await;
}
println!("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
println!("✓ 游戏会话结束");
Ok(())
}
#[tokio::main]
async fn main() -> aerox_core::Result<()> {
let args: Vec<String> = std::env::args().collect();
if args.len() < 2 {
println!("用法:");
println!(" server - 启动服务器");
println!(" client - 启动客户端");
return Ok(());
}
match args[1].as_str() {
"server" => run_server().await,
"client" => {
run_client()
.await
.map_err(|e| aerox_core::AeroXError::network(format!("Client error: {:?}", e)))
}
_ => {
eprintln!("未知参数: {}", args[1]);
eprintln!("使用 'server' 或 'client'");
Ok(())
}
}
}