cyfs_bdt/debug/
stub.rs

1use std::{
2    net::{IpAddr, Ipv4Addr, SocketAddr, Shutdown},
3    path::Path, 
4    str::FromStr, 
5    time::{Duration, Instant}
6};
7use async_std::{
8    sync::Arc, 
9    task, 
10    net::{TcpListener, TcpStream},
11    stream::StreamExt,
12    future, 
13    io::prelude::*,
14    // fs::File, 
15};
16
17use cyfs_base::*;
18use crate::{
19    stack::{WeakStack, Stack}, 
20    tunnel::{BuildTunnelParams}, 
21    datagram::{self, DatagramOptions}, 
22    types::*,
23    ndn::*, 
24    utils::*,
25};
26use super::command::*;
27use super::super::sn::client::SnStatus;
28
29struct DebugStubImpl {
30    stack: WeakStack, 
31    listener: TcpListener,
32    chunk_store: MemChunkStore, 
33}
34
35#[derive(Clone)]
36pub struct Config {
37    pub local: String,
38    pub port: u16,
39    pub chunk_store: MemChunkStore,
40}
41
42#[derive(Clone)]
43pub struct DebugStub(Arc<DebugStubImpl>);
44
45impl DebugStub {
46    pub async fn open(weak_stack: WeakStack, chunk_store: MemChunkStore) -> BuckyResult<Self> {
47        let stack = Stack::from(&weak_stack);
48        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_str(stack.config().debug.as_ref().unwrap().local.as_str()).unwrap()), 
49            stack.config().debug.as_ref().unwrap().port);
50        let listener = TcpListener::bind(addr).await?;
51        Ok(Self(Arc::new(DebugStubImpl {
52            stack: weak_stack, 
53            listener,
54            chunk_store,
55        })))
56    }
57
58    pub fn listen(&self) {
59        const READ_CMD_TIMEOUT: u64 = 30;
60
61        let stub = self.clone();
62        task::spawn(async move {
63            let mut incoming = stub.0.listener.incoming();
64            loop {
65                if let Some(stream) = incoming.next().await {
66                    if let Ok(stream) = stream {
67                        let stub = stub.clone();
68                        task::spawn(async move {
69                            let mut stream = stream;
70                            let mut command = String::new();
71                            match future::timeout(Duration::from_secs(READ_CMD_TIMEOUT), stream.read_to_string(&mut command)).await {
72                                Err(_) => {
73                                    error!("read cmd timeout!");
74                                },
75                                Ok(_) => {
76                                    stub.handle_command(command, stream).await;
77                                }
78                            }
79                        });
80                    }
81                }
82            }
83        });
84    }
85
86    async fn handle_command(&self, command: String, tunnel: TcpStream) {
87        println!("command:{}", command);
88        let stack = Stack::from(&self.0.stack);
89        let mut tunnel = tunnel;
90        match DebugCommand::from_str(&stack, &command).await {
91            Ok(command) => {
92                if let Err(err) = match command {
93                    DebugCommand::Test(_) => self.test(tunnel.clone()).await, 
94                    DebugCommand::Ping(command) => self.ping(tunnel.clone(), command).await, 
95                    DebugCommand::Nc(command) => self.nc(tunnel.clone(), command).await,
96                    DebugCommand::GetChunk(command) => self.get_chunk(tunnel.clone(), command).await,
97                    DebugCommand::GetFile(command) => self.get_file(tunnel.clone(), command).await,
98                    DebugCommand::PutChunk(command) => self.put_chunk(tunnel.clone(), command).await,
99                    DebugCommand::PutFile(command) => self.put_file(tunnel.clone(), command).await,
100                    DebugCommand::SnConnStatus(command) => self.sn_conn_status(tunnel.clone(), command).await,
101                    DebugCommand::BenchDatagram(command) => self.bench_datagram(tunnel.clone(), command).await,
102                } {
103                    let _ = tunnel.write_all(err.as_ref()).await;
104                }
105            }, 
106            Err(err) => {
107                let _ = tunnel.write_all(err.as_ref()).await;
108            }
109        }
110    }
111
112    async fn test(&self, tunnel: TcpStream) -> Result<(), String> {
113        let mut tunnel = tunnel;
114
115        let _ = tunnel.write_all(format!("hello\r\n").as_bytes()).await;
116        let _ = tunnel.write_all(format!("bdt\r\n").as_bytes()).await;
117
118        Ok(())
119    }
120
121    async fn sn_conn_status(&self, tunnel: TcpStream, command: DebugCommandSnConnStatus) -> Result<(), String> {
122        let mut tunnel = tunnel;
123
124        let stack = Stack::from(&self.0.stack);
125        let timeout = {
126            if command.timeout_sec == 0 {
127                6
128            } else {
129                command.timeout_sec
130            }
131        };
132
133        let sleep_ms = 200; 
134        let mut counter = timeout*(1000/sleep_ms);
135        loop {
136            let sn_status = stack.sn_client().ping().status();
137
138            if let Some(sn_status) = sn_status {
139                if let SnStatus::Online = sn_status {
140                    let _ = tunnel.write_all("Ok: sn connected\r\n".as_ref()).await;
141
142                    return Ok(())
143                }
144            }
145
146            counter -= 1;
147            if counter == 0 {
148                break ;
149            }
150
151            task::sleep(Duration::from_millis(sleep_ms)).await;
152        }
153
154        let _ = tunnel.write_all("Err: sn connect timeout\r\n".as_ref()).await;
155
156        Ok(())
157    }
158
159    async fn ping(&self, tunnel: TcpStream, command: DebugCommandPing) -> Result<(), String> {
160        let mut tunnel = tunnel;
161        let stack = Stack::from(&self.0.stack);
162        let datagram = stack.datagram_manager().bind(0)
163            .map_err(|err| format!("deamon bind datagram tunnel failed for {}\r\n", err))?;
164        for _ in 0..command.count {
165            let mut options = DatagramOptions::default();
166            let _ = tunnel.write_all("send ping.\r\n".as_ref()).await;
167
168            let ts = cyfs_base::bucky_time_now();
169            options.sequence = Some(TempSeq::from(ts as u32));
170            let _ = datagram.send_to(
171                "debug".as_ref(), 
172                &mut options, 
173                &command.remote.desc().device_id(), 
174                datagram::ReservedVPort::Debug.into());
175            match future::timeout(command.timeout, datagram.recv_v()).await {
176                Err(_err) => {
177                    let _ = tunnel.write_all("timeout\r\n".as_ref()).await;
178                },
179                Ok(res) => {
180                    let datagrams = res.unwrap();
181                    for datagram in datagrams {
182                        if let Some(opt) = datagram.options.sequence {
183                            if opt == options.sequence.unwrap() {
184                                let s = format!("respose. time: {:.1} ms\r\n", 
185                                                    (cyfs_base::bucky_time_now() - ts) as f64 / 1000.0);
186                                let _ = tunnel.write_all(s.as_bytes()).await;
187                                break ;
188                            }
189                        }
190                    }
191                }
192            }
193        }
194
195        Ok(())
196    }
197
198    async fn bench_datagram(&self, tunnel: TcpStream, command: DebugCommandBenchDatagram) -> Result<(), String> {
199        let mut tunnel = tunnel;
200
201        let from = 1;
202        let to = 65535;
203        let plaintext = command.plaintext;
204
205        let s = format!("bench_datagram: plaintext:{} timeout:{:?} from:{} to:{}\r\n",
206            plaintext, command.timeout, from, to);
207        let _ = tunnel.write_all(s.as_bytes()).await;
208
209        let mut n_ok = 0;
210        let stack = Stack::from(&self.0.stack);
211        let datagram = stack.datagram_manager().bind(0)
212            .map_err(|err| format!("deamon bind datagram tunnel failed for {}\r\n", err))?;
213        for i in from..to {
214            let mut options = DatagramOptions::default();
215            let _ = tunnel.write_all("send data.\r\n".as_ref()).await;
216
217            let data = rand_data_gen(i);
218            let ts = cyfs_base::bucky_time_now();
219            options.sequence = Some(TempSeq::from(ts as u32));
220            if i%2 == 0 {
221                options.create_time = Some(ts+10);
222            }
223            if i%4 == 0 {
224                options.send_time = Some(ts+20);
225            }
226            if i%8 == 0 {
227                options.author_id = Some(command.remote.desc().device_id().clone());
228            }
229            options.plaintext = plaintext;
230            let _ = datagram.send_to(
231                &data, 
232                &mut options, 
233                &command.remote.desc().device_id(), 
234                datagram::ReservedVPort::Debug.into());
235            match future::timeout(command.timeout, datagram.recv_v()).await {
236                Err(_err) => {
237                    let _ = tunnel.write_all("timeout\r\n".as_ref()).await;
238                },
239                Ok(res) => {
240                    let datagrams = res.unwrap();
241                    for datagram in datagrams {
242                        if let Some(opt) = datagram.options.sequence {
243                            if opt == options.sequence.unwrap() {
244                                let md5_recv = md5::compute(&datagram.data);
245                                let md5_send = md5::compute(&data);
246                                if md5_recv == md5_send {
247                                    n_ok += 1;
248                                }
249                                let s = format!("respose: plaintext: {} time: {:.1} ms, success {}/{}, fail {}\r\n", 
250                                                    options.plaintext,
251                                                    (cyfs_base::bucky_time_now() - ts) as f64 / 1000.0,
252                                                    n_ok,
253                                                    i,
254                                                    i-n_ok);
255                                let _ = tunnel.write_all(s.as_bytes()).await;
256                                break ;
257                            }
258                        }
259                    }
260                }
261            }
262        }
263
264        Ok(())
265    }
266
267    async fn nc(&self, tunnel: TcpStream, command: DebugCommandNc) -> Result<(), String> {
268        let stack = Stack::from(&self.0.stack);
269        let task_num = if command.task_num == 0 {
270            1
271        } else {
272            command.task_num
273        };
274        let mut tasks = vec![];
275
276        for task_id in 0..task_num {
277            let mut t = tunnel.clone();
278            let c = command.clone();
279            let s = stack.clone();
280            tasks.push(task::spawn(async move {
281                match nc_task(t.clone(), c, s, task_id).await {
282                    Err(e) => {
283                        let _ = t.write_all(format!("nc_task err={}\r\n", e).as_ref()).await;
284                    },
285                    Ok(_) => {
286                    }
287                }
288            }));
289        }
290
291        for t in tasks {
292            let _ = t.await;
293        }
294
295        Ok(())
296    }
297
298    async fn get_chunk(&self, tunnel: TcpStream, command: DebugCommandGetChunk) -> Result<(), String> {
299        let mut tunnel = tunnel;
300
301        let chunk_id = command.chunk_id;
302        let remotes = command.remotes;
303        //let local_path = command.local_path;
304
305        let stack = Stack::from(&self.0.stack);
306
307        let chunk_store = self.0.chunk_store.clone();
308        let context = SampleDownloadContext::desc_streams("".to_string(), remotes);
309        let begin = Instant::now();
310        match download_chunk(&stack, chunk_id.clone(),None, context).await {
311            Ok((_, reader)) => {
312                chunk_store.write_chunk(&chunk_id, reader).await.unwrap();
313                match future::timeout(Duration::from_secs(600), get_chunk_wait_finish(stack.clone(), chunk_id.clone())).await {
314                    Err(e) => {
315                        let _ = tunnel.write_all(format!("get_chunk_wait_finish err={}\r\n", e).as_ref()).await;
316                    },
317                    Ok(r) => {
318                        match r {
319                            Ok(n) => {
320                                let cost_secs = begin.elapsed().as_secs_f64();
321                                let _ = tunnel.write_all(format!("get success\r\n").as_ref()).await;
322                                if chunk_id.len() != n {
323                                    let _ = tunnel.write_all(format!("data wrong, recv_len={} want={}\r\n", n, chunk_id.len()).as_ref()).await;
324                                } else {
325                                    let len = n as f64;
326                                    let speed = if cost_secs > 0.0 {
327                                        len / cost_secs / 1024.0
328                                    } else {
329                                        999999.9
330                                    };
331                                    let _ = tunnel.write_all(format!("cost={:.3}s len={:.1}KB speed={:.1}KB/s\r\n",
332                                    cost_secs, len/1024.0, speed).as_ref()).await;
333                                }
334                            },
335                            Err(e) => {
336                                let _ = tunnel.write_all(format!("get_chunk_wait_finish err={}\r\n", e).as_ref()).await;
337                            }
338                        }
339                    }
340                }
341            },
342            Err(e) => {
343                let _ = tunnel.write_all(format!("download_chunk err={}\r\n", e).as_ref()).await;
344            }
345        }
346
347        Ok(())
348    }
349
350    async fn get_file(&self, tunnel: TcpStream, command: DebugCommandGetFile) -> Result<(), String> {
351        let mut tunnel = tunnel;
352
353        let file_id = command.file_id;
354        let remotes = command.remotes;
355        let timeout = command.timeout;
356        let local_path = command.local_path;
357
358
359        let _ = tunnel.write_all("start downloading file..\r\n".as_ref()).await;
360
361        let stack = Stack::from(&self.0.stack);
362        let context = SampleDownloadContext::id_streams(&stack, "".to_owned(), &remotes).await
363            .map_err(|e| format!("download err: {}\r\n", e))?;
364        let (_, reader) = download_file(
365            &stack, 
366            file_id.clone(), 
367            None,  
368            context)
369            .await.map_err(|e| {
370                format!("download err: {}\r\n", e)
371            })?;
372        
373        let _ = future::timeout(Duration::from_secs(timeout as u64), tunnel.write_all("waitting..\r\n".as_ref())).await;
374
375        LocalChunkListWriter::from_file(local_path, &file_id)
376            .map_err(|e| {
377                format!("download err: {}\r\n", e)
378            })?
379            .write(reader).await
380            .map_err(|e| {
381                format!("download err: {}\r\n", e)
382            })?;
383
384        let _ = tunnel.write_all("download file finish.\r\n".as_ref()).await;
385        Ok(())
386     }
387
388     async fn put_chunk(&self, tunnel: TcpStream, command: DebugCommandPutChunk) -> Result<(), String> {
389        let mut tunnel = tunnel;
390        let local_path = command.local_path;
391
392        if local_path.as_path().exists() {
393            let mut file = async_std::fs::File::open(local_path.as_path()).await.map_err(|e| {
394                format!("open file err: {}\r\n", e)
395            })?;
396            let mut content = Vec::<u8>::new();
397            let _ = file.read_to_end(&mut content).await.map_err(|e| {
398                format!("read file err: {}\r\n", e)
399            })?;
400
401            if content.len() == 0 {
402                return Err(format!("file size is zero\r\n"));
403            }
404
405            let chunk_store = self.0.chunk_store.clone();
406            match ChunkId::calculate(content.as_slice()).await {
407                Ok(chunk_id) => {
408                    match chunk_store.add(chunk_id.clone(), Arc::new(content)).await {
409                        Ok(_) => {
410                            let _ = tunnel.write_all(format!("put chunk success, chunk_id={}\r\n", chunk_id).as_ref()).await;
411                        },
412                        Err(e) => {
413                            let _ = tunnel.write_all(format!("put chunk fail, err={}\r\n", e).as_ref()).await;
414                        }
415                    }
416                    Ok(())
417                }, 
418                Err(e) => {
419                    Err(format!("calculate chunk id err: {}\r\n", e))
420                }
421            }
422        } else {
423            Err(format!("file not exists: {}\r\n", local_path.to_str().unwrap()))
424        }
425     }
426
427     async fn put_file(&self, _tunnel: TcpStream, _command: DebugCommandPutFile) -> Result<(), String> {
428        // let mut tunnel = tunnel;
429        // let stack = Stack::from(&self.0.stack);
430        // let local_path = command.local_path;
431
432        // if local_path.as_path().exists() {
433        //     let chunkids = {
434        //         let _ = tunnel.write_all("calculate chunkid by file..\r\n".as_ref()).await;
435
436        //         let chunk_size: usize = 10 * 1024 * 1024;
437        //         let mut chunkids = Vec::new();
438        //         let mut file = File::open(local_path.as_path()).await.map_err(|e| {
439        //             format!("open file err: {}\r\n", e)
440        //         })?;
441        //         loop {
442        //             let mut buf = vec![0u8; chunk_size];
443        //             let len = file.read(&mut buf).await.map_err(|e| {
444        //                 format!("read file err: {}\r\n", e)
445        //             })?;
446        //             if len > 0 {
447        //                 if len < chunk_size {
448        //                     buf.truncate(len);
449        //                 }
450        //                 let hash = hash_data(&buf[..]);
451        //                 let chunkid = ChunkId::new(&hash, buf.len() as u32);
452        //                 chunkids.push(chunkid);
453        //             }
454        //             if len < chunk_size {
455        //                 break ;
456        //             }
457        //         }
458        //         chunkids
459        //     };
460
461        //     let (hash, len) = hash_file(local_path.as_path()).await.map_err(|e| {
462        //         format!("hash file err: {}\r\n", e)
463        //     })?;
464        //     let file = cyfs_base::File::new(
465        //         ObjectId::default(),
466        //         len,
467        //         hash,
468        //         ChunkList::ChunkInList(chunkids)
469        //     ).no_create_time().build();
470
471        //     let buf_len_ret = file.raw_measure(&None);
472        //     if buf_len_ret.is_err() {
473        //         return Err(format!("raw_measure err\r\n"));
474        //     }
475        //     let mut buf = vec![0u8; buf_len_ret.unwrap()];
476        //     let encode_ret = file.raw_encode(buf.as_mut_slice(), &None);
477        //     if encode_ret.is_err() {
478        //         return Err(format!("raw_encode err\r\n"));
479        //     }
480
481        //     track_file_in_path(&stack, file, local_path).await
482        //         .map_err(|e| format!("track file err {}\r\n", e))?;
483
484        //     let _ = tunnel.write_all(format!("put file sucess. file_id: {}\r\n", 
485        //         hex::encode(buf)).as_bytes()).await;
486
487        //     Ok(())
488        // } else {
489        //     Err(format!("{} not exists\r\n", &local_path.to_str().unwrap()))
490        // }
491        Err("not supported now".to_owned())
492    }
493}
494
495async fn watchdog_download_finished(task: Box<dyn DownloadTask>, timeout: u32) -> Result<(), String> {
496    let mut _timeout = 1800; //todo: when bdt support download speed, use timeout instead
497    let mut i = 0;
498
499    loop {
500        match task.state() {
501            NdnTaskState::Finished => {
502                break Ok(());
503            },
504            NdnTaskState::Running => {
505                if task.cur_speed() > 0 {
506                    i = 0;
507
508                    if _timeout == 1800 { //todo
509                        _timeout = timeout;
510                    }
511                } else {
512                    i += 1;
513                }
514            },
515            NdnTaskState::Error(e) => {
516                break Err(format!("download err, code: {:?}\r\n", e));
517            },
518            _ => {
519
520            }
521        }
522
523        if i >= _timeout {
524            break Err(format!("download timeout\r\n"));
525        }
526
527        task::sleep(Duration::from_secs(1)).await;
528    }
529}
530
531fn get_filesize(path: &Path) -> u64 {
532    let res = std::fs::File::open(path);
533    if res.is_ok() {
534        let file = res.unwrap();
535        return file.metadata().unwrap().len();
536    }
537
538    0
539}
540
541fn rand_data_gen(len: usize) -> Vec<u8> {
542    let mut buf = Vec::new();
543    buf.resize(len, 0u8);
544
545    let mut r = 0;
546    for i in 0..len {
547        if i%10 == 0 {
548            r = rand::random::<u8>();
549        }
550        buf[i] = r;
551    }
552
553    buf
554}
555
556fn rand_data_gen_buf(len: usize) -> Vec<u8> {
557    let mut buf = Vec::new();
558    buf.resize(len + 8, 0u8);
559
560    buf[0..8].copy_from_slice(&len.to_be_bytes());
561
562    let mut r = 0;
563    for i in 8..len {
564        if i%10 == 0 {
565            r = rand::random::<u8>();
566        }
567        buf[i] = r;
568    }
569
570    buf
571}
572
573fn rand_char(len: usize) -> Vec<u8> {
574    let mut buf = Vec::new();
575    buf.resize(len, 0u8);
576
577    for i in 0..len {
578        buf[i] = 97 + rand::random::<u8>() % 26;
579    }
580
581    buf
582}
583
584async fn get_chunk_wait_finish(stack: Stack, chunk_id: ChunkId) -> BuckyResult<usize> {
585    let mut len = 0;
586    loop {
587        let ret = stack.ndn().chunk_manager().store().get(&chunk_id).await;
588        if let Ok(mut reader) = ret {
589            let mut content = vec![0u8; 2048];
590
591            loop {
592                let n = reader.read(content.as_mut_slice()).await?;
593                if n == 0 {
594                    break ;
595                }
596                len += n;
597            }
598
599            return Ok(len);
600        } else {
601            task::sleep(Duration::from_millis(200)).await;
602        }
603    }
604}
605
606async fn nc_task(tunnel: TcpStream, command: DebugCommandNc, stack: Stack, task_id: u32) -> Result<(), String> {
607    let mut tunnel = tunnel;
608    let _ = tunnel.write_all(format!("[{}] connecting stream\r\n", task_id).as_ref()).await;
609
610    let question = b"question?";
611    let mut conn = stack.stream_manager().connect(
612        command.port, 
613        question.to_vec(), 
614        BuildTunnelParams {
615            remote_const: command.remote.desc().clone(), 
616            remote_sn: None, 
617            remote_desc: Some(command.remote.clone())
618    }).await.map_err(|err| format!("Err: {}\r\n", err.msg().to_string()))?;
619
620    let _ = tunnel.write_all(format!("[{}] Connect success, read answer\r\n", task_id).as_ref()).await;
621
622    let mut answer = [0; 128];
623    match conn.read(&mut answer).await {
624        Ok(len) => {
625            let s = format!("[{}] Read answer success, len={} content={:?}\r\n", 
626                task_id, len, String::from_utf8(answer[..len].to_vec()).expect(""));
627            let _ = tunnel.write_all(s.as_bytes()).await;
628        },
629        Err(e) => {
630            let s = format!("[{}] Read answer fail, err={}\r\n", task_id, e);
631            let _ = tunnel.write_all(s.as_bytes()).await;
632            return Ok(());
633        }
634    }
635
636    let _ = conn.write_all(b"hello world").await;
637
638    let mut buf = [0u8; 128];
639    match conn.read(&mut buf).await {
640        Ok(len) => {
641            let s = format!("[{}] Read data success, len={} content={:?}\r\n", 
642                task_id, len, String::from_utf8(buf[..len].to_vec()).expect(""));
643            let _ = tunnel.write_all(s.as_bytes()).await;
644        },
645        Err(e) => {
646            let s = format!("[{}] Read data fail, err={}\r\n", task_id, e);
647            let _ = tunnel.write_all(s.as_bytes()).await;
648            return Ok(());
649        }
650    }
651
652    let _ = tunnel.write_all(format!("[{}] Ok: stream connected\r\n", task_id).as_ref()).await;
653
654    if command.bench > 0 {
655        let _ = tunnel.write_all(format!("[{}] start bench size={}MB\r\n", task_id, command.bench).as_ref()).await;
656
657        let buf = rand_char(1024);
658        let mut i: u32 = 0;
659        let max = command.bench * 1024;
660        let begin = Instant::now();
661        loop {
662            match conn.write_all(&buf).await {
663                Ok(_) => {
664                    i += 1;
665                },
666                Err(e) => {
667                    let _ = tunnel.write_all(format!("[{}] write err={}\r\n", task_id, e).as_ref()).await;
668                    break;
669                }
670            }
671            if i >= max {
672                break;
673            }
674        }
675        let cost = begin.elapsed().as_secs_f64();
676         let speed = if cost > 0.0 {
677            i as f64 / cost
678        } else {
679            999999.9
680        };
681        let _ = tunnel.write_all(format!("[{}] bench over. cost={:.3}s len={}KB speed={:.1}KB/s\r\n",
682           task_id, cost, i, speed).as_ref()).await;
683    }
684
685    let _ = conn.shutdown(Shutdown::Both);
686
687    Ok(())
688}