z4_engine/
request.rs

1use futures_util::{SinkExt, StreamExt};
2use std::path::PathBuf;
3use tdn::prelude::{
4    start_with_config_and_key, Config as TdnConfig, NetworkType, Peer, PeerKey, ReceiveMessage,
5    RecvType, SendMessage, SendType,
6};
7use tokio::{
8    net::TcpStream,
9    sync::mpsc::{unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender},
10};
11use tokio_tungstenite::{
12    connect_async,
13    tungstenite::{client::IntoClientRequest, protocol::Message},
14    MaybeTlsStream, WebSocketStream,
15};
16use z4_types::{json, merge_json, Param, Result, RoomId, Value};
17
/// A message carried over a channel: the room id paired with a payload of type `P`.
pub type ChannelMessage<P> = (RoomId, P);
20
21/// Create a channel
22#[inline]
23pub fn message_channel<P: Param>() -> (
24    UnboundedSender<ChannelMessage<P>>,
25    UnboundedReceiver<ChannelMessage<P>>,
26) {
27    unbounded_channel()
28}
29
30/// Running a ws channel
31pub async fn run_ws_channel<P: 'static + Param>(
32    peer: &PeerKey,
33    room: RoomId,
34    in_recv: UnboundedReceiver<ChannelMessage<P>>,
35    url: impl IntoClientRequest + Unpin,
36) -> Result<UnboundedReceiver<ChannelMessage<P>>> {
37    let (out_send, out_recv) = unbounded_channel();
38    let (ws_stream, _) = connect_async(url).await.expect("Failed to connect"); // TODO
39
40    let peer = PeerKey::from_db_bytes(&peer.to_db_bytes()).unwrap(); // safe
41    tokio::spawn(ws_listen(peer, room, out_send, in_recv, ws_stream));
42    Ok(out_recv)
43}
44
45/// Running a p2p channel
46pub async fn run_p2p_channel<P: 'static + Param>(
47    peer: &PeerKey,
48    room: RoomId,
49    in_recv: UnboundedReceiver<ChannelMessage<P>>,
50    server: Peer,
51) -> Result<UnboundedReceiver<ChannelMessage<P>>> {
52    let (out_send, out_recv) = unbounded_channel();
53    let peer = PeerKey::from_db_bytes(&peer.to_db_bytes()).unwrap(); // safe
54
55    // Running P2P network
56    let mut config = TdnConfig::default();
57    config.db_path = Some(PathBuf::from(&format!("./.tdn/{:?}", peer.peer_id())));
58    config.rpc_ws = None;
59    config.rpc_http = None;
60    config.p2p_peer = Peer::socket("0.0.0.0:0".parse().unwrap()); // safe
61    let (_, p2p_send, p2p_recv) = start_with_config_and_key(config, peer).await?;
62
63    tokio::spawn(p2p_listen(
64        server, room, out_send, in_recv, p2p_send, p2p_recv,
65    ));
66    Ok(out_recv)
67}
68
69enum WsResult<P: Param> {
70    Out(ChannelMessage<P>),
71    Stream(Message),
72}
73
74#[inline]
75fn build_request(params: Value, room: RoomId, peer: &PeerKey) -> Value {
76    let mut request = json!({
77        "jsonrpc": "2.0",
78        "id": 0,
79        "gid": room,
80        "peer": peer.peer_id().to_hex(),
81    });
82
83    merge_json(&mut request, &params);
84    request
85}
86
87async fn ws_listen<P: Param>(
88    peer: PeerKey,
89    room: RoomId,
90    send: UnboundedSender<ChannelMessage<P>>,
91    mut in_recv: UnboundedReceiver<ChannelMessage<P>>,
92    ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
93) {
94    let (mut writer, mut reader) = ws_stream.split();
95
96    // send connect
97    let request = build_request(
98        json!({
99            "method": "connect",
100            "params": [],
101        }),
102        room,
103        &peer,
104    );
105    let s = Message::from(serde_json::to_string(&request).unwrap_or("".to_owned()));
106    let _ = writer.send(s).await;
107
108    loop {
109        let res = tokio::select! {
110            v = async { in_recv.recv().await.map(|msg| WsResult::Out(msg)) } => v,
111            v = async {
112                reader
113                    .next()
114                    .await
115                    .map(|msg| msg.map(|msg| WsResult::Stream(msg)).ok())
116                    .flatten()
117            } => v,
118        };
119
120        match res {
121            Some(WsResult::Out((room, params))) => {
122                let request = build_request(params.to_value(), room, &peer);
123                let s = Message::from(serde_json::to_string(&request).unwrap_or("".to_owned()));
124                let _ = writer.send(s).await;
125            }
126            Some(WsResult::Stream(msg)) => {
127                let msg = msg.to_text().unwrap_or("");
128                match serde_json::from_str::<Value>(&msg) {
129                    Ok(mut values) => {
130                        let gid = values["gid"].as_u64().unwrap_or(0);
131                        let method = values["method"].as_str().unwrap_or("").to_owned();
132                        let mut params = values["result"].take();
133                        merge_json(
134                            &mut params,
135                            &json!({
136                                "method": method
137                            }),
138                        );
139
140                        match P::from_value(params) {
141                            Ok(p) => {
142                                let _ = send.send((gid, p));
143                            }
144                            _ => {}
145                        }
146                    }
147                    Err(_e) => {}
148                }
149            }
150            None => break,
151        }
152    }
153}
154
155enum P2pResult<P: Param> {
156    Out(ChannelMessage<P>),
157    Stream(ReceiveMessage),
158}
159
160async fn p2p_listen<P: Param>(
161    server: Peer,
162    room: RoomId,
163    send: UnboundedSender<ChannelMessage<P>>,
164    mut in_recv: UnboundedReceiver<ChannelMessage<P>>,
165    p2p_send: Sender<SendMessage>,
166    mut p2p_recv: Receiver<ReceiveMessage>,
167) {
168    let server_id = server.id;
169    // add room to network
170    let _ = p2p_send
171        .send(SendMessage::Network(NetworkType::AddGroup(room)))
172        .await;
173    // create connection to peer socket
174    let _ = p2p_send
175        .send(SendMessage::Network(NetworkType::Connect(Peer::socket(
176            server.socket,
177        ))))
178        .await;
179    // create stable connection to peer
180    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
181    let _ = p2p_send
182        .send(SendMessage::Group(
183            room,
184            SendType::Connect(0, Peer::peer(server.id), vec![]),
185        ))
186        .await;
187
188    loop {
189        let res = tokio::select! {
190            v = async { in_recv.recv().await.map(|msg| P2pResult::Out(msg)) } => v,
191            v = async {
192                p2p_recv
193                    .recv()
194                    .await
195                    .map(|msg| P2pResult::Stream(msg))
196            } => v,
197        };
198
199        match res {
200            Some(P2pResult::Out((room, params))) => {
201                let _ = p2p_send
202                    .send(SendMessage::Group(
203                        room,
204                        SendType::Event(0, server_id, params.to_bytes()),
205                    ))
206                    .await;
207            }
208            Some(P2pResult::Stream(message)) => match message {
209                ReceiveMessage::Group(gid, msg) => match msg {
210                    RecvType::Event(peer, msg) => {
211                        if peer == server_id {
212                            match Param::from_bytes(msg) {
213                                Ok(p) => {
214                                    let _ = send.send((gid, p));
215                                }
216                                _ => {}
217                            }
218                        }
219                    }
220                    _ => {}
221                },
222                _ => {}
223            },
224            None => break,
225        }
226    }
227}