tdn/
lib.rs

1//! `TDN` - Trusted Distributed Network.
2//!
3//! Blockchain infrastructure framework for security and trusted distributed interactive.
4//!
5//! TDN is underlying network (including p2p, rpc, and other special transports)
6//! and application framework built on Groups and Layers, we built this framework
7//! because we feel that the blockchain is very limited. If you want a more open
8//! and free distributed application development technology, and Pluggable,
9//! lightweight application framework, TDN can satisfy you.
10
11#![recursion_limit = "1024"]
12
13// check features conflict
14#[cfg(any(
15    all(
16        feature = "std",
17        any(feature = "single", feature = "multiple", feature = "full")
18    ),
19    all(
20        feature = "single",
21        any(feature = "std", feature = "multiple", feature = "full")
22    ),
23    all(
24        feature = "multiple",
25        any(feature = "std", feature = "single", feature = "full")
26    ),
27    all(
28        feature = "full",
29        any(feature = "std", feature = "single", feature = "multiple")
30    ),
31))]
32panic!("feature conflict, only one feature at one time.");
33
34#[macro_use]
35extern crate tracing;
36
37mod config;
38mod group;
39mod rpc;
40
41#[cfg(any(feature = "std", feature = "full"))]
42mod layer;
43
44// public mod
45pub mod error;
46
47// re-export tdn_types
48pub use tdn_types as types;
49
50// public struct
51pub mod prelude {
52    pub use super::config::Config;
53    pub use super::rpc::{
54        channel_rpc_channel, ChannelAddr, ChannelMessage, ChannelRpcSender, RpcConfig, RpcMessage,
55    };
56    pub use chamomile::prelude::Config as P2pConfig;
57    pub use tdn_types::{
58        group::{GroupId, GROUP_BYTES_LENGTH},
59        message::{
60            NetworkType, ReceiveMessage, RecvType, SendMessage, SendType, StateRequest,
61            StateResponse,
62        },
63        primitives::{Broadcast, HandleResult, Peer, PeerId, PeerKey, Result},
64    };
65
66    use chamomile::prelude::{
67        start as chamomile_start, start_with_key as chamomile_start_with_key,
68        ReceiveMessage as ChamomileReceiveMessage, SendMessage as ChamomileSendMessage,
69    };
70    use std::{path::PathBuf, sync::Arc};
71    use tdn_types::message::RpcSendType;
72    use tokio::{
73        sync::mpsc::{self, Receiver, Sender},
74        sync::RwLock,
75    };
76
77    use super::group::*;
78    use super::rpc::start as rpc_start;
79
80    #[cfg(any(feature = "std", feature = "full"))]
81    use super::layer::*;
82
83    /// new a channel, send message to TDN Message. default capacity is 1024.
84    pub fn new_send_channel() -> (Sender<SendMessage>, Receiver<SendMessage>) {
85        mpsc::channel(1024)
86    }
87
88    /// new a channel, send message to TDN Message. default capacity is 1024.
89    pub fn new_receive_channel() -> (Sender<ReceiveMessage>, Receiver<ReceiveMessage>) {
90        mpsc::channel(1024)
91    }
92
93    /// start a service, use config.toml file.
94    /// send a Sender<Message>, and return the peer_id, and service Sender<Message>.
95    pub async fn start() -> Result<(PeerId, Sender<SendMessage>, Receiver<ReceiveMessage>)> {
96        let (send_send, send_recv) = new_send_channel();
97        let (recv_send, recv_recv) = new_receive_channel();
98
99        let config = Config::load(PathBuf::from("./")).await;
100
101        let (_secret, ids, p2p_config, rpc_config) = config.split();
102        let rpc_send = start_rpc(rpc_config, recv_send.clone()).await?;
103        let peer_id =
104            start_main(ids, p2p_config, recv_send, send_recv, Some(rpc_send), None).await?;
105
106        Ok((peer_id, send_send, recv_recv))
107    }
108
109    /// start a service with config.
110    pub async fn start_with_config(
111        config: Config,
112    ) -> Result<(PeerId, Sender<SendMessage>, Receiver<ReceiveMessage>)> {
113        let (send_send, send_recv) = new_send_channel();
114        let (recv_send, recv_recv) = new_receive_channel();
115
116        let (_secret, ids, p2p_config, rpc_config) = config.split();
117        let rpc_send = start_rpc(rpc_config, recv_send.clone()).await?;
118        let peer_id =
119            start_main(ids, p2p_config, recv_send, send_recv, Some(rpc_send), None).await?;
120
121        Ok((peer_id, send_send, recv_recv))
122    }
123
124    /// start a service with config and PeerKey.
125    pub async fn start_with_config_and_key(
126        config: Config,
127        key: PeerKey,
128    ) -> Result<(PeerId, Sender<SendMessage>, Receiver<ReceiveMessage>)> {
129        let (send_send, send_recv) = new_send_channel();
130        let (recv_send, recv_recv) = new_receive_channel();
131
132        let (_secret, ids, p2p_config, rpc_config) = config.split();
133        let rpc_send = start_rpc(rpc_config, recv_send.clone()).await?;
134        let peer_id = start_main(
135            ids,
136            p2p_config,
137            recv_send,
138            send_recv,
139            Some(rpc_send),
140            Some(key),
141        )
142        .await?;
143
144        Ok((peer_id, send_send, recv_recv))
145    }
146
147    /// start a separate rpc service.
148    pub async fn start_rpc(
149        config: RpcConfig,
150        out_send: Sender<ReceiveMessage>,
151    ) -> Result<Sender<RpcSendType>> {
152        rpc_start(config, out_send).await
153    }
154
155    /// start a separate p2p service and unified tdn channel.
156    pub async fn start_main(
157        group_ids: Vec<GroupId>,
158        p2p_config: P2pConfig,
159        out_send: Sender<ReceiveMessage>,
160        mut self_recv: Receiver<SendMessage>,
161        rpc_send: Option<Sender<RpcSendType>>,
162        key: Option<PeerKey>,
163    ) -> Result<PeerId> {
164        // start chamomile network & inner rpc.
165        let res1 = if let Some(key) = key {
166            chamomile_start_with_key(p2p_config, key).await
167        } else {
168            chamomile_start(p2p_config).await
169        };
170
171        let (peer_id, p2p_send, mut p2p_recv) = res1?;
172
173        debug!("chamomile & jsonrpc service started");
174        let my_groups = Arc::new(RwLock::new(group_ids));
175        let my_groups_1 = my_groups.clone();
176
177        // handle chamomile send msg.
178        let listen_task = tokio::spawn(async move {
179            // if group's inner message, from_group in our groups.
180            // if layer's message,       from_group not in our groups.
181
182            while let Some(message) = p2p_recv.recv().await {
183                match message {
184                    ChamomileReceiveMessage::StableConnect(peer, mut data) => {
185                        if data.len() < GROUP_BYTES_LENGTH * 2 {
186                            continue;
187                        }
188                        let mut fgid_bytes = [0u8; GROUP_BYTES_LENGTH];
189                        let mut tgid_bytes = [0u8; GROUP_BYTES_LENGTH];
190                        fgid_bytes.copy_from_slice(data.drain(..GROUP_BYTES_LENGTH).as_slice());
191                        tgid_bytes.copy_from_slice(data.drain(..GROUP_BYTES_LENGTH).as_slice());
192                        let fgid = GroupId::from_be_bytes(fgid_bytes);
193                        let tgid = GroupId::from_be_bytes(tgid_bytes);
194
195                        let group_lock = my_groups.read().await;
196                        if group_lock.len() == 0 {
197                            continue;
198                        }
199
200                        if fgid == tgid && group_lock.contains(&fgid) {
201                            drop(group_lock);
202                            let _ = group_handle_connect(&fgid, &out_send, peer.into(), data).await;
203                        } else {
204                            drop(group_lock);
205                            // layer handle it.
206                            #[cfg(any(feature = "std", feature = "full"))]
207                            let _ = layer_handle_connect(fgid, tgid, &out_send, peer.into(), data)
208                                .await;
209                        }
210                    }
211                    ChamomileReceiveMessage::ResultConnect(peer, mut data) => {
212                        if data.len() < GROUP_BYTES_LENGTH * 2 {
213                            continue;
214                        }
215                        let mut fgid_bytes = [0u8; GROUP_BYTES_LENGTH];
216                        let mut tgid_bytes = [0u8; GROUP_BYTES_LENGTH];
217                        fgid_bytes.copy_from_slice(data.drain(..GROUP_BYTES_LENGTH).as_slice());
218                        tgid_bytes.copy_from_slice(data.drain(..GROUP_BYTES_LENGTH).as_slice());
219                        let fgid = GroupId::from_be_bytes(fgid_bytes);
220                        let tgid = GroupId::from_be_bytes(tgid_bytes);
221
222                        let group_lock = my_groups.read().await;
223                        if group_lock.len() == 0 {
224                            continue;
225                        }
226
227                        if fgid == tgid && group_lock.contains(&fgid) {
228                            drop(group_lock);
229                            let _ =
230                                group_handle_result_connect(&fgid, &out_send, peer.into(), data)
231                                    .await;
232                        } else {
233                            drop(group_lock);
234                            // layer handle it.
235                            #[cfg(any(feature = "std", feature = "full"))]
236                            let _ = layer_handle_result_connect(
237                                fgid,
238                                tgid,
239                                &out_send,
240                                peer.into(),
241                                data,
242                            )
243                            .await;
244                        }
245                    }
246                    ChamomileReceiveMessage::StableResult(peer, is_ok, mut data) => {
247                        if data.len() < GROUP_BYTES_LENGTH * 2 {
248                            continue;
249                        }
250                        let mut fgid_bytes = [0u8; GROUP_BYTES_LENGTH];
251                        let mut tgid_bytes = [0u8; GROUP_BYTES_LENGTH];
252                        fgid_bytes.copy_from_slice(data.drain(..GROUP_BYTES_LENGTH).as_slice());
253                        tgid_bytes.copy_from_slice(data.drain(..GROUP_BYTES_LENGTH).as_slice());
254                        let fgid = GroupId::from_be_bytes(fgid_bytes);
255                        let tgid = GroupId::from_be_bytes(tgid_bytes);
256
257                        let group_lock = my_groups.read().await;
258                        if group_lock.len() == 0 {
259                            continue;
260                        }
261
262                        if fgid == tgid && group_lock.contains(&fgid) {
263                            drop(group_lock);
264                            let _ = group_handle_result(&fgid, &out_send, peer.into(), is_ok, data)
265                                .await;
266                        } else {
267                            drop(group_lock);
268                            // layer handle it.
269                            #[cfg(any(feature = "std", feature = "full"))]
270                            let _ = layer_handle_result(
271                                fgid,
272                                tgid,
273                                &out_send,
274                                peer.into(),
275                                is_ok,
276                                data,
277                            )
278                            .await;
279                        }
280                    }
281                    ChamomileReceiveMessage::StableLeave(peer) => {
282                        let group_lock = my_groups.read().await;
283                        for gid in group_lock.iter() {
284                            let _ = group_handle_leave(&gid, &out_send, peer).await;
285                            #[cfg(any(feature = "std", feature = "full"))]
286                            let _ = layer_handle_leave(*gid, &out_send, peer).await;
287                        }
288                        drop(group_lock);
289                    }
290                    ChamomileReceiveMessage::Data(peer_id, mut data) => {
291                        if data.len() < GROUP_BYTES_LENGTH * 2 {
292                            continue;
293                        }
294                        let mut fgid_bytes = [0u8; GROUP_BYTES_LENGTH];
295                        let mut tgid_bytes = [0u8; GROUP_BYTES_LENGTH];
296                        fgid_bytes.copy_from_slice(data.drain(..GROUP_BYTES_LENGTH).as_slice());
297                        tgid_bytes.copy_from_slice(data.drain(..GROUP_BYTES_LENGTH).as_slice());
298                        let fgid = GroupId::from_be_bytes(fgid_bytes);
299                        let tgid = GroupId::from_be_bytes(tgid_bytes);
300
301                        let group_lock = my_groups.read().await;
302                        if group_lock.len() == 0 {
303                            continue;
304                        }
305
306                        if fgid == tgid && group_lock.contains(&fgid) {
307                            drop(group_lock);
308                            let _ = group_handle_data(&fgid, &out_send, peer_id, data).await;
309                        } else {
310                            drop(group_lock);
311                            // layer handle it.
312                            #[cfg(any(feature = "std", feature = "full"))]
313                            let _ = layer_handle_data(fgid, tgid, &out_send, peer_id, data).await;
314                        }
315                    }
316                    ChamomileReceiveMessage::Stream(id, stream, mut data) => {
317                        if data.len() < GROUP_BYTES_LENGTH * 2 {
318                            continue;
319                        }
320                        let mut fgid_bytes = [0u8; GROUP_BYTES_LENGTH];
321                        let mut tgid_bytes = [0u8; GROUP_BYTES_LENGTH];
322                        fgid_bytes.copy_from_slice(data.drain(..GROUP_BYTES_LENGTH).as_slice());
323                        tgid_bytes.copy_from_slice(data.drain(..GROUP_BYTES_LENGTH).as_slice());
324                        let fgid = GroupId::from_be_bytes(fgid_bytes);
325                        let tgid = GroupId::from_be_bytes(tgid_bytes);
326
327                        let group_lock = my_groups.read().await;
328                        if group_lock.len() == 0 {
329                            continue;
330                        }
331
332                        if fgid == tgid && group_lock.contains(&fgid) {
333                            drop(group_lock);
334                            let _ = group_handle_stream(&fgid, &out_send, id, stream, data).await;
335                        } else {
336                            drop(group_lock);
337                            // layer handle it.
338                            #[cfg(any(feature = "std", feature = "full"))]
339                            let _ =
340                                layer_handle_stream(fgid, tgid, &out_send, id, stream, data).await;
341                        }
342                    }
343                    ChamomileReceiveMessage::Delivery(t, tid, is_ok, mut data) => {
344                        if data.len() < GROUP_BYTES_LENGTH * 2 {
345                            continue;
346                        }
347                        let mut fgid_bytes = [0u8; GROUP_BYTES_LENGTH];
348                        let mut tgid_bytes = [0u8; GROUP_BYTES_LENGTH];
349                        fgid_bytes.copy_from_slice(data.drain(..GROUP_BYTES_LENGTH).as_slice());
350                        tgid_bytes.copy_from_slice(data.drain(..GROUP_BYTES_LENGTH).as_slice());
351                        let fgid = GroupId::from_be_bytes(fgid_bytes);
352                        let tgid = GroupId::from_be_bytes(tgid_bytes);
353
354                        let group_lock = my_groups.read().await;
355                        if group_lock.len() == 0 {
356                            continue;
357                        }
358
359                        if fgid == tgid && group_lock.contains(&fgid) {
360                            drop(group_lock);
361                            let _ =
362                                group_handle_delivery(&fgid, &out_send, t.into(), tid, is_ok).await;
363                        } else {
364                            drop(group_lock);
365                            // layer handle it.
366                            #[cfg(any(feature = "std", feature = "full"))]
367                            let _ = layer_handle_delivery(
368                                tgid, // Assuming it is remote sended.
369                                fgid,
370                                &out_send,
371                                t.into(),
372                                tid,
373                                is_ok,
374                            )
375                            .await;
376                        }
377                    }
378                    ChamomileReceiveMessage::NetworkLost => {
379                        out_send
380                            .send(ReceiveMessage::NetworkLost)
381                            .await
382                            .expect("Outside channel missing");
383                    }
384                    ChamomileReceiveMessage::OwnConnect(peer) => {
385                        let assist_id = peer.assist;
386                        let mut new_peer: Peer = peer.into();
387                        new_peer.id = assist_id;
388                        out_send
389                            .send(ReceiveMessage::Own(RecvType::Connect(new_peer, vec![])))
390                            .await
391                            .expect("Outside channel missing");
392                    }
393                    ChamomileReceiveMessage::OwnLeave(peer) => {
394                        let assist_id = peer.assist;
395                        let mut new_peer: Peer = peer.into();
396                        new_peer.id = assist_id;
397                        out_send
398                            .send(ReceiveMessage::Own(RecvType::Leave(new_peer)))
399                            .await
400                            .expect("Outside channel missing");
401                    }
402                    ChamomileReceiveMessage::OwnEvent(aid, data) => {
403                        out_send
404                            .send(ReceiveMessage::Own(RecvType::Event(aid, data)))
405                            .await
406                            .expect("Outside channel missing");
407                    }
408                }
409            }
410
411            warn!("Chamomile network is stopped");
412        });
413
414        // handle outside msg.
415        tokio::spawn(async move {
416            while let Some(message) = self_recv.recv().await {
417                match message {
418                    SendMessage::Own(msg) => match msg {
419                        SendType::Connect(tid, peer, data) => {
420                            p2p_send
421                                .send(ChamomileSendMessage::StableConnect(tid, peer.into(), data))
422                                .await
423                                .map_err(|e| error!("Chamomile channel: {:?}", e))
424                                .expect("Chamomile channel closed");
425                        }
426                        SendType::Event(_, _, data) => {
427                            p2p_send
428                                .send(ChamomileSendMessage::OwnEvent(data))
429                                .await
430                                .map_err(|e| error!("Chamomile channel: {:?}", e))
431                                .expect("Chamomile channel closed");
432                        }
433                        SendType::Stream(id, stream, data) => {
434                            p2p_send
435                                .send(ChamomileSendMessage::Stream(id, stream, data))
436                                .await
437                                .map_err(|e| error!("Chamomile channel: {:?}", e))
438                                .expect("Chamomile channel closed");
439                        }
440                        SendType::Disconnect(..) => {
441                            warn!("Own message has no DisConnect");
442                        }
443                        SendType::Result(..) => {
444                            warn!("Own message has no Result");
445                        }
446                    },
447                    #[cfg(any(feature = "single", feature = "std"))]
448                    SendMessage::Group(msg) => {
449                        let groups_lock = my_groups_1.read().await;
450                        if groups_lock.len() == 0 {
451                            drop(groups_lock);
452                            continue;
453                        }
454                        let default_group_id = groups_lock[0].clone();
455                        drop(groups_lock);
456
457                        group_handle_send(default_group_id, &p2p_send, msg)
458                            .await
459                            .map_err(|e| error!("Chamomile channel: {:?}", e))
460                            .expect("Chamomile channel closed");
461                    }
462                    #[cfg(any(feature = "multiple", feature = "full"))]
463                    SendMessage::Group(group_id, msg) => {
464                        group_handle_send(group_id, &p2p_send, msg)
465                            .await
466                            .map_err(|e| error!("Chamomile channel: {:?}", e))
467                            .expect("Chamomile channel closed");
468                    }
469                    SendMessage::Rpc(msg) => {
470                        if let Some(ref rpc_send) = rpc_send {
471                            rpc_send
472                                .send(msg)
473                                .await
474                                .map_err(|e| error!("Rpc channel: {:?}", e))
475                                .expect("Rpc channel closed");
476                        }
477                    }
478                    #[cfg(feature = "std")]
479                    SendMessage::Layer(tgid, msg) => {
480                        let groups_lock = my_groups_1.read().await;
481                        if groups_lock.len() == 0 {
482                            drop(groups_lock);
483                            continue;
484                        }
485                        let default_group_id = groups_lock[0].clone();
486                        drop(groups_lock);
487
488                        layer_handle_send(default_group_id, tgid, &p2p_send, msg)
489                            .await
490                            .map_err(|e| error!("Chamomile channel: {:?}", e))
491                            .expect("Chamomile channel closed");
492                    }
493                    #[cfg(feature = "full")]
494                    SendMessage::Layer(fgid, tgid, msg) => {
495                        layer_handle_send(fgid, tgid, &p2p_send, msg)
496                            .await
497                            .map_err(|e| error!("Chamomile channel: {:?}", e))
498                            .expect("Chamomile channel closed");
499                    }
500                    SendMessage::Network(nmsg) => match nmsg {
501                        NetworkType::Broadcast(broadcast, data) => {
502                            // broadcast use default_group_id.
503                            let mut bytes = vec![];
504                            let groups_lock = my_groups_1.read().await;
505                            if groups_lock.len() == 0 {
506                                drop(groups_lock);
507                                continue;
508                            }
509                            bytes.extend(&groups_lock[0].to_be_bytes());
510                            drop(groups_lock);
511                            bytes.extend(data);
512                            p2p_send
513                                .send(ChamomileSendMessage::Broadcast(broadcast, bytes))
514                                .await
515                                .map_err(|e| error!("Chamomile channel: {:?}", e))
516                                .expect("Chamomile channel closed");
517                        }
518                        NetworkType::Connect(peer) => {
519                            p2p_send
520                                .send(ChamomileSendMessage::Connect(peer.into()))
521                                .await
522                                .map_err(|e| error!("Chamomile channel: {:?}", e))
523                                .expect("Chamomile channel closed");
524                        }
525                        NetworkType::DisConnect(peer) => {
526                            p2p_send
527                                .send(ChamomileSendMessage::DisConnect(peer.into()))
528                                .await
529                                .map_err(|e| error!("Chamomile channel: {:?}", e))
530                                .expect("Chamomile channel closed");
531                        }
532                        NetworkType::NetworkState(req, sender) => {
533                            p2p_send
534                                .send(ChamomileSendMessage::NetworkState(req, sender))
535                                .await
536                                .map_err(|e| error!("Chamomile channel: {:?}", e))
537                                .expect("Chamomile channel closed");
538                        }
539                        NetworkType::NetworkReboot => {
540                            p2p_send
541                                .send(ChamomileSendMessage::NetworkReboot)
542                                .await
543                                .map_err(|e| error!("Chamomile channel: {:?}", e))
544                                .expect("Chamomile channel closed");
545                        }
546                        NetworkType::NetworkStop => {
547                            warn!("Start stop chamomile...");
548                            let _ = p2p_send.send(ChamomileSendMessage::NetworkStop).await;
549                            listen_task.abort();
550                            break;
551                        }
552                        #[cfg(any(feature = "multiple", feature = "full"))]
553                        NetworkType::AddGroup(gid) => {
554                            let mut group_lock = my_groups_1.write().await;
555                            if !group_lock.contains(&gid) {
556                                group_lock.push(gid);
557                            }
558                            drop(group_lock);
559                        }
560                        #[cfg(any(feature = "multiple", feature = "full"))]
561                        NetworkType::DelGroup(gid) => {
562                            let mut group_lock = my_groups_1.write().await;
563                            let mut need_remove: Vec<usize> = vec![];
564                            for (k, i) in group_lock.iter().enumerate() {
565                                if i == &gid {
566                                    need_remove.push(k);
567                                }
568                            }
569                            for i in need_remove.iter().rev() {
570                                group_lock.remove(*i);
571                            }
572                            drop(group_lock);
573                        }
574                    },
575                }
576            }
577        });
578
579        Ok(peer_id)
580    }
581}