1#![recursion_limit = "1024"]
12
// The `std`, `single`, `multiple` and `full` features are mutually exclusive.
// The predicate below is true as soon as any two of them are enabled together
// (all six unordered pairs are covered), and the build is then rejected with
// an explicit message via `compile_error!`.
#[cfg(any(
    all(
        feature = "std",
        any(feature = "single", feature = "multiple", feature = "full")
    ),
    all(feature = "single", any(feature = "multiple", feature = "full")),
    all(feature = "multiple", feature = "full"),
))]
compile_error!("feature conflict, only one feature at one time.");
33
34#[macro_use]
35extern crate tracing;
36
37mod config;
38mod group;
39mod rpc;
40
41#[cfg(any(feature = "std", feature = "full"))]
42mod layer;
43
44pub mod error;
46
47pub use tdn_types as types;
49
50pub mod prelude {
52 pub use super::config::Config;
53 pub use super::rpc::{
54 channel_rpc_channel, ChannelAddr, ChannelMessage, ChannelRpcSender, RpcConfig, RpcMessage,
55 };
56 pub use chamomile::prelude::Config as P2pConfig;
57 pub use tdn_types::{
58 group::{GroupId, GROUP_BYTES_LENGTH},
59 message::{
60 NetworkType, ReceiveMessage, RecvType, SendMessage, SendType, StateRequest,
61 StateResponse,
62 },
63 primitives::{Broadcast, HandleResult, Peer, PeerId, PeerKey, Result},
64 };
65
66 use chamomile::prelude::{
67 start as chamomile_start, start_with_key as chamomile_start_with_key,
68 ReceiveMessage as ChamomileReceiveMessage, SendMessage as ChamomileSendMessage,
69 };
70 use std::{path::PathBuf, sync::Arc};
71 use tdn_types::message::RpcSendType;
72 use tokio::{
73 sync::mpsc::{self, Receiver, Sender},
74 sync::RwLock,
75 };
76
77 use super::group::*;
78 use super::rpc::start as rpc_start;
79
80 #[cfg(any(feature = "std", feature = "full"))]
81 use super::layer::*;
82
83 pub fn new_send_channel() -> (Sender<SendMessage>, Receiver<SendMessage>) {
85 mpsc::channel(1024)
86 }
87
88 pub fn new_receive_channel() -> (Sender<ReceiveMessage>, Receiver<ReceiveMessage>) {
90 mpsc::channel(1024)
91 }
92
93 pub async fn start() -> Result<(PeerId, Sender<SendMessage>, Receiver<ReceiveMessage>)> {
96 let (send_send, send_recv) = new_send_channel();
97 let (recv_send, recv_recv) = new_receive_channel();
98
99 let config = Config::load(PathBuf::from("./")).await;
100
101 let (_secret, ids, p2p_config, rpc_config) = config.split();
102 let rpc_send = start_rpc(rpc_config, recv_send.clone()).await?;
103 let peer_id =
104 start_main(ids, p2p_config, recv_send, send_recv, Some(rpc_send), None).await?;
105
106 Ok((peer_id, send_send, recv_recv))
107 }
108
109 pub async fn start_with_config(
111 config: Config,
112 ) -> Result<(PeerId, Sender<SendMessage>, Receiver<ReceiveMessage>)> {
113 let (send_send, send_recv) = new_send_channel();
114 let (recv_send, recv_recv) = new_receive_channel();
115
116 let (_secret, ids, p2p_config, rpc_config) = config.split();
117 let rpc_send = start_rpc(rpc_config, recv_send.clone()).await?;
118 let peer_id =
119 start_main(ids, p2p_config, recv_send, send_recv, Some(rpc_send), None).await?;
120
121 Ok((peer_id, send_send, recv_recv))
122 }
123
124 pub async fn start_with_config_and_key(
126 config: Config,
127 key: PeerKey,
128 ) -> Result<(PeerId, Sender<SendMessage>, Receiver<ReceiveMessage>)> {
129 let (send_send, send_recv) = new_send_channel();
130 let (recv_send, recv_recv) = new_receive_channel();
131
132 let (_secret, ids, p2p_config, rpc_config) = config.split();
133 let rpc_send = start_rpc(rpc_config, recv_send.clone()).await?;
134 let peer_id = start_main(
135 ids,
136 p2p_config,
137 recv_send,
138 send_recv,
139 Some(rpc_send),
140 Some(key),
141 )
142 .await?;
143
144 Ok((peer_id, send_send, recv_recv))
145 }
146
147 pub async fn start_rpc(
149 config: RpcConfig,
150 out_send: Sender<ReceiveMessage>,
151 ) -> Result<Sender<RpcSendType>> {
152 rpc_start(config, out_send).await
153 }
154
155 pub async fn start_main(
157 group_ids: Vec<GroupId>,
158 p2p_config: P2pConfig,
159 out_send: Sender<ReceiveMessage>,
160 mut self_recv: Receiver<SendMessage>,
161 rpc_send: Option<Sender<RpcSendType>>,
162 key: Option<PeerKey>,
163 ) -> Result<PeerId> {
164 let res1 = if let Some(key) = key {
166 chamomile_start_with_key(p2p_config, key).await
167 } else {
168 chamomile_start(p2p_config).await
169 };
170
171 let (peer_id, p2p_send, mut p2p_recv) = res1?;
172
173 debug!("chamomile & jsonrpc service started");
174 let my_groups = Arc::new(RwLock::new(group_ids));
175 let my_groups_1 = my_groups.clone();
176
177 let listen_task = tokio::spawn(async move {
179 while let Some(message) = p2p_recv.recv().await {
183 match message {
184 ChamomileReceiveMessage::StableConnect(peer, mut data) => {
185 if data.len() < GROUP_BYTES_LENGTH * 2 {
186 continue;
187 }
188 let mut fgid_bytes = [0u8; GROUP_BYTES_LENGTH];
189 let mut tgid_bytes = [0u8; GROUP_BYTES_LENGTH];
190 fgid_bytes.copy_from_slice(data.drain(..GROUP_BYTES_LENGTH).as_slice());
191 tgid_bytes.copy_from_slice(data.drain(..GROUP_BYTES_LENGTH).as_slice());
192 let fgid = GroupId::from_be_bytes(fgid_bytes);
193 let tgid = GroupId::from_be_bytes(tgid_bytes);
194
195 let group_lock = my_groups.read().await;
196 if group_lock.len() == 0 {
197 continue;
198 }
199
200 if fgid == tgid && group_lock.contains(&fgid) {
201 drop(group_lock);
202 let _ = group_handle_connect(&fgid, &out_send, peer.into(), data).await;
203 } else {
204 drop(group_lock);
205 #[cfg(any(feature = "std", feature = "full"))]
207 let _ = layer_handle_connect(fgid, tgid, &out_send, peer.into(), data)
208 .await;
209 }
210 }
211 ChamomileReceiveMessage::ResultConnect(peer, mut data) => {
212 if data.len() < GROUP_BYTES_LENGTH * 2 {
213 continue;
214 }
215 let mut fgid_bytes = [0u8; GROUP_BYTES_LENGTH];
216 let mut tgid_bytes = [0u8; GROUP_BYTES_LENGTH];
217 fgid_bytes.copy_from_slice(data.drain(..GROUP_BYTES_LENGTH).as_slice());
218 tgid_bytes.copy_from_slice(data.drain(..GROUP_BYTES_LENGTH).as_slice());
219 let fgid = GroupId::from_be_bytes(fgid_bytes);
220 let tgid = GroupId::from_be_bytes(tgid_bytes);
221
222 let group_lock = my_groups.read().await;
223 if group_lock.len() == 0 {
224 continue;
225 }
226
227 if fgid == tgid && group_lock.contains(&fgid) {
228 drop(group_lock);
229 let _ =
230 group_handle_result_connect(&fgid, &out_send, peer.into(), data)
231 .await;
232 } else {
233 drop(group_lock);
234 #[cfg(any(feature = "std", feature = "full"))]
236 let _ = layer_handle_result_connect(
237 fgid,
238 tgid,
239 &out_send,
240 peer.into(),
241 data,
242 )
243 .await;
244 }
245 }
246 ChamomileReceiveMessage::StableResult(peer, is_ok, mut data) => {
247 if data.len() < GROUP_BYTES_LENGTH * 2 {
248 continue;
249 }
250 let mut fgid_bytes = [0u8; GROUP_BYTES_LENGTH];
251 let mut tgid_bytes = [0u8; GROUP_BYTES_LENGTH];
252 fgid_bytes.copy_from_slice(data.drain(..GROUP_BYTES_LENGTH).as_slice());
253 tgid_bytes.copy_from_slice(data.drain(..GROUP_BYTES_LENGTH).as_slice());
254 let fgid = GroupId::from_be_bytes(fgid_bytes);
255 let tgid = GroupId::from_be_bytes(tgid_bytes);
256
257 let group_lock = my_groups.read().await;
258 if group_lock.len() == 0 {
259 continue;
260 }
261
262 if fgid == tgid && group_lock.contains(&fgid) {
263 drop(group_lock);
264 let _ = group_handle_result(&fgid, &out_send, peer.into(), is_ok, data)
265 .await;
266 } else {
267 drop(group_lock);
268 #[cfg(any(feature = "std", feature = "full"))]
270 let _ = layer_handle_result(
271 fgid,
272 tgid,
273 &out_send,
274 peer.into(),
275 is_ok,
276 data,
277 )
278 .await;
279 }
280 }
281 ChamomileReceiveMessage::StableLeave(peer) => {
282 let group_lock = my_groups.read().await;
283 for gid in group_lock.iter() {
284 let _ = group_handle_leave(&gid, &out_send, peer).await;
285 #[cfg(any(feature = "std", feature = "full"))]
286 let _ = layer_handle_leave(*gid, &out_send, peer).await;
287 }
288 drop(group_lock);
289 }
290 ChamomileReceiveMessage::Data(peer_id, mut data) => {
291 if data.len() < GROUP_BYTES_LENGTH * 2 {
292 continue;
293 }
294 let mut fgid_bytes = [0u8; GROUP_BYTES_LENGTH];
295 let mut tgid_bytes = [0u8; GROUP_BYTES_LENGTH];
296 fgid_bytes.copy_from_slice(data.drain(..GROUP_BYTES_LENGTH).as_slice());
297 tgid_bytes.copy_from_slice(data.drain(..GROUP_BYTES_LENGTH).as_slice());
298 let fgid = GroupId::from_be_bytes(fgid_bytes);
299 let tgid = GroupId::from_be_bytes(tgid_bytes);
300
301 let group_lock = my_groups.read().await;
302 if group_lock.len() == 0 {
303 continue;
304 }
305
306 if fgid == tgid && group_lock.contains(&fgid) {
307 drop(group_lock);
308 let _ = group_handle_data(&fgid, &out_send, peer_id, data).await;
309 } else {
310 drop(group_lock);
311 #[cfg(any(feature = "std", feature = "full"))]
313 let _ = layer_handle_data(fgid, tgid, &out_send, peer_id, data).await;
314 }
315 }
316 ChamomileReceiveMessage::Stream(id, stream, mut data) => {
317 if data.len() < GROUP_BYTES_LENGTH * 2 {
318 continue;
319 }
320 let mut fgid_bytes = [0u8; GROUP_BYTES_LENGTH];
321 let mut tgid_bytes = [0u8; GROUP_BYTES_LENGTH];
322 fgid_bytes.copy_from_slice(data.drain(..GROUP_BYTES_LENGTH).as_slice());
323 tgid_bytes.copy_from_slice(data.drain(..GROUP_BYTES_LENGTH).as_slice());
324 let fgid = GroupId::from_be_bytes(fgid_bytes);
325 let tgid = GroupId::from_be_bytes(tgid_bytes);
326
327 let group_lock = my_groups.read().await;
328 if group_lock.len() == 0 {
329 continue;
330 }
331
332 if fgid == tgid && group_lock.contains(&fgid) {
333 drop(group_lock);
334 let _ = group_handle_stream(&fgid, &out_send, id, stream, data).await;
335 } else {
336 drop(group_lock);
337 #[cfg(any(feature = "std", feature = "full"))]
339 let _ =
340 layer_handle_stream(fgid, tgid, &out_send, id, stream, data).await;
341 }
342 }
343 ChamomileReceiveMessage::Delivery(t, tid, is_ok, mut data) => {
344 if data.len() < GROUP_BYTES_LENGTH * 2 {
345 continue;
346 }
347 let mut fgid_bytes = [0u8; GROUP_BYTES_LENGTH];
348 let mut tgid_bytes = [0u8; GROUP_BYTES_LENGTH];
349 fgid_bytes.copy_from_slice(data.drain(..GROUP_BYTES_LENGTH).as_slice());
350 tgid_bytes.copy_from_slice(data.drain(..GROUP_BYTES_LENGTH).as_slice());
351 let fgid = GroupId::from_be_bytes(fgid_bytes);
352 let tgid = GroupId::from_be_bytes(tgid_bytes);
353
354 let group_lock = my_groups.read().await;
355 if group_lock.len() == 0 {
356 continue;
357 }
358
359 if fgid == tgid && group_lock.contains(&fgid) {
360 drop(group_lock);
361 let _ =
362 group_handle_delivery(&fgid, &out_send, t.into(), tid, is_ok).await;
363 } else {
364 drop(group_lock);
365 #[cfg(any(feature = "std", feature = "full"))]
367 let _ = layer_handle_delivery(
368 tgid, fgid,
370 &out_send,
371 t.into(),
372 tid,
373 is_ok,
374 )
375 .await;
376 }
377 }
378 ChamomileReceiveMessage::NetworkLost => {
379 out_send
380 .send(ReceiveMessage::NetworkLost)
381 .await
382 .expect("Outside channel missing");
383 }
384 ChamomileReceiveMessage::OwnConnect(peer) => {
385 let assist_id = peer.assist;
386 let mut new_peer: Peer = peer.into();
387 new_peer.id = assist_id;
388 out_send
389 .send(ReceiveMessage::Own(RecvType::Connect(new_peer, vec![])))
390 .await
391 .expect("Outside channel missing");
392 }
393 ChamomileReceiveMessage::OwnLeave(peer) => {
394 let assist_id = peer.assist;
395 let mut new_peer: Peer = peer.into();
396 new_peer.id = assist_id;
397 out_send
398 .send(ReceiveMessage::Own(RecvType::Leave(new_peer)))
399 .await
400 .expect("Outside channel missing");
401 }
402 ChamomileReceiveMessage::OwnEvent(aid, data) => {
403 out_send
404 .send(ReceiveMessage::Own(RecvType::Event(aid, data)))
405 .await
406 .expect("Outside channel missing");
407 }
408 }
409 }
410
411 warn!("Chamomile network is stopped");
412 });
413
414 tokio::spawn(async move {
416 while let Some(message) = self_recv.recv().await {
417 match message {
418 SendMessage::Own(msg) => match msg {
419 SendType::Connect(tid, peer, data) => {
420 p2p_send
421 .send(ChamomileSendMessage::StableConnect(tid, peer.into(), data))
422 .await
423 .map_err(|e| error!("Chamomile channel: {:?}", e))
424 .expect("Chamomile channel closed");
425 }
426 SendType::Event(_, _, data) => {
427 p2p_send
428 .send(ChamomileSendMessage::OwnEvent(data))
429 .await
430 .map_err(|e| error!("Chamomile channel: {:?}", e))
431 .expect("Chamomile channel closed");
432 }
433 SendType::Stream(id, stream, data) => {
434 p2p_send
435 .send(ChamomileSendMessage::Stream(id, stream, data))
436 .await
437 .map_err(|e| error!("Chamomile channel: {:?}", e))
438 .expect("Chamomile channel closed");
439 }
440 SendType::Disconnect(..) => {
441 warn!("Own message has no DisConnect");
442 }
443 SendType::Result(..) => {
444 warn!("Own message has no Result");
445 }
446 },
447 #[cfg(any(feature = "single", feature = "std"))]
448 SendMessage::Group(msg) => {
449 let groups_lock = my_groups_1.read().await;
450 if groups_lock.len() == 0 {
451 drop(groups_lock);
452 continue;
453 }
454 let default_group_id = groups_lock[0].clone();
455 drop(groups_lock);
456
457 group_handle_send(default_group_id, &p2p_send, msg)
458 .await
459 .map_err(|e| error!("Chamomile channel: {:?}", e))
460 .expect("Chamomile channel closed");
461 }
462 #[cfg(any(feature = "multiple", feature = "full"))]
463 SendMessage::Group(group_id, msg) => {
464 group_handle_send(group_id, &p2p_send, msg)
465 .await
466 .map_err(|e| error!("Chamomile channel: {:?}", e))
467 .expect("Chamomile channel closed");
468 }
469 SendMessage::Rpc(msg) => {
470 if let Some(ref rpc_send) = rpc_send {
471 rpc_send
472 .send(msg)
473 .await
474 .map_err(|e| error!("Rpc channel: {:?}", e))
475 .expect("Rpc channel closed");
476 }
477 }
478 #[cfg(feature = "std")]
479 SendMessage::Layer(tgid, msg) => {
480 let groups_lock = my_groups_1.read().await;
481 if groups_lock.len() == 0 {
482 drop(groups_lock);
483 continue;
484 }
485 let default_group_id = groups_lock[0].clone();
486 drop(groups_lock);
487
488 layer_handle_send(default_group_id, tgid, &p2p_send, msg)
489 .await
490 .map_err(|e| error!("Chamomile channel: {:?}", e))
491 .expect("Chamomile channel closed");
492 }
493 #[cfg(feature = "full")]
494 SendMessage::Layer(fgid, tgid, msg) => {
495 layer_handle_send(fgid, tgid, &p2p_send, msg)
496 .await
497 .map_err(|e| error!("Chamomile channel: {:?}", e))
498 .expect("Chamomile channel closed");
499 }
500 SendMessage::Network(nmsg) => match nmsg {
501 NetworkType::Broadcast(broadcast, data) => {
502 let mut bytes = vec![];
504 let groups_lock = my_groups_1.read().await;
505 if groups_lock.len() == 0 {
506 drop(groups_lock);
507 continue;
508 }
509 bytes.extend(&groups_lock[0].to_be_bytes());
510 drop(groups_lock);
511 bytes.extend(data);
512 p2p_send
513 .send(ChamomileSendMessage::Broadcast(broadcast, bytes))
514 .await
515 .map_err(|e| error!("Chamomile channel: {:?}", e))
516 .expect("Chamomile channel closed");
517 }
518 NetworkType::Connect(peer) => {
519 p2p_send
520 .send(ChamomileSendMessage::Connect(peer.into()))
521 .await
522 .map_err(|e| error!("Chamomile channel: {:?}", e))
523 .expect("Chamomile channel closed");
524 }
525 NetworkType::DisConnect(peer) => {
526 p2p_send
527 .send(ChamomileSendMessage::DisConnect(peer.into()))
528 .await
529 .map_err(|e| error!("Chamomile channel: {:?}", e))
530 .expect("Chamomile channel closed");
531 }
532 NetworkType::NetworkState(req, sender) => {
533 p2p_send
534 .send(ChamomileSendMessage::NetworkState(req, sender))
535 .await
536 .map_err(|e| error!("Chamomile channel: {:?}", e))
537 .expect("Chamomile channel closed");
538 }
539 NetworkType::NetworkReboot => {
540 p2p_send
541 .send(ChamomileSendMessage::NetworkReboot)
542 .await
543 .map_err(|e| error!("Chamomile channel: {:?}", e))
544 .expect("Chamomile channel closed");
545 }
546 NetworkType::NetworkStop => {
547 warn!("Start stop chamomile...");
548 let _ = p2p_send.send(ChamomileSendMessage::NetworkStop).await;
549 listen_task.abort();
550 break;
551 }
552 #[cfg(any(feature = "multiple", feature = "full"))]
553 NetworkType::AddGroup(gid) => {
554 let mut group_lock = my_groups_1.write().await;
555 if !group_lock.contains(&gid) {
556 group_lock.push(gid);
557 }
558 drop(group_lock);
559 }
560 #[cfg(any(feature = "multiple", feature = "full"))]
561 NetworkType::DelGroup(gid) => {
562 let mut group_lock = my_groups_1.write().await;
563 let mut need_remove: Vec<usize> = vec![];
564 for (k, i) in group_lock.iter().enumerate() {
565 if i == &gid {
566 need_remove.push(k);
567 }
568 }
569 for i in need_remove.iter().rev() {
570 group_lock.remove(*i);
571 }
572 drop(group_lock);
573 }
574 },
575 }
576 }
577 });
578
579 Ok(peer_id)
580 }
581}