1use futures_util::{SinkExt, StreamExt};
2use std::path::PathBuf;
3use tdn::prelude::{
4 start_with_config_and_key, Config as TdnConfig, NetworkType, Peer, PeerKey, ReceiveMessage,
5 RecvType, SendMessage, SendType,
6};
7use tokio::{
8 net::TcpStream,
9 sync::mpsc::{unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender},
10};
11use tokio_tungstenite::{
12 connect_async,
13 tungstenite::{client::IntoClientRequest, protocol::Message},
14 MaybeTlsStream, WebSocketStream,
15};
16use z4_types::{json, merge_json, Param, Result, RoomId, Value};
17
18pub type ChannelMessage<P> = (RoomId, P);
20
21#[inline]
23pub fn message_channel<P: Param>() -> (
24 UnboundedSender<ChannelMessage<P>>,
25 UnboundedReceiver<ChannelMessage<P>>,
26) {
27 unbounded_channel()
28}
29
30pub async fn run_ws_channel<P: 'static + Param>(
32 peer: &PeerKey,
33 room: RoomId,
34 in_recv: UnboundedReceiver<ChannelMessage<P>>,
35 url: impl IntoClientRequest + Unpin,
36) -> Result<UnboundedReceiver<ChannelMessage<P>>> {
37 let (out_send, out_recv) = unbounded_channel();
38 let (ws_stream, _) = connect_async(url).await.expect("Failed to connect"); let peer = PeerKey::from_db_bytes(&peer.to_db_bytes()).unwrap(); tokio::spawn(ws_listen(peer, room, out_send, in_recv, ws_stream));
42 Ok(out_recv)
43}
44
45pub async fn run_p2p_channel<P: 'static + Param>(
47 peer: &PeerKey,
48 room: RoomId,
49 in_recv: UnboundedReceiver<ChannelMessage<P>>,
50 server: Peer,
51) -> Result<UnboundedReceiver<ChannelMessage<P>>> {
52 let (out_send, out_recv) = unbounded_channel();
53 let peer = PeerKey::from_db_bytes(&peer.to_db_bytes()).unwrap(); let mut config = TdnConfig::default();
57 config.db_path = Some(PathBuf::from(&format!("./.tdn/{:?}", peer.peer_id())));
58 config.rpc_ws = None;
59 config.rpc_http = None;
60 config.p2p_peer = Peer::socket("0.0.0.0:0".parse().unwrap()); let (_, p2p_send, p2p_recv) = start_with_config_and_key(config, peer).await?;
62
63 tokio::spawn(p2p_listen(
64 server, room, out_send, in_recv, p2p_send, p2p_recv,
65 ));
66 Ok(out_recv)
67}
68
69enum WsResult<P: Param> {
70 Out(ChannelMessage<P>),
71 Stream(Message),
72}
73
74#[inline]
75fn build_request(params: Value, room: RoomId, peer: &PeerKey) -> Value {
76 let mut request = json!({
77 "jsonrpc": "2.0",
78 "id": 0,
79 "gid": room,
80 "peer": peer.peer_id().to_hex(),
81 });
82
83 merge_json(&mut request, ¶ms);
84 request
85}
86
87async fn ws_listen<P: Param>(
88 peer: PeerKey,
89 room: RoomId,
90 send: UnboundedSender<ChannelMessage<P>>,
91 mut in_recv: UnboundedReceiver<ChannelMessage<P>>,
92 ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
93) {
94 let (mut writer, mut reader) = ws_stream.split();
95
96 let request = build_request(
98 json!({
99 "method": "connect",
100 "params": [],
101 }),
102 room,
103 &peer,
104 );
105 let s = Message::from(serde_json::to_string(&request).unwrap_or("".to_owned()));
106 let _ = writer.send(s).await;
107
108 loop {
109 let res = tokio::select! {
110 v = async { in_recv.recv().await.map(|msg| WsResult::Out(msg)) } => v,
111 v = async {
112 reader
113 .next()
114 .await
115 .map(|msg| msg.map(|msg| WsResult::Stream(msg)).ok())
116 .flatten()
117 } => v,
118 };
119
120 match res {
121 Some(WsResult::Out((room, params))) => {
122 let request = build_request(params.to_value(), room, &peer);
123 let s = Message::from(serde_json::to_string(&request).unwrap_or("".to_owned()));
124 let _ = writer.send(s).await;
125 }
126 Some(WsResult::Stream(msg)) => {
127 let msg = msg.to_text().unwrap_or("");
128 match serde_json::from_str::<Value>(&msg) {
129 Ok(mut values) => {
130 let gid = values["gid"].as_u64().unwrap_or(0);
131 let method = values["method"].as_str().unwrap_or("").to_owned();
132 let mut params = values["result"].take();
133 merge_json(
134 &mut params,
135 &json!({
136 "method": method
137 }),
138 );
139
140 match P::from_value(params) {
141 Ok(p) => {
142 let _ = send.send((gid, p));
143 }
144 _ => {}
145 }
146 }
147 Err(_e) => {}
148 }
149 }
150 None => break,
151 }
152 }
153}
154
155enum P2pResult<P: Param> {
156 Out(ChannelMessage<P>),
157 Stream(ReceiveMessage),
158}
159
160async fn p2p_listen<P: Param>(
161 server: Peer,
162 room: RoomId,
163 send: UnboundedSender<ChannelMessage<P>>,
164 mut in_recv: UnboundedReceiver<ChannelMessage<P>>,
165 p2p_send: Sender<SendMessage>,
166 mut p2p_recv: Receiver<ReceiveMessage>,
167) {
168 let server_id = server.id;
169 let _ = p2p_send
171 .send(SendMessage::Network(NetworkType::AddGroup(room)))
172 .await;
173 let _ = p2p_send
175 .send(SendMessage::Network(NetworkType::Connect(Peer::socket(
176 server.socket,
177 ))))
178 .await;
179 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
181 let _ = p2p_send
182 .send(SendMessage::Group(
183 room,
184 SendType::Connect(0, Peer::peer(server.id), vec![]),
185 ))
186 .await;
187
188 loop {
189 let res = tokio::select! {
190 v = async { in_recv.recv().await.map(|msg| P2pResult::Out(msg)) } => v,
191 v = async {
192 p2p_recv
193 .recv()
194 .await
195 .map(|msg| P2pResult::Stream(msg))
196 } => v,
197 };
198
199 match res {
200 Some(P2pResult::Out((room, params))) => {
201 let _ = p2p_send
202 .send(SendMessage::Group(
203 room,
204 SendType::Event(0, server_id, params.to_bytes()),
205 ))
206 .await;
207 }
208 Some(P2pResult::Stream(message)) => match message {
209 ReceiveMessage::Group(gid, msg) => match msg {
210 RecvType::Event(peer, msg) => {
211 if peer == server_id {
212 match Param::from_bytes(msg) {
213 Ok(p) => {
214 let _ = send.send((gid, p));
215 }
216 _ => {}
217 }
218 }
219 }
220 _ => {}
221 },
222 _ => {}
223 },
224 None => break,
225 }
226 }
227}