1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
use std::{time::Duration};
use async_std::{future, io::{prelude::*, Cursor}, task};
use cyfs_base::*;
use cyfs_bdt::*;
mod utils;
/// Polls the stack's chunk store until `chunkid` can be opened, then re-hashes
/// the stored bytes and returns the id computed from them.
///
/// The store is queried every 500 ms. A failed lookup is treated as "not there
/// yet" and retried, so this future never completes on its own while the
/// lookup keeps failing; callers are expected to bound it (e.g. with a timeout).
///
/// # Errors
///
/// Returns an error if the reader cannot supply `chunkid.len()` bytes or if
/// `ChunkId::calculate` fails.
async fn watch_recv_chunk(stack: StackGuard, chunkid: ChunkId) -> BuckyResult<ChunkId> {
    loop {
        let ret = stack.ndn().chunk_manager().store().get(&chunkid).await;
        match ret {
            Ok(mut reader) => {
                let mut content = vec![0u8; chunkid.len()];
                // A single `read` may return fewer bytes than requested, which
                // would leave the tail of `content` zeroed and yield a wrong
                // hash. `read_exact` keeps reading until the buffer is full.
                reader.read_exact(content.as_mut_slice()).await?;
                let recv_id = ChunkId::calculate(content.as_slice()).await?;
                return Ok(recv_id);
            }
            Err(_) => {
                // Lookup failed; wait before polling again.
                task::sleep(Duration::from_millis(500)).await;
            }
        }
    }
}
/// Spawns a detached background task that logs `task.state()` at info level
/// every 500 ms.
///
/// The loop has no exit condition and the spawn handle is dropped, so the
/// logger keeps running after this function returns.
fn watch_resource(task: Box<dyn DownloadTask>) {
    let poll_interval = Duration::from_millis(500);
    let state_logger = async move {
        loop {
            log::info!("task state: {:?}", task.state());
            task::sleep(poll_interval).await;
        }
    };
    task::spawn(state_logger);
}
/// Example entry point: opens two BDT stacks on local UDP endpoints and
/// downloads one randomly generated chunk from one into the other.
///
/// Steps, in order:
/// 1. Start logging and the panic handler.
/// 2. Create two devices (endpoints `127.0.0.1:10000` and `127.0.0.1:10001`)
///    and open a stack for each. The downloading stack gets an in-memory chunk
///    store and is told about the other device up front.
/// 3. Generate a random chunk, write it to a file under the
///    `bdt-example-channel-upload` named-data root, download it through the
///    first stack with the second stack as the source, and store the result
///    in the in-memory store.
/// 4. Wait (up to 50 s) for the chunk to show up in the first stack's store and
///    assert that its recomputed id matches.
/// 5. Sleep for a very long time so the process stays alive.
///
/// Any failure along the way panics via `unwrap`/`assert_eq!`.
#[async_std::main]
async fn main() {
    let app_name = "bdt-example-channel";
    // Both devices are created with the same id string.
    let owner_id = "5aSixgLuJjfrNKn9D4z66TEM6oxL3uNmWCWHk52cJDKR";

    cyfs_util::process::check_cmd_and_exec(app_name);

    let logger = cyfs_debug::CyfsLoggerBuilder::new_app(app_name)
        .level("trace")
        .console("debug")
        .build()
        .unwrap();
    logger.start();

    cyfs_debug::PanicBuilder::new(app_name, app_name)
        .exit_on_panic(true)
        .build()
        .start();

    let (downloader_dev, downloader_secret) =
        utils::create_device(owner_id, &["W4udp127.0.0.1:10000"]).unwrap();
    let (uploader_dev, uploader_secret) =
        utils::create_device(owner_id, &["W4udp127.0.0.1:10001"]).unwrap();

    // Downloading side: in-memory chunk store, uploader registered as a known device.
    let download_store = MemChunkStore::new();
    let mut downloader_params = StackOpenParams::new("bdt-example-channel-download");
    downloader_params.chunk_store = Some(download_store.clone_as_reader());
    downloader_params.known_device = Some(vec![uploader_dev.clone()]);
    let downloader_stack = Stack::open(downloader_dev.clone(), downloader_secret, downloader_params)
        .await
        .unwrap();

    // Uploading side: default params except for the simulated UDP loss setting.
    // NOTE(review): the unit of `sim_loss_rate` is not visible here — confirm.
    let mut uploader_params = StackOpenParams::new("bdt-example-channel-upload");
    uploader_params.config.interface.udp.sim_loss_rate = 10;
    let uploader_stack = Stack::open(uploader_dev, uploader_secret, uploader_params)
        .await
        .unwrap();

    // `1..2` yields a single value, so the transfer below runs exactly once.
    for _ in 1..2 {
        // Build a random chunk and derive its id from hash + length.
        let (data_len, data) = utils::random_mem(1024, 100);
        let data_hash = hash_data(&data[..]);
        let chunkid = ChunkId::new(&data_hash, data_len as u32);

        // Write the chunk to a file named after its id under the upload data root.
        let upload_root = cyfs_util::get_named_data_root("bdt-example-channel-upload");
        let chunk_path = upload_root.join(chunkid.to_string().as_str());
        local_chunk_writer(&chunkid, chunk_path)
            .write(Cursor::new(data))
            .await
            .unwrap();

        // Download context whose only source is the uploading stack.
        let context = SampleDownloadContext::new("".to_owned());
        let source = DownloadSource {
            target: uploader_stack.local_const().clone(),
            codec_desc: ChunkCodecDesc::reverse_stream(None, None),
        };
        context.add_source(source);

        let (_, reader) = download_chunk(&*downloader_stack, chunkid.clone(), None, context)
            .await
            .unwrap();
        download_store.write_chunk(&chunkid, reader).await.unwrap();

        // First unwrap: the 50 s timeout did not fire; second: the watcher succeeded.
        let received_id = future::timeout(
            Duration::from_secs(50),
            watch_recv_chunk(downloader_stack.clone(), chunkid.clone()),
        )
        .await
        .unwrap()
        .unwrap();
        assert_eq!(received_id, chunkid);
    }

    // Keep the process alive after the transfer has been checked.
    task::sleep(Duration::from_secs(10_000_000_000)).await;
}