use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{broadcast, Mutex};
use aerox_core::Result;
use aerox_ecs::{EcsWorld, PlayerConnection, Position, PlayerName};
use aerox_network::ConnectionId;
use prost::Message;
/// Login request payload (message id `MSG_ID_LOGIN_REQUEST`, 1001).
///
/// Decoded by `handle_client` and passed to `handle_login`; sent by
/// `run_client` as the first step of the demo.
#[derive(Clone, prost::Message)]
pub struct LoginRequest {
/// Name the player logs in with; stored in the ECS as a `PlayerName`.
#[prost(string, tag = "1")]
pub username: String,
}
/// Reply to a [`LoginRequest`] (message id `MSG_ID_LOGIN_RESPONSE`, 1002).
///
/// Built and sent by `handle_login`.
#[derive(Clone, prost::Message)]
pub struct LoginResponse {
/// Server-assigned player id, taken from `ServerState::allocate_player_id`.
#[prost(uint64, tag = "1")]
pub player_id: u64,
/// Human-readable greeting text.
#[prost(string, tag = "2")]
pub message: String,
}
/// Movement request payload (message id `MSG_ID_MOVE_REQUEST`, 2001).
///
/// Decoded by `handle_client` and passed to `handle_move`.
#[derive(Clone, prost::Message)]
pub struct MoveRequest {
/// Requested X coordinate.
#[prost(float, tag = "1")]
pub x: f32,
/// Requested Y coordinate.
#[prost(float, tag = "2")]
pub y: f32,
/// Requested Z coordinate.
#[prost(float, tag = "3")]
pub z: f32,
}
/// Notification describing a player's new position.
///
/// NOTE(review): nothing in this file constructs or sends this message;
/// presumably it pairs with `MSG_ID_PLAYER_MOVE` (2002) — confirm.
#[derive(Clone, prost::Message)]
pub struct PlayerMoveBroadcast {
/// Id of the player that moved.
#[prost(uint64, tag = "1")]
pub player_id: u64,
/// Name of the player that moved.
#[prost(string, tag = "2")]
pub username: String,
/// X coordinate.
#[prost(float, tag = "3")]
pub x: f32,
/// Y coordinate.
#[prost(float, tag = "4")]
pub y: f32,
/// Z coordinate.
#[prost(float, tag = "5")]
pub z: f32,
}
/// Chat message payload (message id `MSG_ID_CHAT_MESSAGE`, 3001).
///
/// Decoded by `handle_client` and passed to `handle_chat`.
#[derive(Clone, prost::Message)]
pub struct ChatMessage {
/// Text of the chat line.
#[prost(string, tag = "1")]
pub content: String,
}
/// Notification carrying a chat line together with its author.
///
/// NOTE(review): nothing in this file constructs or sends this message;
/// presumably it pairs with `MSG_ID_CHAT_BROADCAST` (3002) — confirm.
#[derive(Clone, prost::Message)]
pub struct ChatBroadcast {
/// Id of the player who wrote the message.
#[prost(uint64, tag = "1")]
pub player_id: u64,
/// Name of the player who wrote the message.
#[prost(string, tag = "2")]
pub username: String,
/// Text of the chat line.
#[prost(string, tag = "3")]
pub content: String,
}
/// Request for the player list (message id `MSG_ID_GET_PLAYERS`, 4001).
///
/// Has no fields; `handle_client` dispatches on the message id alone and
/// does not decode a payload for it.
#[derive(Clone, prost::Message)]
pub struct GetPlayersRequest {}
/// One entry of a [`PlayerListResponse`], built by `handle_get_players`
/// from an entity's `PlayerConnection`, `Position` and `PlayerName`.
#[derive(Clone, prost::Message)]
pub struct PlayerInfo {
/// Player id; `handle_get_players` fills in 0 when the entity's connection
/// has no entry in `ServerState::connection_to_player`.
#[prost(uint64, tag = "1")]
pub player_id: u64,
/// Player name copied from the `PlayerName` component.
#[prost(string, tag = "2")]
pub username: String,
/// X coordinate copied from the `Position` component.
#[prost(float, tag = "3")]
pub x: f32,
/// Y coordinate copied from the `Position` component.
#[prost(float, tag = "4")]
pub y: f32,
/// Z coordinate copied from the `Position` component.
#[prost(float, tag = "5")]
pub z: f32,
}
/// Reply to a [`GetPlayersRequest`] (message id `MSG_ID_PLAYER_LIST`, 4002).
#[derive(Clone, prost::Message)]
pub struct PlayerListResponse {
/// One entry per entity matched by the query in `handle_get_players`.
#[prost(message, repeated, tag = "1")]
pub players: Vec<PlayerInfo>,
}
/// Keep-alive probe sent by the client (message id `MSG_ID_HEARTBEAT`, 5001).
/// Has no fields.
#[derive(Clone, prost::Message)]
pub struct Heartbeat {}
/// Reply to a [`Heartbeat`] (message id `MSG_ID_HEARTBEAT_ACK`, 5002),
/// sent by `handle_heartbeat`. Has no fields.
#[derive(Clone, prost::Message)]
pub struct HeartbeatAck {}
// Wire message ids. Each frame carries one of these as a little-endian u16
// right after the 4-byte length prefix (see `handle_client` / `send_message`).

// Login: request from the client, response from the server.
const MSG_ID_LOGIN_REQUEST: u16 = 1001;
const MSG_ID_LOGIN_RESPONSE: u16 = 1002;
// Movement. NOTE(review): MSG_ID_PLAYER_MOVE is not referenced by any code in this file.
const MSG_ID_MOVE_REQUEST: u16 = 2001;
const MSG_ID_PLAYER_MOVE: u16 = 2002;
// Chat. NOTE(review): MSG_ID_CHAT_BROADCAST is not referenced by any code in this file.
const MSG_ID_CHAT_MESSAGE: u16 = 3001;
const MSG_ID_CHAT_BROADCAST: u16 = 3002;
// Player list query and its reply.
const MSG_ID_GET_PLAYERS: u16 = 4001;
const MSG_ID_PLAYER_LIST: u16 = 4002;
// Heartbeat probe and its acknowledgement.
const MSG_ID_HEARTBEAT: u16 = 5001;
const MSG_ID_HEARTBEAT_ACK: u16 = 5002;
/// State shared by every connection task.
///
/// Cloning is cheap: the mutable fields sit behind `Arc`, so `run_server`
/// hands each spawned `handle_client` task its own clone of the same state.
#[derive(Clone)]
pub struct ServerState {
/// The ECS world: initialized in `run_server`, entities are spawned in
/// `handle_login` and queried in `handle_get_players`.
pub world: Arc<Mutex<EcsWorld>>,
/// Connection id -> player id. Written by `handle_login`, read by the
/// message handlers, and cleared by `cleanup_player`.
pub connection_to_player: Arc<Mutex<HashMap<ConnectionId, u64>>>,
/// Player id -> connection id (reverse of `connection_to_player`).
/// Maintained by `handle_login` and `cleanup_player`; not read anywhere
/// else in this file.
pub player_to_connection: Arc<Mutex<HashMap<u64, ConnectionId>>>,
/// Sending half of a broadcast channel created in `ServerState::new`.
/// NOTE(review): no code in this file sends on it or subscribes to it.
pub broadcast_tx: tokio::sync::broadcast::Sender<String>,
/// Next player id to hand out; starts at 1 (see `ServerState::new`).
pub next_player_id: Arc<Mutex<u64>>,
}
impl ServerState {
    /// Builds an empty server state: a fresh ECS world, empty id maps, a
    /// broadcast channel with capacity 1000 and a player-id counter at 1.
    pub fn new() -> Self {
        // Only the sending half is kept; the initial receiver is discarded.
        let (sender, _) = broadcast::channel(1000);
        let world = Arc::new(Mutex::new(EcsWorld::new()));
        let connection_to_player = Arc::new(Mutex::new(HashMap::new()));
        let player_to_connection = Arc::new(Mutex::new(HashMap::new()));
        let next_player_id = Arc::new(Mutex::new(1));
        Self {
            world,
            connection_to_player,
            player_to_connection,
            broadcast_tx: sender,
            next_player_id,
        }
    }

    /// Returns the current value of the player-id counter and advances the
    /// counter by one, all under the counter's lock.
    pub async fn allocate_player_id(&self) -> u64 {
        let mut counter = self.next_player_id.lock().await;
        let allocated = *counter;
        *counter = allocated + 1;
        allocated
    }
}
/// Runs the demo game server on `127.0.0.1:8080`.
///
/// Prints a banner, initializes the shared ECS world, binds the listener and
/// then accepts connections forever, spawning one `handle_client` task per
/// connection. Connections are numbered from 1 in accept order.
///
/// # Errors
///
/// Returns an error if the address cannot be parsed, the ECS world fails to
/// initialize, or the listener cannot be bound. Accept failures are logged
/// and the loop continues.
pub async fn run_server() -> Result<()> {
    let banner = [
        "╔════════════════════════════════════════╗",
        "║ AeroX ECS 游戏服务器示例 ║",
        "╚════════════════════════════════════════╝\n",
    ];
    for banner_line in banner.iter() {
        println!("{}", banner_line);
    }

    let bind_addr = "127.0.0.1:8080"
        .parse::<SocketAddr>()
        .map_err(|err| aerox_core::AeroXError::validation(format!("Invalid address: {}", err)))?;
    println!("🚀 启动服务器...");
    println!(" 地址: {}\n", bind_addr);

    let state = ServerState::new();
    {
        // Scope the guard so the world lock is released before accepting connections.
        let mut ecs = state.world.lock().await;
        if let Err(err) = ecs.initialize() {
            return Err(aerox_core::AeroXError::config(format!(
                "Failed to initialize ECS world: {:?}",
                err
            )));
        }
    }
    println!("✓ ECS 世界已初始化");

    let listener = TcpListener::bind(bind_addr).await?;
    println!("✓ 服务器启动成功,等待连接...\n");

    let message_table = [
        "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━",
        "支持的消息类型:",
        " [1001] LoginRequest -> LoginResponse",
        " [2001] MoveRequest -> PlayerMoveBroadcast",
        " [3001] ChatMessage -> ChatBroadcast",
        " [4001] GetPlayers -> PlayerList",
        " [5001] Heartbeat -> HeartbeatAck",
        "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n",
    ];
    for table_line in message_table.iter() {
        println!("{}", table_line);
    }

    // Running count of accepted connections; doubles as the connection number.
    let mut accepted: usize = 0;
    loop {
        let (socket, addr) = match listener.accept().await {
            Ok(pair) => pair,
            Err(err) => {
                eprintln!("❌ 接受连接失败: {}", err);
                continue;
            }
        };
        accepted += 1;
        println!("📥 新连接 #{} 来自: {}", accepted, addr);

        let task_state = state.clone();
        tokio::spawn(async move {
            let outcome = handle_client(socket, addr, accepted, task_state).await;
            if let Err(err) = outcome {
                eprintln!("❌ 连接 #{} 错误: {}", accepted, err);
            }
        });
    }
}
async fn handle_client(
mut socket: TcpStream,
addr: SocketAddr,
conn_id: usize,
state: ServerState,
) -> Result<()> {
println!(" ↳ 连接 #{} 已建立", conn_id);
let connection_id = ConnectionId::new(conn_id as u64);
let mut buffer = [0u8; 8192];
let mut messages_received = 0u64;
loop {
match socket.read_exact(&mut buffer[..10]).await {
Ok(_) => {}
Err(e) => {
println!(" ↳ 连接 #{} 已关闭 (接收 {} 条消息)", conn_id, messages_received);
cleanup_player(&state, connection_id).await;
break;
}
}
let frame_len = u32::from_le_bytes([buffer[0], buffer[1], buffer[2], buffer[3]]) as usize;
let msg_id = u16::from_le_bytes([buffer[4], buffer[5]]);
let _seq_id = u32::from_le_bytes([buffer[6], buffer[7], buffer[8], buffer[9]]);
let payload_len = frame_len.saturating_sub(6);
if payload_len > 0 {
if payload_len > buffer.len() {
eprintln!(" ↳ 连接 #{} 消息体过大: {}", conn_id, payload_len);
break;
}
socket.read_exact(&mut buffer[..payload_len]).await?;
let payload = &buffer[..payload_len];
messages_received += 1;
match msg_id {
MSG_ID_LOGIN_REQUEST => {
if let Ok(req) = LoginRequest::decode(payload) {
handle_login(&mut socket, &state, connection_id, addr, req).await?;
}
}
MSG_ID_MOVE_REQUEST => {
if let Ok(req) = MoveRequest::decode(payload) {
handle_move(&mut socket, &state, connection_id, req).await?;
}
}
MSG_ID_CHAT_MESSAGE => {
if let Ok(req) = ChatMessage::decode(payload) {
handle_chat(&mut socket, &state, connection_id, req).await?;
}
}
MSG_ID_GET_PLAYERS => {
handle_get_players(&mut socket, &state).await?;
}
MSG_ID_HEARTBEAT => {
handle_heartbeat(&mut socket).await?;
}
_ => {
println!(" ↳ 连接 #{} 收到未知消息类型: {}", conn_id, msg_id);
}
}
}
}
Ok(())
}
/// Handles a `LoginRequest`: allocates a player id, spawns the player's ECS
/// entity, records the connection <-> player mappings and replies with a
/// `LoginResponse`.
///
/// A connection that is already logged in is not registered again. Spawning a
/// second entity would duplicate the player in the world and orphan the old
/// `player_to_connection` entry. Instead the existing player id is sent back.
///
/// # Errors
///
/// Returns an error if the response cannot be encoded or written.
async fn handle_login(
    socket: &mut TcpStream,
    state: &ServerState,
    connection_id: ConnectionId,
    addr: SocketAddr,
    req: LoginRequest,
) -> Result<()> {
    println!(" ↳ [LOGIN] 用户登录: {}", req.username);

    // Each connection is served by a single task, so this check cannot race
    // with another login for the same connection id.
    let already_logged_in = {
        let conn_to_player = state.connection_to_player.lock().await;
        conn_to_player.get(&connection_id).copied()
    };
    if let Some(player_id) = already_logged_in {
        println!(" ↳ [LOGIN] 连接已登录 (ID: {}), 忽略重复登录", player_id);
        let response = LoginResponse {
            player_id,
            message: format!("已登录 (ID: {})", player_id),
        };
        return send_message(socket, MSG_ID_LOGIN_RESPONSE, &response).await;
    }

    let player_id = state.allocate_player_id().await;
    {
        let mut world = state.world.lock().await;
        let _entity = world.spawn_bundle((
            PlayerConnection::new(connection_id, addr),
            Position::origin(),
            PlayerName::new(req.username.clone()),
        ));
        println!(" ↳ [ECS] 创建实体 (player_id={})", player_id);
    }
    {
        // Same lock order as `cleanup_player`: connection map first, then player map.
        let mut conn_to_player = state.connection_to_player.lock().await;
        let mut player_to_conn = state.player_to_connection.lock().await;
        conn_to_player.insert(connection_id, player_id);
        player_to_conn.insert(player_id, connection_id);
    }

    let response = LoginResponse {
        player_id,
        message: format!("欢迎, {}!", req.username),
    };
    send_message(socket, MSG_ID_LOGIN_RESPONSE, &response).await?;
    println!(" ↳ [LOGIN] 玩家 {} (ID: {}) 登录成功", req.username, player_id);
    Ok(())
}
/// Handles a `MoveRequest` by looking up the sender's player id and logging
/// the requested coordinates (or the unknown connection id).
///
/// NOTE(review): the request is only logged. Nothing here updates the
/// player's ECS `Position` or writes to `socket`; confirm whether that is
/// intentional for this demo.
///
/// Always returns `Ok(())`.
async fn handle_move(
    socket: &mut TcpStream,
    state: &ServerState,
    connection_id: ConnectionId,
    req: MoveRequest,
) -> Result<()> {
    // The guard is a temporary, so the lock is released at the end of this statement.
    let mover = state
        .connection_to_player
        .lock()
        .await
        .get(&connection_id)
        .copied();

    match mover {
        Some(pid) => {
            println!(" ↳ [MOVE] 玩家 ID {} 移动到 ({}, {}, {})", pid, req.x, req.y, req.z);
        }
        None => {
            println!(" ↳ [MOVE] 未知的连接 ID: {:?}", connection_id);
        }
    }
    Ok(())
}
/// Handles a `ChatMessage` by logging it together with the sender's player
/// id. Messages from connections without a player id are ignored silently.
///
/// NOTE(review): the message is only logged. Nothing here writes to `socket`
/// or forwards the text to other players; confirm whether that is
/// intentional for this demo.
///
/// Always returns `Ok(())`.
async fn handle_chat(
    socket: &mut TcpStream,
    state: &ServerState,
    connection_id: ConnectionId,
    req: ChatMessage,
) -> Result<()> {
    // The guard is a temporary, so the lock is released at the end of this statement.
    let author = state
        .connection_to_player
        .lock()
        .await
        .get(&connection_id)
        .copied();

    match author {
        Some(pid) => println!(" ↳ [CHAT] 玩家 ID {}: {}", pid, req.content),
        None => {}
    }
    Ok(())
}
/// Replies with a `PlayerListResponse` listing every entity that has a
/// `PlayerConnection`, a `Position` and a `PlayerName`.
///
/// The player list is collected while holding the ECS world lock, and the
/// lock is released *before* the response is written. A slow or stalled
/// client therefore cannot block other tasks that need the world.
///
/// Entities whose connection has no entry in `connection_to_player` are
/// reported with `player_id` 0.
///
/// # Errors
///
/// Returns an error if the response cannot be encoded or written.
async fn handle_get_players(socket: &mut TcpStream, state: &ServerState) -> Result<()> {
    // Snapshot the id map so its lock is not held while the world is locked.
    let conn_to_player_map = {
        let conn_to_player = state.connection_to_player.lock().await;
        conn_to_player.clone()
    };

    let mut players = Vec::new();
    {
        let mut world = state.world.lock().await;
        let world_mut = world.world_mut();
        let mut query = world_mut.query::<(&PlayerConnection, &Position, &PlayerName)>();
        for (conn, pos, name) in query.iter(world_mut) {
            let player_id = conn_to_player_map
                .get(&conn.connection_id)
                .copied()
                .unwrap_or(0);
            players.push(PlayerInfo {
                player_id,
                username: name.name.clone(),
                x: pos.x,
                y: pos.y,
                z: pos.z,
            });
        }
    } // World lock and query state are dropped here, before any socket I/O.

    let response = PlayerListResponse { players };
    send_message(socket, MSG_ID_PLAYER_LIST, &response).await?;
    Ok(())
}
/// Answers a heartbeat by sending an empty `HeartbeatAck` frame.
///
/// # Errors
///
/// Returns whatever error `send_message` reports.
async fn handle_heartbeat(socket: &mut TcpStream) -> Result<()> {
    send_message(socket, MSG_ID_HEARTBEAT_ACK, &HeartbeatAck {}).await
}
/// Removes the connection <-> player mappings for `connection_id` and logs
/// the removed player id, if there was one.
///
/// Both map locks are taken (connection map first, then player map) and
/// released before the log line is printed.
///
/// NOTE(review): only the id maps are cleaned up. The ECS entity spawned by
/// `handle_login` is not touched here; confirm whether it should be
/// despawned on disconnect.
async fn cleanup_player(state: &ServerState, connection_id: ConnectionId) {
    let mut by_connection = state.connection_to_player.lock().await;
    let mut by_player = state.player_to_connection.lock().await;

    let removed = by_connection.remove(&connection_id);
    match removed {
        Some(pid) => {
            by_player.remove(&pid);
        }
        None => {}
    }

    // Release in reverse acquisition order before logging.
    drop(by_player);
    drop(by_connection);

    if let Some(pid) = removed {
        println!(" ↳ [CLEANUP] 清理玩家 ID: {}", pid);
    }
}
/// Encodes `message` and writes it to `socket` as a single frame.
///
/// Frame layout (all integers little-endian):
/// `frame_len: u32 | msg_id: u16 | seq_id: u32 | payload`, where `frame_len`
/// is 6 + payload length and `seq_id` is always written as 0.
///
/// The whole frame is assembled in memory and written with one `write_all`.
/// The header and payload therefore go out together, and an encoding or size
/// error is detected before any byte reaches the socket.
///
/// # Errors
///
/// Returns a protocol error if encoding fails or the frame length does not
/// fit in a `u32`, and an I/O error if the write fails.
async fn send_message<M: prost::Message>(
    socket: &mut TcpStream,
    msg_id: u16,
    message: &M,
) -> Result<()> {
    /// Bytes counted by `frame_len` that are not payload (msg_id + seq_id).
    const FRAME_OVERHEAD: usize = 6;
    /// Size of the `frame_len` prefix itself.
    const LEN_PREFIX: usize = 4;

    let mut payload = Vec::new();
    message.encode(&mut payload).map_err(|e| {
        aerox_core::AeroXError::protocol(format!("Failed to encode message: {:?}", e))
    })?;

    let frame_len = FRAME_OVERHEAD + payload.len();
    // Reject rather than silently truncate: the length prefix is only 32 bits wide.
    if frame_len > u32::MAX as usize {
        return Err(aerox_core::AeroXError::protocol(format!(
            "Message too large to frame: {} bytes",
            payload.len()
        )));
    }

    let mut frame = Vec::with_capacity(LEN_PREFIX + frame_len);
    frame.extend_from_slice(&(frame_len as u32).to_le_bytes());
    frame.extend_from_slice(&msg_id.to_le_bytes());
    frame.extend_from_slice(&0u32.to_le_bytes());
    frame.extend_from_slice(&payload);

    socket.write_all(&frame).await?;
    Ok(())
}
/// Client-side view of the session.
///
/// The derived `Default` yields a logged-out client with no player id, no
/// username and a position at the origin, the same values the previous
/// hand-written implementation produced.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct ClientState {
    /// Whether the client has logged in.
    pub logged_in: bool,
    /// Player id, once one is known.
    pub player_id: Option<u64>,
    /// Username, once one is known.
    pub username: Option<String>,
    /// Position as `(x, y, z)`.
    pub position: (f32, f32, f32),
}
/// Runs the scripted demo client against `127.0.0.1:8080`.
///
/// Connects, then sends five messages one second apart: login as `Player1`,
/// a move to (10, 20, 30), a chat line, a player-list request and a
/// heartbeat. This function only sends; it does not read any replies.
///
/// # Errors
///
/// Returns an error if connecting or any send fails.
pub async fn run_client() -> aerox_client::Result<()> {
    use aerox_client::StreamClient;

    // Pause inserted after every demo step.
    let step_pause = Duration::from_secs(1);

    println!("╔════════════════════════════════════════╗");
    println!("║ AeroX 游戏客户端 ║");
    println!("╚════════════════════════════════════════╝\n");

    let addr = "127.0.0.1:8080".parse::<SocketAddr>().unwrap();
    println!("🔗 连接到服务器: {}...\n", addr);
    let mut client = StreamClient::connect(addr).await?;
    println!("✓ 连接成功!\n");
    println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
    println!("自动演示模式:\n");

    // Step 1: login.
    let username = format!("Player{}", 1);
    println!("1. 登录为: {}", username);
    let login = LoginRequest { username };
    client.send_message(MSG_ID_LOGIN_REQUEST, &login).await?;
    tokio::time::sleep(step_pause).await;

    // Step 2: move.
    println!("2. 移动到 (10.0, 20.0, 30.0)");
    let destination = MoveRequest {
        x: 10.0,
        y: 20.0,
        z: 30.0,
    };
    client.send_message(MSG_ID_MOVE_REQUEST, &destination).await?;
    tokio::time::sleep(step_pause).await;

    // Step 3: chat.
    println!("3. 发送聊天消息");
    let chat = ChatMessage {
        content: String::from("Hello from AeroX client!"),
    };
    client.send_message(MSG_ID_CHAT_MESSAGE, &chat).await?;
    tokio::time::sleep(step_pause).await;

    // Step 4: player list.
    println!("4. 获取玩家列表");
    let list_request = GetPlayersRequest {};
    client.send_message(MSG_ID_GET_PLAYERS, &list_request).await?;
    tokio::time::sleep(step_pause).await;

    // Step 5: heartbeat.
    println!("5. 发送心跳");
    let heartbeat = Heartbeat {};
    client.send_message(MSG_ID_HEARTBEAT, &heartbeat).await?;
    tokio::time::sleep(step_pause).await;

    println!("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
    println!("✓ 演示完成");
    Ok(())
}
/// Entry point: runs the server or the client depending on the first
/// command-line argument (`server` or `client`).
///
/// With no argument a usage message is printed; with an unrecognized
/// argument an error message is printed to stderr. Both cases return
/// `Ok(())`. Client errors are converted into a network error.
#[tokio::main]
async fn main() -> aerox_core::Result<()> {
    let args: Vec<String> = std::env::args().collect();

    match args.get(1).map(String::as_str) {
        None => {
            println!("用法:");
            println!(" server - 启动服务器");
            println!(" client - 启动客户端");
            Ok(())
        }
        Some("server") => run_server().await,
        Some("client") => match run_client().await {
            Ok(()) => Ok(()),
            Err(e) => Err(aerox_core::AeroXError::network(format!(
                "Client error: {:?}",
                e
            ))),
        },
        Some(other) => {
            eprintln!("未知参数: {}", other);
            eprintln!("使用 'server' 或 'client'");
            Ok(())
        }
    }
}