1use ethers::prelude::Address;
2use serde_json::{json, Value};
3use std::collections::HashMap;
4use std::sync::Arc;
5use tdn::{
6 prelude::{
7 start_with_config_and_key, NetworkType, PeerId, ReceiveMessage, SendMessage, SendType,
8 },
9 types::{primitives::vec_remove_item, rpc::RpcError},
10};
11use tokio::{
12 select,
13 sync::mpsc::{unbounded_channel, Sender, UnboundedReceiver, UnboundedSender},
14 sync::Mutex,
15};
16use z4_types::{
17 handle_tasks, GameId, HandleResult, Handler, MethodValues, Param, Player, Result, RoomId,
18 TaskMessage,
19};
20
21use crate::{
22 config::Config,
23 p2p::handle_p2p,
24 pool::{listen as pool_listen, pool_channel},
25 room::{ConnectType, Room},
26 rpc::handle_rpc,
27 scan::{chain_channel, listen as scan_listen},
28 ChainMessage, PoolMessage,
29};
30
31pub struct HandlerRoom<H: Handler> {
33 pub handler: Arc<Mutex<H>>,
35 pub game: GameId,
37 pub room: Room,
39}
40
41pub struct PendingRoom {
43 game: GameId,
45 viewable: bool,
47 salt: [u8; 32],
49 block: [u8; 32],
51 pub players: Vec<Player>,
53 pub sequencer: Option<(PeerId, String)>,
55}
56
57pub struct Engine<H: Handler> {
59 config: Config,
61 rooms: HashMap<RoomId, HandlerRoom<H>>,
63 pub pending: HashMap<RoomId, PendingRoom>,
65 pub games: HashMap<GameId, Vec<RoomId>>,
67 onlines: Arc<Mutex<HashMap<PeerId, Vec<RoomId>>>>,
69}
70
71impl<H: Handler> Engine<H> {
72 pub fn init(config: Config) -> Self {
74 let mut games = HashMap::new();
75 for game in config.games.iter() {
76 if let Ok(addr) = game.parse::<Address>() {
77 games.insert(addr, vec![]);
78 }
79 }
80 Self {
81 config,
82 games,
83 rooms: HashMap::new(),
84 pending: HashMap::new(),
85 onlines: Arc::new(Mutex::new(HashMap::new())),
86 }
87 }
88
89 pub fn create_pending(
91 &mut self,
92 id: RoomId,
93 game: GameId,
94 viewable: bool,
95 player: Player,
96 salt: [u8; 32],
97 block: [u8; 32],
98 ) {
99 if let Some(games) = self.games.get_mut(&game) {
100 if !self.pending.contains_key(&id) {
101 self.pending.insert(
102 id,
103 PendingRoom {
104 game,
105 viewable,
106 salt,
107 block,
108 players: vec![player],
109 sequencer: None,
110 },
111 );
112 games.push(id);
113 }
114 }
115 }
116
117 pub fn join_pending(&mut self, id: RoomId, player: Player) {
119 if let Some(proom) = self.pending.get_mut(&id) {
120 proom.players.push(player);
121 }
122 }
123
124 pub fn del_pending(&mut self, id: RoomId) {
126 if let Some(proom) = self.pending.remove(&id) {
127 self.games
128 .get_mut(&proom.game)
129 .map(|v| vec_remove_item(v, &id));
130 }
131 }
132
133 pub fn contains_pending(&self, id: &RoomId) -> bool {
135 self.pending.contains_key(id)
136 }
137
138 pub async fn start_room(
140 &mut self,
141 id: RoomId,
142 sequencer: (PeerId, String),
143 params: Vec<u8>,
144 is_self: bool,
145 task_sender: UnboundedSender<TaskMessage<H>>,
146 ) {
147 if let Some(proom) = self.pending.get_mut(&id) {
148 proom.sequencer = Some(sequencer);
149
150 if is_self {
151 let seed: [u8; 32] = proom
152 .salt
153 .iter()
154 .zip(proom.block.iter())
155 .map(|(&x1, &x2)| x1 ^ x2)
156 .collect::<Vec<u8>>()
157 .try_into()
158 .unwrap_or([0u8; 32]);
159 if let Some((raw_handler, tasks)) =
160 H::chain_create(&proom.players, params, id, seed).await
161 {
162 let handler = Arc::new(Mutex::new(raw_handler));
163 let ids: Vec<PeerId> = proom.players.iter().map(|p| p.peer).collect();
164
165 if !tasks.is_empty() {
167 handle_tasks(id, tasks, handler.clone(), task_sender);
168 }
169
170 let room = HandlerRoom {
171 handler: handler,
172 game: proom.game,
173 room: Room::new(id, proom.viewable, &ids),
174 };
175
176 self.rooms.insert(id, room);
177 }
178 }
179 }
180 }
181
182 pub async fn over_room(&mut self, id: RoomId) {
184 if let Some(_room) = self.rooms.remove(&id) {
185 }
187 }
188
189 pub fn has_room(&self, id: &RoomId) -> bool {
191 self.rooms.contains_key(id)
192 }
193
194 pub fn get_room(&self, id: &RoomId) -> &HandlerRoom<H> {
196 self.rooms.get(id).unwrap() }
198
199 pub async fn is_room_player(&self, id: &RoomId, peer: &PeerId) -> bool {
201 if let Some(hr) = self.rooms.get(id) {
202 hr.room.is_player(peer)
203 } else {
204 false
205 }
206 }
207
208 pub async fn has_peer(&self, peer: &PeerId) -> bool {
210 if let Some(rooms) = self.onlines.lock().await.get(&peer) {
211 !rooms.is_empty()
212 } else {
213 false
214 }
215 }
216
217 pub async fn online(&mut self, id: RoomId, peer: PeerId, ctype: ConnectType) -> bool {
219 let is_ok = if let Some(hr) = self.rooms.get_mut(&id) {
220 hr.room.online(peer, ctype)
221 } else {
222 false
223 };
224
225 if is_ok {
226 let mut onlines_lock = self.onlines.lock().await;
227 onlines_lock
228 .entry(peer)
229 .and_modify(|rooms| {
230 if !rooms.contains(&id) {
231 rooms.push(id)
232 }
233 })
234 .or_insert(vec![id]);
235 }
236
237 is_ok
238 }
239
240 pub async fn offline(&mut self, peer: PeerId) {
242 let mut onlines_lock = self.onlines.lock().await;
243 if let Some(rooms) = onlines_lock.remove(&peer) {
244 for rid in rooms {
245 if let Some(hr) = self.rooms.get_mut(&rid) {
246 hr.room.offline(peer);
247 }
248 }
249 }
250 }
251
252 pub async fn run(self) -> Result<()> {
254 let (chain_send, chain_recv) = chain_channel();
255 self.run_with_channel(chain_send, chain_recv).await
256 }
257
258 pub async fn run_with_channel(
260 mut self,
261 chain_send: UnboundedSender<ChainMessage>,
262 mut chain_recv: UnboundedReceiver<ChainMessage>,
263 ) -> Result<()> {
264 let (tdn_config, key) = self.config.to_tdn();
265 let chain_option = self.config.to_chain().await;
266
267 let (peer_addr, send, mut out_recv) = start_with_config_and_key(tdn_config, key).await?;
268 println!("SERVER: peer id: {:?}", peer_addr);
269 println!("P2P : http://0.0.0.0:{}", self.config.p2p_port);
270 println!("HTTP : http://0.0.0.0:{}", self.config.http_port);
271 if let Some(p) = self.config.ws_port {
272 println!("WS : ws://0.0.0.0:{}", p);
273 }
274
275 let (pool_send, pool_recv) = pool_channel();
276 if let Some((scan_providers, pool_provider, market_address, start_block)) = chain_option {
277 let send1 = chain_send.clone();
278 let send2 = chain_send.clone();
279 tokio::spawn(scan_listen(
280 scan_providers,
281 market_address,
282 send1,
283 start_block,
284 ));
285 tokio::spawn(pool_listen(pool_provider, market_address, send2, pool_recv));
286 }
287
288 let (task_sender, mut task_receiver) = unbounded_channel();
289 loop {
290 let work = select! {
291 w = async {
292 chain_recv.recv().await.map(FutureMessage::Chain)
293 } => w,
294 w = async {
295 out_recv.recv().await.map(FutureMessage::Network)
296 } => w,
297 w = async {
298 task_receiver.recv().await.map(FutureMessage::Task)
299 } => w,
300 };
301
302 match work {
303 Some(FutureMessage::Task(message)) => match message {
304 TaskMessage::Result(rid, res) => {
305 let is_over = res.over;
306 handle_result(&self.get_room(&rid).room, res, &send, None, 0).await;
307 if is_over {
308 handle_over(
309 rid,
310 self.get_room(&rid).handler.clone(),
311 chain_send.clone(),
312 );
313 }
314 }
315 },
316 Some(FutureMessage::Network(message)) => match message {
317 ReceiveMessage::Group(rid, msg) => {
318 if !self.has_room(&rid) {
319 continue;
320 }
321 if let Ok(Some(res)) = handle_p2p(&mut self, &send, rid, msg).await {
322 let is_over = res.over;
323 handle_result(&self.get_room(&rid).room, res, &send, None, 0).await;
324 if is_over {
325 handle_over(
326 rid,
327 self.get_room(&rid).handler.clone(),
328 chain_send.clone(),
329 );
330 }
331 }
332 }
333 ReceiveMessage::Rpc(uid, params, is_ws) => {
334 match handle_rpc(&mut self, &send, uid, params, is_ws).await {
335 Ok(Some((res, rid, is_rpc, id))) => {
336 let is_over = res.over;
337 handle_result(&self.get_room(&rid).room, res, &send, is_rpc, id)
338 .await;
339 if is_over {
340 handle_over(
341 rid,
342 self.get_room(&rid).handler.clone(),
343 chain_send.clone(),
344 );
345 }
346 }
347 Ok(None) => {
348 let msg = RpcError::Custom("None".to_owned()).json(0);
349 let _ = send.send(SendMessage::Rpc(uid, msg, is_ws)).await;
350 }
351 Err(err) => {
352 let msg = RpcError::Custom(format!("{:?}", err)).json(0);
353 let _ = send.send(SendMessage::Rpc(uid, msg, is_ws)).await;
354 }
355 }
356 }
357 ReceiveMessage::NetworkLost => {
358 debug!("No network connections");
359 }
360 ReceiveMessage::Own(..) => {}
361 },
362 Some(FutureMessage::Chain(message)) => match message {
363 ChainMessage::CreateRoom(
364 rid,
365 game,
366 viewable,
367 account,
368 peer,
369 signer,
370 salt,
371 block,
372 ) => {
373 info!("Engine: chain new room created !");
374 self.create_pending(
375 rid,
376 game,
377 viewable,
378 Player {
379 account,
380 peer,
381 signer,
382 },
383 salt,
384 block,
385 );
386 }
387 ChainMessage::JoinRoom(rid, account, peer, signer) => {
388 info!("Engine: chain new player joined !");
389 self.join_pending(
390 rid,
391 Player {
392 account,
393 peer,
394 signer,
395 },
396 );
397 }
398 ChainMessage::StartRoom(rid, game) => {
399 if let Some(proom) = self.pending.get(&rid) {
402 let params = H::chain_accept(&proom.players).await;
403 let _ = pool_send.send(PoolMessage::AcceptRoom(rid, params));
404 } else if self.games.contains_key(&game) {
405 }
407 }
408 ChainMessage::AcceptRoom(rid, sequencer, ws, params) => {
409 info!("Engine: start new room: {}", rid);
410 let is_own = sequencer == peer_addr;
412 self.start_room(rid, (sequencer, ws), params, is_own, task_sender.clone())
413 .await;
414
415 if is_own {
416 let _ = send
417 .send(SendMessage::Network(NetworkType::AddGroup(rid)))
418 .await;
419 }
420 }
421 ChainMessage::GameOverRoom(gid, data, proof) => {
422 let _ = pool_send.send(PoolMessage::OverRoom(gid, data, proof));
423 self.over_room(gid).await;
424 }
425 ChainMessage::ChainOverRoom(gid) => {
426 self.del_pending(gid);
427 }
428 ChainMessage::Reprove => {
429 }
431 },
432 None => break,
433 }
434 }
435
436 Ok(())
437 }
438}
439
440enum FutureMessage<H: Handler> {
441 Network(ReceiveMessage),
442 Chain(ChainMessage),
443 Task(TaskMessage<H>),
444}
445
446async fn handle_result<P: Param>(
448 room: &Room,
449 result: HandleResult<P>,
450 send: &Sender<SendMessage>,
451 rpc: Option<(PeerId, u64)>,
452 id: u64,
453) {
454 let HandleResult {
455 mut all,
456 mut one,
457 over,
458 started: _,
459 } = result;
460
461 loop {
462 if !one.is_empty() {
463 let (peer, params) = one.remove(0);
464 let p2p_bytes = params.to_bytes();
465 let rpc_msg = build_rpc_response(id, room.id, params.to_value());
466 match room.get(&peer) {
467 ConnectType::P2p => send
468 .send(SendMessage::Group(
469 room.id,
470 SendType::Event(0, peer, p2p_bytes),
471 ))
472 .await
473 .expect("TDN channel closed"),
474 ConnectType::Rpc(uid) => send
475 .send(SendMessage::Rpc(uid, rpc_msg, true))
476 .await
477 .expect("TDN channel closed"),
478 ConnectType::None => {
479 if let Some((p, uid)) = rpc {
480 if p == peer {
481 send.send(SendMessage::Rpc(uid, rpc_msg, false))
482 .await
483 .expect("TDN channel closed");
484 }
485 }
486 }
487 }
488 } else {
489 break;
490 }
491 }
492
493 loop {
494 if !all.is_empty() {
495 let params = all.remove(0);
496 let p2p_bytes = params.to_bytes();
497 let rpc_msg = build_rpc_response(id, room.id, params.to_value());
498 for (peer, c) in room.iter() {
499 match c {
500 ConnectType::P2p => {
501 send.send(SendMessage::Group(
502 room.id,
503 SendType::Event(0, *peer, p2p_bytes.clone()),
504 ))
505 .await
506 .expect("TDN channel closed");
507 }
508 ConnectType::Rpc(uid) => {
509 send.send(SendMessage::Rpc(*uid, rpc_msg.clone(), true))
510 .await
511 .expect("TDN channel closed");
512 }
513 ConnectType::None => {
514 if let Some((p, uid)) = rpc {
515 if p == *peer {
516 send.send(SendMessage::Rpc(uid, rpc_msg.clone(), false))
517 .await
518 .expect("TDN channel closed");
519 }
520 }
521 }
522 }
523 }
524 } else {
526 break;
527 }
528 }
529
530 if over {
531 let params = MethodValues {
532 method: "over".to_owned(),
533 params: vec![],
534 };
535 let p2p_bytes = params.to_bytes();
536 let rpc_msg = build_rpc_response(id, room.id, params.to_value());
537 for (peer, c) in room.iter() {
538 match c {
539 ConnectType::P2p => send
540 .send(SendMessage::Group(
541 room.id,
542 SendType::Event(0, *peer, p2p_bytes.clone()),
543 ))
544 .await
545 .expect("TDN channel closed"),
546 ConnectType::Rpc(uid) => {
547 send.send(SendMessage::Rpc(*uid, rpc_msg.clone(), true))
548 .await
549 .expect("TDN channel closed");
550 }
551 ConnectType::None => {
552 if let Some((p, uid)) = rpc {
553 if p == *peer {
554 send.send(SendMessage::Rpc(uid, rpc_msg.clone(), false))
555 .await
556 .expect("TDN channel closed");
557 }
558 }
559 }
560 }
561 }
562 }
563}
564
565fn handle_over<H: Handler>(
566 rid: RoomId,
567 handler: Arc<Mutex<H>>,
568 chain_send: UnboundedSender<ChainMessage>,
569) {
570 tokio::spawn(async move {
571 let mut lock = handler.lock().await;
572 if let Ok((data, proof)) = lock.prove().await {
573 let _ = chain_send.send(ChainMessage::GameOverRoom(rid, data, proof));
574 }
575 });
576}
577
578fn build_rpc_response(id: u64, gid: RoomId, params: Value) -> Value {
579 match (
580 params.get("method"),
581 params.get("result"),
582 params.get("params"),
583 ) {
584 (Some(method), Some(result), _) => {
585 json!({
586 "jsonrpc": "2.0",
587 "id": id,
588 "gid": gid,
589 "method": method,
590 "result": result,
591 })
592 }
593 (Some(method), None, Some(result)) => {
594 json!({
595 "jsonrpc": "2.0",
596 "id": id,
597 "gid": gid,
598 "method": method,
599 "result": result,
600 })
601 }
602 _ => {
603 json!({
604 "jsonrpc": "2.0",
605 "id": id,
606 "gid": gid,
607 "result": params
608 })
609 }
610 }
611}