sonic_client/
search.rs

1use mio::net::TcpStream;
2use mio::{Events, PollOpt, Ready, Token};
3use std::collections::HashMap;
4use std::io::prelude::*;
5use std::io::{BufReader, BufWriter};
6use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
7use std::sync::{Arc, Mutex};
8use std::thread;
9
/// A command that has been (or is about to be) sent to the server, paired
/// with the channel on which its response lines are delivered.
#[derive(Debug)]
pub struct Task {
    /// The raw command text, including its line terminator.
    pub msg: String,
    /// Sending half of the reply channel for this command.
    pub sender: SyncSender<String>,
}

impl Task {
    /// Number of replies that can be buffered before a sender blocks.
    ///
    /// Two slots are enough for a search, which receives two lines on the
    /// same channel (an acknowledgement, then the event).
    const REPLY_CAPACITY: usize = 2;

    /// Creates a task for `msg` together with a fresh bounded reply channel.
    ///
    /// Returns the task and both halves of the channel. The task holds a
    /// clone of the sender, so the returned sender and `task.sender` feed the
    /// same receiver.
    pub fn new(msg: String) -> (Self, (SyncSender<String>, Receiver<String>)) {
        let (sender, receiver) = sync_channel::<String>(Self::REPLY_CAPACITY);
        let task = Task {
            msg,
            sender: sender.clone(),
        };
        (task, (sender, receiver))
    }
}
27
28pub struct SearchChan {
29    host: String,
30    port: usize,
31    password: String,
32    conn: TcpStream,
33    search_ids: Arc<Mutex<HashMap<String, SyncSender<String>>>>,
34    tasks: Arc<Mutex<Vec<Task>>>,
35}
36
37const CLIENT: Token = Token(0);
38
39impl SearchChan {
40    pub fn new(host: &str, port: usize, password: &str) -> Result<Self, std::io::Error> {
41        let stream = TcpStream::connect(
42            &format!("{}:{}", host, port)
43                .parse()
44                .expect("Failed to parse socket address"),
45        )?;
46        let chan = SearchChan {
47            host: host.clone().into(),
48            port: port,
49            password: password.clone().into(),
50            conn: stream,
51            search_ids: Arc::new(Mutex::new(HashMap::new())),
52            tasks: Arc::new(Mutex::new(Vec::new())),
53        };
54        Ok(chan)
55    }
56
57    pub fn connect(&mut self) -> Result<String, std::io::Error> {
58        let msg = format!("START search {}\n", &self.password);
59        println!("{}", msg);
60        let (task, (_, receiver)) = Task::new(msg.clone());
61        {
62            let mut t = self.tasks.lock().expect("Failed to acquire task lock");
63            t.push(task);
64        }
65        let conn = self.conn.try_clone()?;
66        let mut writer = BufWriter::new(conn);
67        writer.write_all(msg.as_bytes())?;
68        Ok(receiver.recv().unwrap_or("".to_string()))
69    }
70
71    pub fn read(&mut self) -> thread::JoinHandle<()> {
72        let conn = self.conn.try_clone().unwrap();
73        let tasks = Arc::clone(&self.tasks);
74        let search_ids = Arc::clone(&self.search_ids);
75        thread::spawn(move || {
76            let poll = mio::Poll::new().unwrap();
77            poll.register(&conn, CLIENT, Ready::readable(), PollOpt::edge())
78                .unwrap();
79            let mut events = Events::with_capacity(1024);
80            let mut reader = BufReader::new(&conn);
81            let mut line = String::new();
82            'event_loop: loop {
83                poll.poll(&mut events, None).unwrap();
84                for event in events.iter() {
85                    match event.token() {
86                        CLIENT => match reader.read_line(&mut line) {
87                            Ok(_) => {
88                                if line.ends_with("\r\n") {
89                                    if line.starts_with("ERR") {
90                                        let mut t = tasks.lock().unwrap();
91                                        if t.len() > 0 {
92                                            let task = t.remove(0);
93                                            task.sender
94                                                .send(line.clone())
95                                                .expect("Failed to send msg err");
96                                        }
97                                    } else if line.starts_with("CONNECTED") {
98                                        let mut t = tasks.lock().unwrap();
99                                        if t.len() > 0 {
100                                            let task = t.remove(0);
101                                            task.sender
102                                                .send(line.clone())
103                                                .expect("Failed to send msg connected");
104                                        }
105                                    } else if line.starts_with("STARTED") {
106                                        // Do nothing
107                                    } else if line.starts_with("EVENT") {
108                                        let tokens = line.split(" ").collect::<Vec<&str>>();
109                                        let id = tokens[2];
110                                        let mut ids = search_ids
111                                            .lock()
112                                            .expect("Failed to acquire search_ids lock");
113                                        if let Some(sender) = ids.remove(id) {
114                                            sender
115                                                .send(tokens.join(" "))
116                                                .expect("Failed to send event");
117                                        }
118                                    } else {
119                                        let mut t = tasks.lock().unwrap();
120                                        if t.len() > 0 {
121                                            let task = t.remove(0);
122                                            task.sender
123                                                .send(line.clone())
124                                                .expect("Failed to send msg");
125                                        }
126                                    }
127                                    if line.starts_with("ENDED") {
128                                        break 'event_loop;
129                                    }
130                                    line = String::new();
131                                }
132                            }
133                            Err(e) => {
134                                if e.kind() != std::io::ErrorKind::WouldBlock {
135                                    println!("{:?}", e);
136                                }
137                            }
138                        },
139                        _ => unreachable!(),
140                    };
141                }
142            }
143        })
144    }
145
146    pub fn write(
147        &mut self,
148        msg: String,
149    ) -> Result<(SyncSender<String>, Receiver<String>), std::io::Error> {
150        let (task, (sender, receiver)) = Task::new(msg.clone());
151        {
152            let mut t = self.tasks.lock().expect("Failed to acquire task lock");
153            t.push(task);
154        }
155        let conn = self.conn.try_clone()?;
156        let mut writer = BufWriter::new(conn);
157        writer.write_all(msg.as_bytes())?;
158        Ok((sender.clone(), receiver))
159    }
160
161    pub fn query(
162        &mut self,
163        collection: &str,
164        bucket: &str,
165        terms: &str,
166        limit: Option<i32>,
167        offset: Option<&str>,
168    ) -> Result<Vec<String>, std::io::Error> {
169        let (sender, receiver) = self.write(format!(
170            "QUERY {} {} {} {} {}\r\n",
171            collection,
172            bucket,
173            format!("\"{}\"", terms),
174            limit
175                .and_then(|l| Some(format!("LIMIT({})", l)))
176                .unwrap_or("".to_string()),
177            offset
178                .and_then(|l| Some(format!("OFFSET({})", l)))
179                .unwrap_or("".to_string()),
180        ))?;
181        if let Ok(id) = receiver.recv() {
182            let mut search_ids = self
183                .search_ids
184                .lock()
185                .expect("Failed to acquire search_id lock");
186            search_ids.insert(
187                id.split(" ").collect::<Vec<&str>>()[1].trim().to_string(),
188                sender,
189            );
190        }
191        match receiver.recv() {
192            Ok(result) => Ok(result
193                .split(" ")
194                .map(|s| s.to_string())
195                .collect::<Vec<String>>()),
196            Err(_) => Err(std::io::Error::new(
197                std::io::ErrorKind::Other,
198                "Failed to receive from query",
199            )),
200        }
201    }
202
203    // TODO: check if suggest id conflicts with search
204    pub fn suggest(
205        &mut self,
206        collection: &str,
207        bucket: &str,
208        word: &str,
209        limit: Option<i32>,
210    ) -> Result<Vec<String>, std::io::Error> {
211        let (sender, receiver) = self.write(format!(
212            "SUGGEST {} {} {} {}\r\n",
213            collection,
214            bucket,
215            format!("\"{}\"", word),
216            limit
217                .and_then(|l| Some(format!("LIMIT({})", l)))
218                .unwrap_or("".to_string()),
219        ))?;
220        if let Ok(id) = receiver.recv() {
221            let mut search_ids = self
222                .search_ids
223                .lock()
224                .expect("Failed to acquire search_id lock");
225            search_ids.insert(
226                id.split(" ").collect::<Vec<&str>>()[1].trim().to_string(),
227                sender,
228            );
229        }
230        match receiver.recv() {
231            Ok(result) => Ok(result
232                .split(" ")
233                .map(|s| s.to_string())
234                .collect::<Vec<String>>()),
235            Err(_) => Err(std::io::Error::new(
236                std::io::ErrorKind::Other,
237                "Failed to receive from suggest",
238            )),
239        }
240    }
241
242    pub fn ping(&mut self) -> Result<Receiver<String>, std::io::Error> {
243        let (_, receiver) = self.write("PING\r\n".to_string())?;
244        Ok(receiver)
245    }
246
247    pub fn quit(&mut self) -> Result<Receiver<String>, std::io::Error> {
248        let (_, receiver) = self.write("QUIT\r\n".to_string())?;
249        Ok(receiver)
250    }
251
252    pub fn help(&mut self, manual: Option<&str>) -> Result<Receiver<String>, std::io::Error> {
253        let (_, receiver) = self.write(format!("HELP {}\r\n", manual.unwrap_or("")))?;
254        Ok(receiver)
255    }
256}
257
// Compiled only for test builds, so the imports below do not trigger
// unused-import warnings in normal builds.
#[cfg(test)]
mod test {
    use super::*;
    use std::time;

    /// End-to-end round trip: handshake, query, ping and quit.
    ///
    /// Requires a Sonic server listening on 127.0.0.1:1491 that accepts the
    /// password used below and reports the version asserted here.
    #[test]
    fn test_search() {
        let mut s = SearchChan::new("127.0.0.1", 1491, "haha").expect("Connection error");
        // The reader thread must be running before any reply can be received.
        let handle = s.read();
        assert_eq!("CONNECTED <sonic-server v1.1.8>\r\n", s.connect().unwrap());
        thread::sleep(time::Duration::from_secs(4));
        let r1 = s
            .query("helpdesk", "user:0dcde3a6", "gdpr", Some(50), None)
            .unwrap();
        let r2 = s.ping().unwrap();
        let r3 = s.quit().unwrap();
        assert_eq!("EVENT", r1[0]);
        assert_eq!("PONG\r\n", r2.recv().unwrap());
        assert_eq!("ENDED quit\r\n", r3.recv().unwrap());
        // The reader thread exits after delivering the `ENDED` line.
        handle.join().expect("Failed to wait process");
    }
}
277}