1use mio::net::TcpStream;
2use mio::{Events, PollOpt, Ready, Token};
3use std::collections::HashMap;
4use std::io::prelude::*;
5use std::io::{BufReader, BufWriter};
6use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
7use std::sync::{Arc, Mutex};
8use std::thread;
9
10pub struct Task {
11 pub msg: String,
12 pub sender: SyncSender<String>,
13}
14
15impl Task {
16 pub fn new(msg: String) -> (Self, (SyncSender<String>, Receiver<String>)) {
17 let (sender, receiver) = sync_channel::<String>(2);
18 (
19 Task {
20 msg: msg,
21 sender: sender.clone(),
22 },
23 (sender, receiver),
24 )
25 }
26}
27
28pub struct SearchChan {
29 host: String,
30 port: usize,
31 password: String,
32 conn: TcpStream,
33 search_ids: Arc<Mutex<HashMap<String, SyncSender<String>>>>,
34 tasks: Arc<Mutex<Vec<Task>>>,
35}
36
37const CLIENT: Token = Token(0);
38
39impl SearchChan {
40 pub fn new(host: &str, port: usize, password: &str) -> Result<Self, std::io::Error> {
41 let stream = TcpStream::connect(
42 &format!("{}:{}", host, port)
43 .parse()
44 .expect("Failed to parse socket address"),
45 )?;
46 let chan = SearchChan {
47 host: host.clone().into(),
48 port: port,
49 password: password.clone().into(),
50 conn: stream,
51 search_ids: Arc::new(Mutex::new(HashMap::new())),
52 tasks: Arc::new(Mutex::new(Vec::new())),
53 };
54 Ok(chan)
55 }
56
57 pub fn connect(&mut self) -> Result<String, std::io::Error> {
58 let msg = format!("START search {}\n", &self.password);
59 println!("{}", msg);
60 let (task, (_, receiver)) = Task::new(msg.clone());
61 {
62 let mut t = self.tasks.lock().expect("Failed to acquire task lock");
63 t.push(task);
64 }
65 let conn = self.conn.try_clone()?;
66 let mut writer = BufWriter::new(conn);
67 writer.write_all(msg.as_bytes())?;
68 Ok(receiver.recv().unwrap_or("".to_string()))
69 }
70
71 pub fn read(&mut self) -> thread::JoinHandle<()> {
72 let conn = self.conn.try_clone().unwrap();
73 let tasks = Arc::clone(&self.tasks);
74 let search_ids = Arc::clone(&self.search_ids);
75 thread::spawn(move || {
76 let poll = mio::Poll::new().unwrap();
77 poll.register(&conn, CLIENT, Ready::readable(), PollOpt::edge())
78 .unwrap();
79 let mut events = Events::with_capacity(1024);
80 let mut reader = BufReader::new(&conn);
81 let mut line = String::new();
82 'event_loop: loop {
83 poll.poll(&mut events, None).unwrap();
84 for event in events.iter() {
85 match event.token() {
86 CLIENT => match reader.read_line(&mut line) {
87 Ok(_) => {
88 if line.ends_with("\r\n") {
89 if line.starts_with("ERR") {
90 let mut t = tasks.lock().unwrap();
91 if t.len() > 0 {
92 let task = t.remove(0);
93 task.sender
94 .send(line.clone())
95 .expect("Failed to send msg err");
96 }
97 } else if line.starts_with("CONNECTED") {
98 let mut t = tasks.lock().unwrap();
99 if t.len() > 0 {
100 let task = t.remove(0);
101 task.sender
102 .send(line.clone())
103 .expect("Failed to send msg connected");
104 }
105 } else if line.starts_with("STARTED") {
106 } else if line.starts_with("EVENT") {
108 let tokens = line.split(" ").collect::<Vec<&str>>();
109 let id = tokens[2];
110 let mut ids = search_ids
111 .lock()
112 .expect("Failed to acquire search_ids lock");
113 if let Some(sender) = ids.remove(id) {
114 sender
115 .send(tokens.join(" "))
116 .expect("Failed to send event");
117 }
118 } else {
119 let mut t = tasks.lock().unwrap();
120 if t.len() > 0 {
121 let task = t.remove(0);
122 task.sender
123 .send(line.clone())
124 .expect("Failed to send msg");
125 }
126 }
127 if line.starts_with("ENDED") {
128 break 'event_loop;
129 }
130 line = String::new();
131 }
132 }
133 Err(e) => {
134 if e.kind() != std::io::ErrorKind::WouldBlock {
135 println!("{:?}", e);
136 }
137 }
138 },
139 _ => unreachable!(),
140 };
141 }
142 }
143 })
144 }
145
146 pub fn write(
147 &mut self,
148 msg: String,
149 ) -> Result<(SyncSender<String>, Receiver<String>), std::io::Error> {
150 let (task, (sender, receiver)) = Task::new(msg.clone());
151 {
152 let mut t = self.tasks.lock().expect("Failed to acquire task lock");
153 t.push(task);
154 }
155 let conn = self.conn.try_clone()?;
156 let mut writer = BufWriter::new(conn);
157 writer.write_all(msg.as_bytes())?;
158 Ok((sender.clone(), receiver))
159 }
160
161 pub fn query(
162 &mut self,
163 collection: &str,
164 bucket: &str,
165 terms: &str,
166 limit: Option<i32>,
167 offset: Option<&str>,
168 ) -> Result<Vec<String>, std::io::Error> {
169 let (sender, receiver) = self.write(format!(
170 "QUERY {} {} {} {} {}\r\n",
171 collection,
172 bucket,
173 format!("\"{}\"", terms),
174 limit
175 .and_then(|l| Some(format!("LIMIT({})", l)))
176 .unwrap_or("".to_string()),
177 offset
178 .and_then(|l| Some(format!("OFFSET({})", l)))
179 .unwrap_or("".to_string()),
180 ))?;
181 if let Ok(id) = receiver.recv() {
182 let mut search_ids = self
183 .search_ids
184 .lock()
185 .expect("Failed to acquire search_id lock");
186 search_ids.insert(
187 id.split(" ").collect::<Vec<&str>>()[1].trim().to_string(),
188 sender,
189 );
190 }
191 match receiver.recv() {
192 Ok(result) => Ok(result
193 .split(" ")
194 .map(|s| s.to_string())
195 .collect::<Vec<String>>()),
196 Err(_) => Err(std::io::Error::new(
197 std::io::ErrorKind::Other,
198 "Failed to receive from query",
199 )),
200 }
201 }
202
203 pub fn suggest(
205 &mut self,
206 collection: &str,
207 bucket: &str,
208 word: &str,
209 limit: Option<i32>,
210 ) -> Result<Vec<String>, std::io::Error> {
211 let (sender, receiver) = self.write(format!(
212 "SUGGEST {} {} {} {}\r\n",
213 collection,
214 bucket,
215 format!("\"{}\"", word),
216 limit
217 .and_then(|l| Some(format!("LIMIT({})", l)))
218 .unwrap_or("".to_string()),
219 ))?;
220 if let Ok(id) = receiver.recv() {
221 let mut search_ids = self
222 .search_ids
223 .lock()
224 .expect("Failed to acquire search_id lock");
225 search_ids.insert(
226 id.split(" ").collect::<Vec<&str>>()[1].trim().to_string(),
227 sender,
228 );
229 }
230 match receiver.recv() {
231 Ok(result) => Ok(result
232 .split(" ")
233 .map(|s| s.to_string())
234 .collect::<Vec<String>>()),
235 Err(_) => Err(std::io::Error::new(
236 std::io::ErrorKind::Other,
237 "Failed to receive from suggest",
238 )),
239 }
240 }
241
242 pub fn ping(&mut self) -> Result<Receiver<String>, std::io::Error> {
243 let (_, receiver) = self.write("PING\r\n".to_string())?;
244 Ok(receiver)
245 }
246
247 pub fn quit(&mut self) -> Result<Receiver<String>, std::io::Error> {
248 let (_, receiver) = self.write("QUIT\r\n".to_string())?;
249 Ok(receiver)
250 }
251
252 pub fn help(&mut self, manual: Option<&str>) -> Result<Receiver<String>, std::io::Error> {
253 let (_, receiver) = self.write(format!("HELP {}\r\n", manual.unwrap_or("")))?;
254 Ok(receiver)
255 }
256}
257
258mod test {
259 use super::*;
260 use std::time;
261 #[test]
262 fn test_search() {
263 let mut s = SearchChan::new("127.0.0.1", 1491, "haha").expect("Connection error");
264 let handle = s.read();
265 assert_eq!("CONNECTED <sonic-server v1.1.8>\r\n", s.connect().unwrap());
266 thread::sleep(time::Duration::from_secs(4));
267 let r1 = s
268 .query("helpdesk", "user:0dcde3a6", "gdpr", Some(50), None)
269 .unwrap();
270 let r2 = s.ping().unwrap();
271 let r3 = s.quit().unwrap();
272 assert_eq!("EVENT", r1[0]);
273 assert_eq!("PONG\r\n", r2.recv().unwrap());
274 assert_eq!("ENDED quit\r\n", r3.recv().unwrap());
275 handle.join().expect("Failed to wait process");
276 }
277}