1use std::{
2 net::{IpAddr, Ipv4Addr, SocketAddr, Shutdown},
3 path::Path,
4 str::FromStr,
5 time::{Duration, Instant}
6};
7use async_std::{
8 sync::Arc,
9 task,
10 net::{TcpListener, TcpStream},
11 stream::StreamExt,
12 future,
13 io::prelude::*,
14 };
16
17use cyfs_base::*;
18use crate::{
19 stack::{WeakStack, Stack},
20 tunnel::{BuildTunnelParams},
21 datagram::{self, DatagramOptions},
22 types::*,
23 ndn::*,
24 utils::*,
25};
26use super::command::*;
27use super::super::sn::client::SnStatus;
28
/// Shared state behind a [`DebugStub`] handle.
struct DebugStubImpl {
    /// Handle to the owning stack; converted with `Stack::from` whenever a command needs it.
    stack: WeakStack,
    /// TCP listener on which debug clients connect.
    listener: TcpListener,
    /// Chunk store used by the `get_chunk` and `put_chunk` commands.
    chunk_store: MemChunkStore,
}
34
/// Settings for the debug stub.
///
/// NOTE(review): `DebugStub::open` reads `local` and `port` from the stack's
/// `debug` configuration, presumably a value of this type — confirm.
#[derive(Clone)]
pub struct Config {
    /// Textual address for the stub's listener (`DebugStub::open` parses it as IPv4).
    pub local: String,
    /// TCP port for the stub's listener.
    pub port: u16,
    /// NOTE(review): not read anywhere in this file; `DebugStub::open` receives its
    /// chunk store as an argument instead — confirm whether this field is still used.
    pub chunk_store: MemChunkStore,
}
41
/// Clonable handle to the debug stub; all clones share one `DebugStubImpl`
/// through an `Arc`.
#[derive(Clone)]
pub struct DebugStub(Arc<DebugStubImpl>);
44
45impl DebugStub {
46 pub async fn open(weak_stack: WeakStack, chunk_store: MemChunkStore) -> BuckyResult<Self> {
47 let stack = Stack::from(&weak_stack);
48 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_str(stack.config().debug.as_ref().unwrap().local.as_str()).unwrap()),
49 stack.config().debug.as_ref().unwrap().port);
50 let listener = TcpListener::bind(addr).await?;
51 Ok(Self(Arc::new(DebugStubImpl {
52 stack: weak_stack,
53 listener,
54 chunk_store,
55 })))
56 }
57
58 pub fn listen(&self) {
59 const READ_CMD_TIMEOUT: u64 = 30;
60
61 let stub = self.clone();
62 task::spawn(async move {
63 let mut incoming = stub.0.listener.incoming();
64 loop {
65 if let Some(stream) = incoming.next().await {
66 if let Ok(stream) = stream {
67 let stub = stub.clone();
68 task::spawn(async move {
69 let mut stream = stream;
70 let mut command = String::new();
71 match future::timeout(Duration::from_secs(READ_CMD_TIMEOUT), stream.read_to_string(&mut command)).await {
72 Err(_) => {
73 error!("read cmd timeout!");
74 },
75 Ok(_) => {
76 stub.handle_command(command, stream).await;
77 }
78 }
79 });
80 }
81 }
82 }
83 });
84 }
85
86 async fn handle_command(&self, command: String, tunnel: TcpStream) {
87 println!("command:{}", command);
88 let stack = Stack::from(&self.0.stack);
89 let mut tunnel = tunnel;
90 match DebugCommand::from_str(&stack, &command).await {
91 Ok(command) => {
92 if let Err(err) = match command {
93 DebugCommand::Test(_) => self.test(tunnel.clone()).await,
94 DebugCommand::Ping(command) => self.ping(tunnel.clone(), command).await,
95 DebugCommand::Nc(command) => self.nc(tunnel.clone(), command).await,
96 DebugCommand::GetChunk(command) => self.get_chunk(tunnel.clone(), command).await,
97 DebugCommand::GetFile(command) => self.get_file(tunnel.clone(), command).await,
98 DebugCommand::PutChunk(command) => self.put_chunk(tunnel.clone(), command).await,
99 DebugCommand::PutFile(command) => self.put_file(tunnel.clone(), command).await,
100 DebugCommand::SnConnStatus(command) => self.sn_conn_status(tunnel.clone(), command).await,
101 DebugCommand::BenchDatagram(command) => self.bench_datagram(tunnel.clone(), command).await,
102 } {
103 let _ = tunnel.write_all(err.as_ref()).await;
104 }
105 },
106 Err(err) => {
107 let _ = tunnel.write_all(err.as_ref()).await;
108 }
109 }
110 }
111
112 async fn test(&self, tunnel: TcpStream) -> Result<(), String> {
113 let mut tunnel = tunnel;
114
115 let _ = tunnel.write_all(format!("hello\r\n").as_bytes()).await;
116 let _ = tunnel.write_all(format!("bdt\r\n").as_bytes()).await;
117
118 Ok(())
119 }
120
121 async fn sn_conn_status(&self, tunnel: TcpStream, command: DebugCommandSnConnStatus) -> Result<(), String> {
122 let mut tunnel = tunnel;
123
124 let stack = Stack::from(&self.0.stack);
125 let timeout = {
126 if command.timeout_sec == 0 {
127 6
128 } else {
129 command.timeout_sec
130 }
131 };
132
133 let sleep_ms = 200;
134 let mut counter = timeout*(1000/sleep_ms);
135 loop {
136 let sn_status = stack.sn_client().ping().status();
137
138 if let Some(sn_status) = sn_status {
139 if let SnStatus::Online = sn_status {
140 let _ = tunnel.write_all("Ok: sn connected\r\n".as_ref()).await;
141
142 return Ok(())
143 }
144 }
145
146 counter -= 1;
147 if counter == 0 {
148 break ;
149 }
150
151 task::sleep(Duration::from_millis(sleep_ms)).await;
152 }
153
154 let _ = tunnel.write_all("Err: sn connect timeout\r\n".as_ref()).await;
155
156 Ok(())
157 }
158
159 async fn ping(&self, tunnel: TcpStream, command: DebugCommandPing) -> Result<(), String> {
160 let mut tunnel = tunnel;
161 let stack = Stack::from(&self.0.stack);
162 let datagram = stack.datagram_manager().bind(0)
163 .map_err(|err| format!("deamon bind datagram tunnel failed for {}\r\n", err))?;
164 for _ in 0..command.count {
165 let mut options = DatagramOptions::default();
166 let _ = tunnel.write_all("send ping.\r\n".as_ref()).await;
167
168 let ts = cyfs_base::bucky_time_now();
169 options.sequence = Some(TempSeq::from(ts as u32));
170 let _ = datagram.send_to(
171 "debug".as_ref(),
172 &mut options,
173 &command.remote.desc().device_id(),
174 datagram::ReservedVPort::Debug.into());
175 match future::timeout(command.timeout, datagram.recv_v()).await {
176 Err(_err) => {
177 let _ = tunnel.write_all("timeout\r\n".as_ref()).await;
178 },
179 Ok(res) => {
180 let datagrams = res.unwrap();
181 for datagram in datagrams {
182 if let Some(opt) = datagram.options.sequence {
183 if opt == options.sequence.unwrap() {
184 let s = format!("respose. time: {:.1} ms\r\n",
185 (cyfs_base::bucky_time_now() - ts) as f64 / 1000.0);
186 let _ = tunnel.write_all(s.as_bytes()).await;
187 break ;
188 }
189 }
190 }
191 }
192 }
193 }
194
195 Ok(())
196 }
197
198 async fn bench_datagram(&self, tunnel: TcpStream, command: DebugCommandBenchDatagram) -> Result<(), String> {
199 let mut tunnel = tunnel;
200
201 let from = 1;
202 let to = 65535;
203 let plaintext = command.plaintext;
204
205 let s = format!("bench_datagram: plaintext:{} timeout:{:?} from:{} to:{}\r\n",
206 plaintext, command.timeout, from, to);
207 let _ = tunnel.write_all(s.as_bytes()).await;
208
209 let mut n_ok = 0;
210 let stack = Stack::from(&self.0.stack);
211 let datagram = stack.datagram_manager().bind(0)
212 .map_err(|err| format!("deamon bind datagram tunnel failed for {}\r\n", err))?;
213 for i in from..to {
214 let mut options = DatagramOptions::default();
215 let _ = tunnel.write_all("send data.\r\n".as_ref()).await;
216
217 let data = rand_data_gen(i);
218 let ts = cyfs_base::bucky_time_now();
219 options.sequence = Some(TempSeq::from(ts as u32));
220 if i%2 == 0 {
221 options.create_time = Some(ts+10);
222 }
223 if i%4 == 0 {
224 options.send_time = Some(ts+20);
225 }
226 if i%8 == 0 {
227 options.author_id = Some(command.remote.desc().device_id().clone());
228 }
229 options.plaintext = plaintext;
230 let _ = datagram.send_to(
231 &data,
232 &mut options,
233 &command.remote.desc().device_id(),
234 datagram::ReservedVPort::Debug.into());
235 match future::timeout(command.timeout, datagram.recv_v()).await {
236 Err(_err) => {
237 let _ = tunnel.write_all("timeout\r\n".as_ref()).await;
238 },
239 Ok(res) => {
240 let datagrams = res.unwrap();
241 for datagram in datagrams {
242 if let Some(opt) = datagram.options.sequence {
243 if opt == options.sequence.unwrap() {
244 let md5_recv = md5::compute(&datagram.data);
245 let md5_send = md5::compute(&data);
246 if md5_recv == md5_send {
247 n_ok += 1;
248 }
249 let s = format!("respose: plaintext: {} time: {:.1} ms, success {}/{}, fail {}\r\n",
250 options.plaintext,
251 (cyfs_base::bucky_time_now() - ts) as f64 / 1000.0,
252 n_ok,
253 i,
254 i-n_ok);
255 let _ = tunnel.write_all(s.as_bytes()).await;
256 break ;
257 }
258 }
259 }
260 }
261 }
262 }
263
264 Ok(())
265 }
266
267 async fn nc(&self, tunnel: TcpStream, command: DebugCommandNc) -> Result<(), String> {
268 let stack = Stack::from(&self.0.stack);
269 let task_num = if command.task_num == 0 {
270 1
271 } else {
272 command.task_num
273 };
274 let mut tasks = vec![];
275
276 for task_id in 0..task_num {
277 let mut t = tunnel.clone();
278 let c = command.clone();
279 let s = stack.clone();
280 tasks.push(task::spawn(async move {
281 match nc_task(t.clone(), c, s, task_id).await {
282 Err(e) => {
283 let _ = t.write_all(format!("nc_task err={}\r\n", e).as_ref()).await;
284 },
285 Ok(_) => {
286 }
287 }
288 }));
289 }
290
291 for t in tasks {
292 let _ = t.await;
293 }
294
295 Ok(())
296 }
297
298 async fn get_chunk(&self, tunnel: TcpStream, command: DebugCommandGetChunk) -> Result<(), String> {
299 let mut tunnel = tunnel;
300
301 let chunk_id = command.chunk_id;
302 let remotes = command.remotes;
303 let stack = Stack::from(&self.0.stack);
306
307 let chunk_store = self.0.chunk_store.clone();
308 let context = SampleDownloadContext::desc_streams("".to_string(), remotes);
309 let begin = Instant::now();
310 match download_chunk(&stack, chunk_id.clone(),None, context).await {
311 Ok((_, reader)) => {
312 chunk_store.write_chunk(&chunk_id, reader).await.unwrap();
313 match future::timeout(Duration::from_secs(600), get_chunk_wait_finish(stack.clone(), chunk_id.clone())).await {
314 Err(e) => {
315 let _ = tunnel.write_all(format!("get_chunk_wait_finish err={}\r\n", e).as_ref()).await;
316 },
317 Ok(r) => {
318 match r {
319 Ok(n) => {
320 let cost_secs = begin.elapsed().as_secs_f64();
321 let _ = tunnel.write_all(format!("get success\r\n").as_ref()).await;
322 if chunk_id.len() != n {
323 let _ = tunnel.write_all(format!("data wrong, recv_len={} want={}\r\n", n, chunk_id.len()).as_ref()).await;
324 } else {
325 let len = n as f64;
326 let speed = if cost_secs > 0.0 {
327 len / cost_secs / 1024.0
328 } else {
329 999999.9
330 };
331 let _ = tunnel.write_all(format!("cost={:.3}s len={:.1}KB speed={:.1}KB/s\r\n",
332 cost_secs, len/1024.0, speed).as_ref()).await;
333 }
334 },
335 Err(e) => {
336 let _ = tunnel.write_all(format!("get_chunk_wait_finish err={}\r\n", e).as_ref()).await;
337 }
338 }
339 }
340 }
341 },
342 Err(e) => {
343 let _ = tunnel.write_all(format!("download_chunk err={}\r\n", e).as_ref()).await;
344 }
345 }
346
347 Ok(())
348 }
349
350 async fn get_file(&self, tunnel: TcpStream, command: DebugCommandGetFile) -> Result<(), String> {
351 let mut tunnel = tunnel;
352
353 let file_id = command.file_id;
354 let remotes = command.remotes;
355 let timeout = command.timeout;
356 let local_path = command.local_path;
357
358
359 let _ = tunnel.write_all("start downloading file..\r\n".as_ref()).await;
360
361 let stack = Stack::from(&self.0.stack);
362 let context = SampleDownloadContext::id_streams(&stack, "".to_owned(), &remotes).await
363 .map_err(|e| format!("download err: {}\r\n", e))?;
364 let (_, reader) = download_file(
365 &stack,
366 file_id.clone(),
367 None,
368 context)
369 .await.map_err(|e| {
370 format!("download err: {}\r\n", e)
371 })?;
372
373 let _ = future::timeout(Duration::from_secs(timeout as u64), tunnel.write_all("waitting..\r\n".as_ref())).await;
374
375 LocalChunkListWriter::from_file(local_path, &file_id)
376 .map_err(|e| {
377 format!("download err: {}\r\n", e)
378 })?
379 .write(reader).await
380 .map_err(|e| {
381 format!("download err: {}\r\n", e)
382 })?;
383
384 let _ = tunnel.write_all("download file finish.\r\n".as_ref()).await;
385 Ok(())
386 }
387
388 async fn put_chunk(&self, tunnel: TcpStream, command: DebugCommandPutChunk) -> Result<(), String> {
389 let mut tunnel = tunnel;
390 let local_path = command.local_path;
391
392 if local_path.as_path().exists() {
393 let mut file = async_std::fs::File::open(local_path.as_path()).await.map_err(|e| {
394 format!("open file err: {}\r\n", e)
395 })?;
396 let mut content = Vec::<u8>::new();
397 let _ = file.read_to_end(&mut content).await.map_err(|e| {
398 format!("read file err: {}\r\n", e)
399 })?;
400
401 if content.len() == 0 {
402 return Err(format!("file size is zero\r\n"));
403 }
404
405 let chunk_store = self.0.chunk_store.clone();
406 match ChunkId::calculate(content.as_slice()).await {
407 Ok(chunk_id) => {
408 match chunk_store.add(chunk_id.clone(), Arc::new(content)).await {
409 Ok(_) => {
410 let _ = tunnel.write_all(format!("put chunk success, chunk_id={}\r\n", chunk_id).as_ref()).await;
411 },
412 Err(e) => {
413 let _ = tunnel.write_all(format!("put chunk fail, err={}\r\n", e).as_ref()).await;
414 }
415 }
416 Ok(())
417 },
418 Err(e) => {
419 Err(format!("calculate chunk id err: {}\r\n", e))
420 }
421 }
422 } else {
423 Err(format!("file not exists: {}\r\n", local_path.to_str().unwrap()))
424 }
425 }
426
427 async fn put_file(&self, _tunnel: TcpStream, _command: DebugCommandPutFile) -> Result<(), String> {
428 Err("not supported now".to_owned())
492 }
493}
494
495async fn watchdog_download_finished(task: Box<dyn DownloadTask>, timeout: u32) -> Result<(), String> {
496 let mut _timeout = 1800; let mut i = 0;
498
499 loop {
500 match task.state() {
501 NdnTaskState::Finished => {
502 break Ok(());
503 },
504 NdnTaskState::Running => {
505 if task.cur_speed() > 0 {
506 i = 0;
507
508 if _timeout == 1800 { _timeout = timeout;
510 }
511 } else {
512 i += 1;
513 }
514 },
515 NdnTaskState::Error(e) => {
516 break Err(format!("download err, code: {:?}\r\n", e));
517 },
518 _ => {
519
520 }
521 }
522
523 if i >= _timeout {
524 break Err(format!("download timeout\r\n"));
525 }
526
527 task::sleep(Duration::from_secs(1)).await;
528 }
529}
530
/// Returns the size in bytes of the file at `path`.
///
/// Returns 0 when the file cannot be opened or its metadata cannot be read,
/// so a missing file and an empty file are indistinguishable to the caller.
fn get_filesize(path: &Path) -> u64 {
    std::fs::File::open(path)
        .and_then(|file| file.metadata())
        .map(|meta| meta.len())
        .unwrap_or(0)
}
540
541fn rand_data_gen(len: usize) -> Vec<u8> {
542 let mut buf = Vec::new();
543 buf.resize(len, 0u8);
544
545 let mut r = 0;
546 for i in 0..len {
547 if i%10 == 0 {
548 r = rand::random::<u8>();
549 }
550 buf[i] = r;
551 }
552
553 buf
554}
555
556fn rand_data_gen_buf(len: usize) -> Vec<u8> {
557 let mut buf = Vec::new();
558 buf.resize(len + 8, 0u8);
559
560 buf[0..8].copy_from_slice(&len.to_be_bytes());
561
562 let mut r = 0;
563 for i in 8..len {
564 if i%10 == 0 {
565 r = rand::random::<u8>();
566 }
567 buf[i] = r;
568 }
569
570 buf
571}
572
573fn rand_char(len: usize) -> Vec<u8> {
574 let mut buf = Vec::new();
575 buf.resize(len, 0u8);
576
577 for i in 0..len {
578 buf[i] = 97 + rand::random::<u8>() % 26;
579 }
580
581 buf
582}
583
584async fn get_chunk_wait_finish(stack: Stack, chunk_id: ChunkId) -> BuckyResult<usize> {
585 let mut len = 0;
586 loop {
587 let ret = stack.ndn().chunk_manager().store().get(&chunk_id).await;
588 if let Ok(mut reader) = ret {
589 let mut content = vec![0u8; 2048];
590
591 loop {
592 let n = reader.read(content.as_mut_slice()).await?;
593 if n == 0 {
594 break ;
595 }
596 len += n;
597 }
598
599 return Ok(len);
600 } else {
601 task::sleep(Duration::from_millis(200)).await;
602 }
603 }
604}
605
606async fn nc_task(tunnel: TcpStream, command: DebugCommandNc, stack: Stack, task_id: u32) -> Result<(), String> {
607 let mut tunnel = tunnel;
608 let _ = tunnel.write_all(format!("[{}] connecting stream\r\n", task_id).as_ref()).await;
609
610 let question = b"question?";
611 let mut conn = stack.stream_manager().connect(
612 command.port,
613 question.to_vec(),
614 BuildTunnelParams {
615 remote_const: command.remote.desc().clone(),
616 remote_sn: None,
617 remote_desc: Some(command.remote.clone())
618 }).await.map_err(|err| format!("Err: {}\r\n", err.msg().to_string()))?;
619
620 let _ = tunnel.write_all(format!("[{}] Connect success, read answer\r\n", task_id).as_ref()).await;
621
622 let mut answer = [0; 128];
623 match conn.read(&mut answer).await {
624 Ok(len) => {
625 let s = format!("[{}] Read answer success, len={} content={:?}\r\n",
626 task_id, len, String::from_utf8(answer[..len].to_vec()).expect(""));
627 let _ = tunnel.write_all(s.as_bytes()).await;
628 },
629 Err(e) => {
630 let s = format!("[{}] Read answer fail, err={}\r\n", task_id, e);
631 let _ = tunnel.write_all(s.as_bytes()).await;
632 return Ok(());
633 }
634 }
635
636 let _ = conn.write_all(b"hello world").await;
637
638 let mut buf = [0u8; 128];
639 match conn.read(&mut buf).await {
640 Ok(len) => {
641 let s = format!("[{}] Read data success, len={} content={:?}\r\n",
642 task_id, len, String::from_utf8(buf[..len].to_vec()).expect(""));
643 let _ = tunnel.write_all(s.as_bytes()).await;
644 },
645 Err(e) => {
646 let s = format!("[{}] Read data fail, err={}\r\n", task_id, e);
647 let _ = tunnel.write_all(s.as_bytes()).await;
648 return Ok(());
649 }
650 }
651
652 let _ = tunnel.write_all(format!("[{}] Ok: stream connected\r\n", task_id).as_ref()).await;
653
654 if command.bench > 0 {
655 let _ = tunnel.write_all(format!("[{}] start bench size={}MB\r\n", task_id, command.bench).as_ref()).await;
656
657 let buf = rand_char(1024);
658 let mut i: u32 = 0;
659 let max = command.bench * 1024;
660 let begin = Instant::now();
661 loop {
662 match conn.write_all(&buf).await {
663 Ok(_) => {
664 i += 1;
665 },
666 Err(e) => {
667 let _ = tunnel.write_all(format!("[{}] write err={}\r\n", task_id, e).as_ref()).await;
668 break;
669 }
670 }
671 if i >= max {
672 break;
673 }
674 }
675 let cost = begin.elapsed().as_secs_f64();
676 let speed = if cost > 0.0 {
677 i as f64 / cost
678 } else {
679 999999.9
680 };
681 let _ = tunnel.write_all(format!("[{}] bench over. cost={:.3}s len={}KB speed={:.1}KB/s\r\n",
682 task_id, cost, i, speed).as_ref()).await;
683 }
684
685 let _ = conn.shutdown(Shutdown::Both);
686
687 Ok(())
688}