evnet/
lib.rs

// lib.rs or networking.rs
2use bevy::app::App;
3use bevy::prelude::*;
4use bevy_matchbox::matchbox_socket::{ChannelConfig, WebRtcSocket};
5use bevy_matchbox::prelude::PeerId;
6use bevy_matchbox::MatchboxSocket;
7use futures::channel::mpsc::SendError;
8use serde::{Deserialize, Serialize};
// Core traits and types
//----------------------------------------
11
/// Delivery guarantee requested for a networked message.
///
/// The numeric discriminant is significant: the socket code in this file casts
/// a `Reliability` to `usize` and uses it as the channel index.
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
pub enum Reliability {
    /// Channel index 0.
    Reliable = 0,
    /// Channel index 1.
    Unreliable = 1,
    /// Channel index 2.
    UnreliableOrdered = 2,
}

impl Reliability {
    /// Every variant, listed in ascending discriminant (channel index) order.
    pub const RELIABILITY: [Reliability; 3] = [
        Self::Reliable,
        Self::Unreliable,
        Self::UnreliableOrdered,
    ];
}
25
26pub trait NetworkedEvent {
27    const RELIABILITY: Reliability;
28    fn id(&self) -> PeerId;
29}
30
31#[derive(Serialize, Deserialize)]
32pub struct Message {
33    pub type_name: String,
34    pub content: Vec<u8>,
35}
36impl Message {
37    pub fn new<T: Serialize + TypePath>(content: &T) -> Self {
38        Message {
39            type_name: T::type_path().to_string(),
40            content: bincode::serialize(content).unwrap(),
41        }
42    }
43}
44
// Socket message handling trait and implementation
//----------------------------------------
47
48pub trait SocketSendMessage {
49    fn receive_msg(&mut self, reliability: Reliability) -> impl Iterator<Item = (PeerId, Message)>;
50    fn send_msg_all<T: Serialize + TypePath + Event + NetworkedEvent>(
51        &mut self,
52        message: &T,
53        reliability: Reliability,
54    ) -> Result<(), SendError>;
55    fn send_msg<T: Serialize + TypePath + Event + NetworkedEvent>(
56        &mut self,
57        peer: PeerId,
58        message: &T,
59        reliability: Reliability,
60    ) -> Result<(), SendError>;
61}
62
63impl SocketSendMessage for WebRtcSocket {
64    fn receive_msg(&mut self, reliability: Reliability) -> impl Iterator<Item = (PeerId, Message)> {
65        self.channel_mut(reliability as usize)
66            .receive()
67            .into_iter()
68            .map(|(id, packet)| (id, bincode::deserialize(&packet).unwrap()))
69    }
70    fn send_msg_all<T: Serialize + TypePath + Event + NetworkedEvent>(
71        &mut self,
72        message: &T,
73        reliability: Reliability,
74    ) -> Result<(), SendError> {
75        let peers = self.connected_peers().collect::<Vec<_>>();
76        for peer in peers {
77            self.send_msg(peer, message, reliability)?;
78        }
79        Ok(())
80    }
81
82    fn send_msg<T: Serialize + TypePath + Event + NetworkedEvent>(
83        &mut self,
84        peer: PeerId,
85        message: &T,
86        reliability: Reliability,
87    ) -> Result<(), SendError> {
88        let msg = Message::new(message);
89        let msg = bincode::serialize(&msg).unwrap();
90        self.channel_mut(reliability as usize)
91            .try_send(msg.into(), peer)?;
92        Ok(())
93    }
94}
95
// Message routing and handling
//----------------------------------------
98
99#[derive(Default, Resource)]
100pub struct NetworkedMessages(
101    std::collections::HashMap<String, (fn(&mut World, &[u8]), fn(&mut World))>,
102);
103
104fn route_outgoing_messages<
105    T: NetworkedEvent + Event + Serialize + for<'a> Deserialize<'a> + TypePath,
106>(
107    world: &mut World,
108) {
109    world.resource_scope(|world, mut socket: Mut<MatchboxSocket>| {
110        let events = world.resource_mut::<Events<T>>();
111        let mut cursor = events.get_cursor();
112        for e in cursor.read(&events) {
113            if e.id() == socket.id().expect("Not connected") {
114                if let Err(err) = socket.send_msg_all(e, T::RELIABILITY) {
115                    error!("Failed to send message: {:?}", err);
116                }
117            }
118        }
119    });
120}
121
122fn route_incoming_messages<T: NetworkedEvent + Event + for<'a> Deserialize<'a>>(
123    world: &mut World,
124    message: &[u8],
125) {
126    let e: T = bincode::deserialize(message).unwrap();
127    world.send_event(e);
128}
129
130fn route_messages(world: &mut World) {
131    if !world.contains_resource::<MatchboxSocket>() {
132        return;
133    }
134
135    world.resource_scope(|world, networked_messages: Mut<NetworkedMessages>| {
136        world.resource_scope(|world, mut socket: Mut<MatchboxSocket>| {
137            for reliability in Reliability::RELIABILITY {
138                for (_peer_id, msg) in socket.receive_msg(reliability) {
139                    let func = networked_messages.0.get(&msg.type_name).unwrap();
140                    func.0(world, &msg.content);
141                }
142            }
143        });
144        for (_, route_outgoing_messages) in networked_messages.0.values() {
145            route_outgoing_messages(world);
146        }
147    });
148    let _peers = world
149        .get_resource_mut::<MatchboxSocket>()
150        .unwrap()
151        .update_peers();
152}
153
154#[derive(Clone, Copy, Resource, Default)]
155pub struct LocalId(Option<PeerId>);
156impl LocalId {
157    pub fn get(&self) -> Option<PeerId> {
158        self.0
159    }
160}
161impl std::ops::Deref for LocalId {
162    type Target = Option<PeerId>;
163
164    fn deref(&self) -> &Self::Target {
165        &self.0
166    }
167}
168
169fn local_id_set(mut local_id: ResMut<LocalId>, matchbox_socket: Option<ResMut<MatchboxSocket>>) {
170    if let Some(mut matchbox_socket) = matchbox_socket {
171        local_id.0 = matchbox_socket.id();
172    } else {
173        local_id.0.take();
174    }
175}
176
// Plugin implementation and app extension
//----------------------------------------
179
180pub struct EvnetPlugin;
181
182impl Plugin for EvnetPlugin {
183    fn build(&self, app: &mut App) {
184        app.add_systems(Update, (local_id_set, route_messages).chain());
185        app.init_resource::<NetworkedMessages>();
186        app.init_resource::<LocalId>();
187    }
188}
189
190pub trait NetworkedAppExt {
191    fn register_networked_event<
192        T: NetworkedEvent + Event + for<'a> Deserialize<'a> + Serialize + TypePath,
193    >(
194        &mut self,
195    ) -> &mut Self;
196}
197
198impl NetworkedAppExt for App {
199    fn register_networked_event<
200        T: NetworkedEvent + Event + for<'a> Deserialize<'a> + Serialize + TypePath,
201    >(
202        &mut self,
203    ) -> &mut Self {
204        self.init_resource::<NetworkedMessages>();
205        let mut networked_messages = self.world_mut().resource_mut::<NetworkedMessages>();
206        networked_messages.0.insert(
207            T::type_path().to_string(),
208            (route_incoming_messages::<T>, route_outgoing_messages::<T>),
209        );
210        self
211    }
212}
213
/// Command-level helper for opening a network connection.
pub trait NetworkedCommandExt {
    /// Connects to `room`.
    ///
    /// The implementation for `Commands` in this file passes the value straight
    /// to the socket builder as the room URL.
    fn connect(&mut self, room: &str);
}
217
218impl NetworkedCommandExt for Commands<'_, '_> {
219    fn connect(&mut self, room_url: &str) {
220        let matchbox = MatchboxSocket::from(
221            //example: "wss://mb.v-sekai.cloud/my-room-1"
222            bevy_matchbox::matchbox_socket::WebRtcSocketBuilder::new(room_url)
223                .add_reliable_channel()
224                .add_unreliable_channel()
225                .add_channel(ChannelConfig {
226                    // UnreliableOrdered
227                    ordered: true,
228                    max_retransmits: Some(0),
229                })
230                .build(),
231        );
232        self.insert_resource(matchbox);
233    }
234}