1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
use async_std::{
future,
task,
};
use cyfs_base::*;
use cyfs_bdt::{
*,
ndn::{
channel::{*, protocol::v0::*}
},
};
mod utils;
#[async_std::main]
async fn main() {
cyfs_util::process::check_cmd_and_exec("bdt-example");
cyfs_debug::CyfsLoggerBuilder::new_app("bdt-example")
.level("trace")
.console("info")
.build()
.unwrap()
.start();
cyfs_debug::PanicBuilder::new("bdt-example", "bdt-example")
.exit_on_panic(true)
.build()
.start();
let (down_dev, down_secret) = utils::create_device(
"5aSixgLuJjfrNKn9D4z66TEM6oxL3uNmWCWHk52cJDKR",
&["W4udp127.0.0.1:10016"],
)
.unwrap();
let (src_dev, src_secret) = utils::create_device(
"5aSixgLuJjfrNKn9D4z66TEM6oxL3uNmWCWHk52cJDKR",
&["W4udp127.0.0.1:10017"],
)
.unwrap();
let (down_stack, down_store) = {
let mut params = StackOpenParams::new("bdt-example");
let store = MemChunkStore::new();
params.chunk_store = Some(store.clone_as_reader());
params.known_device = Some(vec![src_dev.clone()]);
(
Stack::open(down_dev.clone(), down_secret, params)
.await
.unwrap(),
store
)
};
let (chunk_len, chunk_data) = utils::random_mem(1024, 1024);
let chunk_hash = hash_data(&chunk_data[..]);
let chunkid = ChunkId::new(&chunk_hash, chunk_len as u32);
let src_stack = {
let mut params = StackOpenParams::new("bdt-example");
struct RespNotFound {
}
#[async_trait::async_trait]
impl NdnEventHandler for RespNotFound {
async fn on_newly_interest(
&self,
_stack: &Stack,
interest: &Interest,
from: &Channel
) -> BuckyResult<()> {
let resp = RespInterest {
session_id: interest.session_id.clone(),
chunk: interest.chunk.clone(),
err: BuckyErrorCode::NotFound,
redirect: None,
redirect_referer: None,
to: None,
};
from.resp_interest(resp);
Ok(())
}
fn on_unknown_piece_data(
&self,
_stack: &Stack,
_piece: &PieceData,
_from: &Channel
) -> BuckyResult<DownloadSession> {
unimplemented!()
}
}
params.ndn_event = Some(Box::new(RespNotFound {}));
Stack::open(src_dev, src_secret, params).await.unwrap()
};
let context = SingleSourceContext::from_desc("".to_owned(), src_stack.local_const().clone());
let (path, reader) = download_chunk(
&*down_stack,
chunkid.clone(),
None,
context.clone()
)
.await.unwrap();
log::info!("task path: {}", path);
task::spawn(async move {
let session = context.wait_session(future::pending::<BuckyError>()).await.unwrap();
if let DownloadSessionState::Canceled(err) = session.wait_finish().await {
assert_eq!(err.code(), BuckyErrorCode::NotFound);
} else {
unreachable!()
}
});
down_store.write_chunk(&chunkid, reader).await.unwrap_err();
}