z4_engine/
engine.rs

1use ethers::prelude::Address;
2use serde_json::{json, Value};
3use std::collections::HashMap;
4use std::sync::Arc;
5use tdn::{
6    prelude::{
7        start_with_config_and_key, NetworkType, PeerId, ReceiveMessage, SendMessage, SendType,
8    },
9    types::{primitives::vec_remove_item, rpc::RpcError},
10};
11use tokio::{
12    select,
13    sync::mpsc::{unbounded_channel, Sender, UnboundedReceiver, UnboundedSender},
14    sync::Mutex,
15};
16use z4_types::{
17    handle_tasks, GameId, HandleResult, Handler, MethodValues, Param, Player, Result, RoomId,
18    TaskMessage,
19};
20
21use crate::{
22    config::Config,
23    p2p::handle_p2p,
24    pool::{listen as pool_listen, pool_channel},
25    room::{ConnectType, Room},
26    rpc::handle_rpc,
27    scan::{chain_channel, listen as scan_listen},
28    ChainMessage, PoolMessage,
29};
30
31/// Store the room info
32pub struct HandlerRoom<H: Handler> {
33    /// Game logic handler
34    pub handler: Arc<Mutex<H>>,
35    /// Game id/address
36    pub game: GameId,
37    /// Room info
38    pub room: Room,
39}
40
41/// Pending room
42pub struct PendingRoom {
43    /// Game id/address
44    game: GameId,
45    /// The room is viewable for others
46    viewable: bool,
47    /// The salt for seed by first player
48    salt: [u8; 32],
49    /// The block info for seed on chain
50    block: [u8; 32],
51    /// Player params: account, peer, pubkey
52    pub players: Vec<Player>,
53    /// Sequencer params: peer, websocket
54    pub sequencer: Option<(PeerId, String)>,
55}
56
57/// Engine
58pub struct Engine<H: Handler> {
59    /// Config of engine and network
60    config: Config,
61    /// Rooms which is running
62    rooms: HashMap<RoomId, HandlerRoom<H>>,
63    /// Rooms which is waiting create, room => (game, players, sequencer)
64    pub pending: HashMap<RoomId, PendingRoom>,
65    /// Supported games and game's pending rooms
66    pub games: HashMap<GameId, Vec<RoomId>>,
67    /// Connected peers
68    onlines: Arc<Mutex<HashMap<PeerId, Vec<RoomId>>>>,
69}
70
71impl<H: Handler> Engine<H> {
72    /// Init a engine with config
73    pub fn init(config: Config) -> Self {
74        let mut games = HashMap::new();
75        for game in config.games.iter() {
76            if let Ok(addr) = game.parse::<Address>() {
77                games.insert(addr, vec![]);
78            }
79        }
80        Self {
81            config,
82            games,
83            rooms: HashMap::new(),
84            pending: HashMap::new(),
85            onlines: Arc::new(Mutex::new(HashMap::new())),
86        }
87    }
88
89    /// Create a pending room when scan from chain
90    pub fn create_pending(
91        &mut self,
92        id: RoomId,
93        game: GameId,
94        viewable: bool,
95        player: Player,
96        salt: [u8; 32],
97        block: [u8; 32],
98    ) {
99        if let Some(games) = self.games.get_mut(&game) {
100            if !self.pending.contains_key(&id) {
101                self.pending.insert(
102                    id,
103                    PendingRoom {
104                        game,
105                        viewable,
106                        salt,
107                        block,
108                        players: vec![player],
109                        sequencer: None,
110                    },
111                );
112                games.push(id);
113            }
114        }
115    }
116
117    /// Join new player to the room
118    pub fn join_pending(&mut self, id: RoomId, player: Player) {
119        if let Some(proom) = self.pending.get_mut(&id) {
120            proom.players.push(player);
121        }
122    }
123
124    /// Create a pending room when scan from chain
125    pub fn del_pending(&mut self, id: RoomId) {
126        if let Some(proom) = self.pending.remove(&id) {
127            self.games
128                .get_mut(&proom.game)
129                .map(|v| vec_remove_item(v, &id));
130        }
131    }
132
133    /// Check if contains pending room
134    pub fn contains_pending(&self, id: &RoomId) -> bool {
135        self.pending.contains_key(id)
136    }
137
138    /// Create a room when scan from chain
139    pub async fn start_room(
140        &mut self,
141        id: RoomId,
142        sequencer: (PeerId, String),
143        params: Vec<u8>,
144        is_self: bool,
145        task_sender: UnboundedSender<TaskMessage<H>>,
146    ) {
147        if let Some(proom) = self.pending.get_mut(&id) {
148            proom.sequencer = Some(sequencer);
149
150            if is_self {
151                let seed: [u8; 32] = proom
152                    .salt
153                    .iter()
154                    .zip(proom.block.iter())
155                    .map(|(&x1, &x2)| x1 ^ x2)
156                    .collect::<Vec<u8>>()
157                    .try_into()
158                    .unwrap_or([0u8; 32]);
159                if let Some((raw_handler, tasks)) =
160                    H::chain_create(&proom.players, params, id, seed).await
161                {
162                    let handler = Arc::new(Mutex::new(raw_handler));
163                    let ids: Vec<PeerId> = proom.players.iter().map(|p| p.peer).collect();
164
165                    // running tasks
166                    if !tasks.is_empty() {
167                        handle_tasks(id, tasks, handler.clone(), task_sender);
168                    }
169
170                    let room = HandlerRoom {
171                        handler: handler,
172                        game: proom.game,
173                        room: Room::new(id, proom.viewable, &ids),
174                    };
175
176                    self.rooms.insert(id, room);
177                }
178            }
179        }
180    }
181
182    /// Over a room
183    pub async fn over_room(&mut self, id: RoomId) {
184        if let Some(_room) = self.rooms.remove(&id) {
185            // TODO clear onlines
186        }
187    }
188
189    /// Check room exists
190    pub fn has_room(&self, id: &RoomId) -> bool {
191        self.rooms.contains_key(id)
192    }
193
194    /// Get room info
195    pub fn get_room(&self, id: &RoomId) -> &HandlerRoom<H> {
196        self.rooms.get(id).unwrap() // safe before check
197    }
198
199    /// Check the player is in the room
200    pub async fn is_room_player(&self, id: &RoomId, peer: &PeerId) -> bool {
201        if let Some(hr) = self.rooms.get(id) {
202            hr.room.is_player(peer)
203        } else {
204            false
205        }
206    }
207
208    /// Check the player is in some rooms that hold by this node
209    pub async fn has_peer(&self, peer: &PeerId) -> bool {
210        if let Some(rooms) = self.onlines.lock().await.get(&peer) {
211            !rooms.is_empty()
212        } else {
213            false
214        }
215    }
216
217    /// When a player online/connected
218    pub async fn online(&mut self, id: RoomId, peer: PeerId, ctype: ConnectType) -> bool {
219        let is_ok = if let Some(hr) = self.rooms.get_mut(&id) {
220            hr.room.online(peer, ctype)
221        } else {
222            false
223        };
224
225        if is_ok {
226            let mut onlines_lock = self.onlines.lock().await;
227            onlines_lock
228                .entry(peer)
229                .and_modify(|rooms| {
230                    if !rooms.contains(&id) {
231                        rooms.push(id)
232                    }
233                })
234                .or_insert(vec![id]);
235        }
236
237        is_ok
238    }
239
240    /// When a player offline/disconnected
241    pub async fn offline(&mut self, peer: PeerId) {
242        let mut onlines_lock = self.onlines.lock().await;
243        if let Some(rooms) = onlines_lock.remove(&peer) {
244            for rid in rooms {
245                if let Some(hr) = self.rooms.get_mut(&rid) {
246                    hr.room.offline(peer);
247                }
248            }
249        }
250    }
251
252    /// Run the engine with game logic
253    pub async fn run(self) -> Result<()> {
254        let (chain_send, chain_recv) = chain_channel();
255        self.run_with_channel(chain_send, chain_recv).await
256    }
257
258    /// Run the engine with game logic and channel
259    pub async fn run_with_channel(
260        mut self,
261        chain_send: UnboundedSender<ChainMessage>,
262        mut chain_recv: UnboundedReceiver<ChainMessage>,
263    ) -> Result<()> {
264        let (tdn_config, key) = self.config.to_tdn();
265        let chain_option = self.config.to_chain().await;
266
267        let (peer_addr, send, mut out_recv) = start_with_config_and_key(tdn_config, key).await?;
268        println!("SERVER: peer id: {:?}", peer_addr);
269        println!("P2P   : http://0.0.0.0:{}", self.config.p2p_port);
270        println!("HTTP  : http://0.0.0.0:{}", self.config.http_port);
271        if let Some(p) = self.config.ws_port {
272            println!("WS    : ws://0.0.0.0:{}", p);
273        }
274
275        let (pool_send, pool_recv) = pool_channel();
276        if let Some((scan_providers, pool_provider, market_address, start_block)) = chain_option {
277            let send1 = chain_send.clone();
278            let send2 = chain_send.clone();
279            tokio::spawn(scan_listen(
280                scan_providers,
281                market_address,
282                send1,
283                start_block,
284            ));
285            tokio::spawn(pool_listen(pool_provider, market_address, send2, pool_recv));
286        }
287
288        let (task_sender, mut task_receiver) = unbounded_channel();
289        loop {
290            let work = select! {
291                w = async {
292                    chain_recv.recv().await.map(FutureMessage::Chain)
293                } => w,
294                w = async {
295                    out_recv.recv().await.map(FutureMessage::Network)
296                } => w,
297                w = async {
298                    task_receiver.recv().await.map(FutureMessage::Task)
299                } => w,
300            };
301
302            match work {
303                Some(FutureMessage::Task(message)) => match message {
304                    TaskMessage::Result(rid, res) => {
305                        let is_over = res.over;
306                        handle_result(&self.get_room(&rid).room, res, &send, None, 0).await;
307                        if is_over {
308                            handle_over(
309                                rid,
310                                self.get_room(&rid).handler.clone(),
311                                chain_send.clone(),
312                            );
313                        }
314                    }
315                },
316                Some(FutureMessage::Network(message)) => match message {
317                    ReceiveMessage::Group(rid, msg) => {
318                        if !self.has_room(&rid) {
319                            continue;
320                        }
321                        if let Ok(Some(res)) = handle_p2p(&mut self, &send, rid, msg).await {
322                            let is_over = res.over;
323                            handle_result(&self.get_room(&rid).room, res, &send, None, 0).await;
324                            if is_over {
325                                handle_over(
326                                    rid,
327                                    self.get_room(&rid).handler.clone(),
328                                    chain_send.clone(),
329                                );
330                            }
331                        }
332                    }
333                    ReceiveMessage::Rpc(uid, params, is_ws) => {
334                        match handle_rpc(&mut self, &send, uid, params, is_ws).await {
335                            Ok(Some((res, rid, is_rpc, id))) => {
336                                let is_over = res.over;
337                                handle_result(&self.get_room(&rid).room, res, &send, is_rpc, id)
338                                    .await;
339                                if is_over {
340                                    handle_over(
341                                        rid,
342                                        self.get_room(&rid).handler.clone(),
343                                        chain_send.clone(),
344                                    );
345                                }
346                            }
347                            Ok(None) => {
348                                let msg = RpcError::Custom("None".to_owned()).json(0);
349                                let _ = send.send(SendMessage::Rpc(uid, msg, is_ws)).await;
350                            }
351                            Err(err) => {
352                                let msg = RpcError::Custom(format!("{:?}", err)).json(0);
353                                let _ = send.send(SendMessage::Rpc(uid, msg, is_ws)).await;
354                            }
355                        }
356                    }
357                    ReceiveMessage::NetworkLost => {
358                        debug!("No network connections");
359                    }
360                    ReceiveMessage::Own(..) => {}
361                },
362                Some(FutureMessage::Chain(message)) => match message {
363                    ChainMessage::CreateRoom(
364                        rid,
365                        game,
366                        viewable,
367                        account,
368                        peer,
369                        signer,
370                        salt,
371                        block,
372                    ) => {
373                        info!("Engine: chain new room created !");
374                        self.create_pending(
375                            rid,
376                            game,
377                            viewable,
378                            Player {
379                                account,
380                                peer,
381                                signer,
382                            },
383                            salt,
384                            block,
385                        );
386                    }
387                    ChainMessage::JoinRoom(rid, account, peer, signer) => {
388                        info!("Engine: chain new player joined !");
389                        self.join_pending(
390                            rid,
391                            Player {
392                                account,
393                                peer,
394                                signer,
395                            },
396                        );
397                    }
398                    ChainMessage::StartRoom(rid, game) => {
399                        // send accept operation to chain
400                        // check room is exist
401                        if let Some(proom) = self.pending.get(&rid) {
402                            let params = H::chain_accept(&proom.players).await;
403                            let _ = pool_send.send(PoolMessage::AcceptRoom(rid, params));
404                        } else if self.games.contains_key(&game) {
405                            // TODO fetch room from chain.
406                        }
407                    }
408                    ChainMessage::AcceptRoom(rid, sequencer, ws, params) => {
409                        info!("Engine: start new room: {}", rid);
410                        // if mine, create room
411                        let is_own = sequencer == peer_addr;
412                        self.start_room(rid, (sequencer, ws), params, is_own, task_sender.clone())
413                            .await;
414
415                        if is_own {
416                            let _ = send
417                                .send(SendMessage::Network(NetworkType::AddGroup(rid)))
418                                .await;
419                        }
420                    }
421                    ChainMessage::GameOverRoom(gid, data, proof) => {
422                        let _ = pool_send.send(PoolMessage::OverRoom(gid, data, proof));
423                        self.over_room(gid).await;
424                    }
425                    ChainMessage::ChainOverRoom(gid) => {
426                        self.del_pending(gid);
427                    }
428                    ChainMessage::Reprove => {
429                        // TODO logic
430                    }
431                },
432                None => break,
433            }
434        }
435
436        Ok(())
437    }
438}
439
440enum FutureMessage<H: Handler> {
441    Network(ReceiveMessage),
442    Chain(ChainMessage),
443    Task(TaskMessage<H>),
444}
445
446/// Handle result
447async fn handle_result<P: Param>(
448    room: &Room,
449    result: HandleResult<P>,
450    send: &Sender<SendMessage>,
451    rpc: Option<(PeerId, u64)>,
452    id: u64,
453) {
454    let HandleResult {
455        mut all,
456        mut one,
457        over,
458        started: _,
459    } = result;
460
461    loop {
462        if !one.is_empty() {
463            let (peer, params) = one.remove(0);
464            let p2p_bytes = params.to_bytes();
465            let rpc_msg = build_rpc_response(id, room.id, params.to_value());
466            match room.get(&peer) {
467                ConnectType::P2p => send
468                    .send(SendMessage::Group(
469                        room.id,
470                        SendType::Event(0, peer, p2p_bytes),
471                    ))
472                    .await
473                    .expect("TDN channel closed"),
474                ConnectType::Rpc(uid) => send
475                    .send(SendMessage::Rpc(uid, rpc_msg, true))
476                    .await
477                    .expect("TDN channel closed"),
478                ConnectType::None => {
479                    if let Some((p, uid)) = rpc {
480                        if p == peer {
481                            send.send(SendMessage::Rpc(uid, rpc_msg, false))
482                                .await
483                                .expect("TDN channel closed");
484                        }
485                    }
486                }
487            }
488        } else {
489            break;
490        }
491    }
492
493    loop {
494        if !all.is_empty() {
495            let params = all.remove(0);
496            let p2p_bytes = params.to_bytes();
497            let rpc_msg = build_rpc_response(id, room.id, params.to_value());
498            for (peer, c) in room.iter() {
499                match c {
500                    ConnectType::P2p => {
501                        send.send(SendMessage::Group(
502                            room.id,
503                            SendType::Event(0, *peer, p2p_bytes.clone()),
504                        ))
505                        .await
506                        .expect("TDN channel closed");
507                    }
508                    ConnectType::Rpc(uid) => {
509                        send.send(SendMessage::Rpc(*uid, rpc_msg.clone(), true))
510                            .await
511                            .expect("TDN channel closed");
512                    }
513                    ConnectType::None => {
514                        if let Some((p, uid)) = rpc {
515                            if p == *peer {
516                                send.send(SendMessage::Rpc(uid, rpc_msg.clone(), false))
517                                    .await
518                                    .expect("TDN channel closed");
519                            }
520                        }
521                    }
522                }
523            }
524            // TODO
525        } else {
526            break;
527        }
528    }
529
530    if over {
531        let params = MethodValues {
532            method: "over".to_owned(),
533            params: vec![],
534        };
535        let p2p_bytes = params.to_bytes();
536        let rpc_msg = build_rpc_response(id, room.id, params.to_value());
537        for (peer, c) in room.iter() {
538            match c {
539                ConnectType::P2p => send
540                    .send(SendMessage::Group(
541                        room.id,
542                        SendType::Event(0, *peer, p2p_bytes.clone()),
543                    ))
544                    .await
545                    .expect("TDN channel closed"),
546                ConnectType::Rpc(uid) => {
547                    send.send(SendMessage::Rpc(*uid, rpc_msg.clone(), true))
548                        .await
549                        .expect("TDN channel closed");
550                }
551                ConnectType::None => {
552                    if let Some((p, uid)) = rpc {
553                        if p == *peer {
554                            send.send(SendMessage::Rpc(uid, rpc_msg.clone(), false))
555                                .await
556                                .expect("TDN channel closed");
557                        }
558                    }
559                }
560            }
561        }
562    }
563}
564
565fn handle_over<H: Handler>(
566    rid: RoomId,
567    handler: Arc<Mutex<H>>,
568    chain_send: UnboundedSender<ChainMessage>,
569) {
570    tokio::spawn(async move {
571        let mut lock = handler.lock().await;
572        if let Ok((data, proof)) = lock.prove().await {
573            let _ = chain_send.send(ChainMessage::GameOverRoom(rid, data, proof));
574        }
575    });
576}
577
578fn build_rpc_response(id: u64, gid: RoomId, params: Value) -> Value {
579    match (
580        params.get("method"),
581        params.get("result"),
582        params.get("params"),
583    ) {
584        (Some(method), Some(result), _) => {
585            json!({
586                "jsonrpc": "2.0",
587                "id": id,
588                "gid": gid,
589                "method": method,
590                "result": result,
591            })
592        }
593        (Some(method), None, Some(result)) => {
594            json!({
595                "jsonrpc": "2.0",
596                "id": id,
597                "gid": gid,
598                "method": method,
599                "result": result,
600            })
601        }
602        _ => {
603            json!({
604                "jsonrpc": "2.0",
605                "id": id,
606                "gid": gid,
607                "result": params
608            })
609        }
610    }
611}