async/
async.rs

1use r2pipe::R2Pipe;
2
3use std::sync::mpsc::channel;
4use std::sync::mpsc::Receiver;
5use std::sync::mpsc::Sender;
6use std::sync::Arc;
7use std::thread;
8
/// Path handed to `R2Pipe::spawn` when `R2Pipe::in_session()` reports that
/// the program is not already running inside an existing session.
const FILENAME: &str = "/bin/ls";
10
/// Callback-driven front end that queues commands for an `R2Pipe` session
/// which is driven from a worker thread started by `mainloop`.
///
/// Two `mpsc` channels connect the caller and the worker: a *query* channel
/// (`tx`/`rx`) carrying command strings, and a *result* channel (`tx2`/`rx2`)
/// carrying the output of each command.
pub struct R2PipeAsync {
    /// Sending half of the query channel; `cmd` and `quit` write to it.
    tx: Sender<String>,
    /// Receiving half of the query channel; handed to the worker thread by `mainloop`.
    rx: Receiver<String>,
    /// Sending half of the result channel; the worker thread gets a handle to it in `mainloop`.
    tx2: Sender<String>,
    /// Receiving half of the result channel; read by `mainloop` on the calling thread.
    rx2: Receiver<String>,
    /// Pending callbacks, one per command queued through `cmd`.
    cbs: Vec<Arc<dyn Fn(String)>>,
}
18
19impl R2PipeAsync {
20    pub fn open() -> R2PipeAsync {
21        let (tx, rx) = channel(); // query
22        let (tx2, rx2) = channel(); // result
23        R2PipeAsync {
24            tx: tx,
25            rx: rx,
26            tx2: tx2,
27            rx2: rx2,
28            cbs: Vec::new(),
29        }
30    }
31
32    pub fn cmd(&mut self, str: &'static str, cb: Arc<dyn Fn(String)>) {
33        self.cbs.insert(0, cb);
34        self.tx.send(str.to_string()).unwrap();
35    }
36
37    pub fn quit(&mut self) {
38        self.tx.send("q".to_string()).unwrap();
39    }
40
41    pub fn mainloop(mut self) {
42        let child_rx = self.rx;
43        let child_tx = self.tx2.clone();
44        let child = thread::spawn(move || {
45            let mut r2p = match R2Pipe::in_session() {
46                Some(_) => R2Pipe::open(),
47                None => R2Pipe::spawn(FILENAME.to_owned(), None),
48            }
49            .unwrap();
50            loop {
51                let msg = child_rx.recv().unwrap();
52                if msg == "q" {
53                    // push a result without callback
54                    child_tx.send("".to_owned()).unwrap();
55                    drop(child_tx);
56                    break;
57                }
58                let res = r2p.cmd(&msg).unwrap();
59                child_tx.send(res).unwrap();
60            }
61            r2p.close();
62        });
63
64        // main loop
65        loop {
66            let msg = self.rx2.recv();
67            if msg.is_ok() {
68                let res = msg.unwrap();
69                if let Some(cb) = self.cbs.pop() {
70                    cb(res.trim().to_string());
71                } else {
72                    break;
73                }
74            } else {
75                break;
76            }
77        }
78        child.join().unwrap();
79    }
80}
81
82fn main() {
83    let mut r2pa = R2PipeAsync::open();
84    r2pa.cmd(
85        "?e One",
86        Arc::new(|x| {
87            println!("One: {}", x);
88        }),
89    );
90    r2pa.cmd(
91        "?e Two",
92        Arc::new(|x| {
93            println!("Two: {}", x);
94        }),
95    );
96    r2pa.quit();
97    r2pa.mainloop();
98}