1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
use async_std::{
    channel, future,
    io::prelude::{ReadExt, WriteExt},
    task,
};
use cyfs_base::*;
use futures::StreamExt;
use cyfs_bdt::{
    *, 
    sn::service::*
};
use std::{net::Shutdown, time::Duration};
mod utils;


async fn recv_large_stream(stack: StackGuard, sender: channel::Sender<Vec<u8>>) {
    let acceptor = stack.stream_manager().listen(0).unwrap();
    let mut incoming = acceptor.incoming();
    loop {
        let mut pre_stream = incoming.next().await.unwrap().unwrap();
        pre_stream.stream.confirm(vec![].as_ref()).await.unwrap();
        let mut buffer = vec![];
        let _ = pre_stream.stream.read_to_end(&mut buffer).await.unwrap();
        let _ = pre_stream.stream.shutdown(Shutdown::Both);
        sender.send(buffer).await.unwrap();
    }  
}

struct TestServer {

}


impl SnServiceContractServer for TestServer {
    fn check_receipt(
        &self,
        _client_device: &Device,
        _local_receipt: &SnServiceReceipt,
        _client_receipt: &Option<ReceiptWithSignature>,
        _last_request_time: &ReceiptRequestTime,
    ) -> IsAcceptClient {
        IsAcceptClient::Accept(false)
    }

    fn verify_auth(&self, _client_device_id: &DeviceId) -> IsAcceptClient {
        IsAcceptClient::Accept(false)
    }
}


#[async_std::main]
async fn main() {
    cyfs_util::process::check_cmd_and_exec("bdt-example-sn");
    cyfs_debug::CyfsLoggerBuilder::new_app("bdt-example-sn")
        .level("trace")
        .console("info")
        .build()
        .unwrap()
        .start();

    cyfs_debug::PanicBuilder::new("bdt-example-sn", "bdt-example-sn")
        .exit_on_panic(true)
        .build()
        .start();

        let (sn1, sn1_secret) = utils::create_device("5aSixgLuJjfrNKn9D4z66TEM6oxL3uNmWCWHk52cJDKR", &["W4udp127.0.0.1:10020"]).unwrap();

        let (sn2, sn2_secret) = utils::create_device("5aSixgLuJjfrNKn9D4z66TEM6oxL3uNmWCWHk52cJDKR", &["W4udp127.0.0.1:10021"]).unwrap();
    
    
        let (ln_dev, ln_secret) = utils::create_device("5aSixgLuJjfrNKn9D4z66TEM6oxL3uNmWCWHk52cJDKR", &["L4udp127.0.0.1:10022"]).unwrap();
        let (rn_dev, rn_secret) = utils::create_device("5aSixgLuJjfrNKn9D4z66TEM6oxL3uNmWCWHk52cJDKR", &["L4udp127.0.0.1:10023"]).unwrap();
    
        let (sn_dev, sn_secret) = {
            let rn_id = rn_dev.desc().object_id();
            let sn1_id = sn1.desc().object_id();
            let sn2_id = sn1.desc().object_id();
    
            if rn_id.distance_of(&sn1_id) > rn_id.distance_of(&sn2_id) {
                (sn1.clone(), sn1_secret)
            } else {
                (sn2.clone(), sn2_secret)
            }
        };
    
    
        let service = SnService::new(
            sn_dev,
            sn_secret,
            Box::new(TestServer {}),
        );
    
        task::spawn(async move {
            let _ = service.start().await;
        });
    
    
        let mut ln_params = StackOpenParams::new("");
        ln_params.known_device = Some(vec![rn_dev.clone(), sn1.clone(), sn2.clone()]);
        let ln_stack = Stack::open(
            ln_dev.clone(), 
            ln_secret, 
            ln_params).await.unwrap();
    
    
        let mut rn_params = StackOpenParams::new("");
        rn_params.config.sn_client.ping.udp.resend_timeout = Duration::from_secs(1);
        rn_params.known_sn = Some(vec![sn1.clone(), sn2.clone()]);
        let rn_stack = Stack::open(
            rn_dev, 
            rn_secret, 
            rn_params).await.unwrap();
    
        assert_eq!(SnStatus::Online, rn_stack.sn_client().ping().wait_online().await.unwrap());
    
        let (sample_size, sample) = utils::random_mem(1024, 512);
        let (signal_sender, signal_recver) = channel::bounded::<Vec<u8>>(1);
        {
            let rn_stack = rn_stack.clone();
            task::spawn(async move {
                recv_large_stream(rn_stack, signal_sender).await;
            });
        }
    
        let param = BuildTunnelParams {
            remote_const: rn_stack.local_const().clone(),
            remote_sn: Some(vec![sn1.desc().device_id(), sn2.desc().device_id()]),
            remote_desc: None,
        };
        let mut stream = ln_stack
            .stream_manager()
            .connect(0u16, vec![], param)
            .await.unwrap();
        stream.write_all(&sample[..]).await.unwrap();
    
        let _ = stream.shutdown(Shutdown::Both);
    
        let recv_sample = future::timeout(Duration::from_secs(5), signal_recver.recv()).await.unwrap().unwrap();
    
        assert_eq!(recv_sample.len(), sample_size);
        let sample_hash = hash_data(sample.as_ref());
        let recv_hash = hash_data(recv_sample.as_ref());
    
        assert_eq!(sample_hash, recv_hash);

}


// #[async_std::main]
// async fn main() {
// }