1#[allow(unused_imports)]
3use crate as ts3;
4use crate::request::{Request, RequestBuilder, ServerNotifyRegister, TextMessageTarget};
5use crate::response::Whoami;
6use crate::shared::list::Pipe;
7
8pub use async_trait::async_trait;
9
10use crate::shared::{ClientDatabaseId, List, ServerGroupId, ServerId};
11use crate::{
12 event::{EventHandler, Handler},
13 response::{ApiKey, Version},
14 shared::ApiKeyScope,
15 Decode, Error, ErrorKind,
16};
17use bytes::Bytes;
18use std::{
19 convert::From,
20 result,
21 sync::{Arc, RwLock},
22 time::Duration,
23};
24use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
25use tokio::{
26 net::{TcpStream, ToSocketAddrs},
27 sync::{mpsc, oneshot},
28 task::spawn,
29 time::sleep,
30};
31
32pub type Result<T> = result::Result<T, Error>;
33
34impl Error {
35 fn ok(&self) -> bool {
36 use ErrorKind::*;
37
38 match &self.0 {
39 TS3 { id, msg: _ } => *id == 0,
40 _ => false,
41 }
42 }
43}
44
45struct Cmd {
46 bytes: Bytes,
47 resp: oneshot::Sender<Result<Vec<u8>>>,
48}
49
50pub(crate) struct ClientInner {
51 pub(crate) handler: Arc<dyn EventHandler>,
52}
53
54impl ClientInner {
55 fn new() -> ClientInner {
56 ClientInner {
57 handler: Arc::new(Handler),
58 }
59 }
60}
61
62#[derive(Clone)]
64pub struct Client {
65 tx: mpsc::Sender<Cmd>,
66 pub(crate) inner: Arc<RwLock<ClientInner>>,
67}
68
69impl Client {
70 pub async fn connect<A: ToSocketAddrs>(addr: A) -> Result<Client> {
72 let (tx, mut rx) = mpsc::channel::<Cmd>(32);
73
74 let stream = TcpStream::connect(addr)
75 .await
76 .map_err(|e| Error(e.into()))?;
77
78 let (reader, mut writer) = stream.into_split();
79 let mut reader = BufReader::new(reader);
80
81 {
83 let mut buf = Vec::new();
84 reader
85 .read_until(b'\r', &mut buf)
86 .await
87 .map_err(|e| Error(e.into()))?;
88 buf.clear();
89 reader
90 .read_until(b'\r', &mut buf)
91 .await
92 .map_err(|e| Error(e.into()))?;
93 }
94
95 let (read_tx, mut read_rx) = mpsc::channel(32);
98
99 let client = Client {
101 tx,
102 inner: Arc::new(RwLock::new(ClientInner::new())),
104 };
105
106 let client2 = client.clone();
108 spawn(async move {
109 loop {
110 let client = client2.clone();
111
112 let mut buf = Vec::new();
114 if let Err(err) = reader.read_until(b'\r', &mut buf).await {
115 client.handle_error(Error(err.into()));
116 continue;
117 }
118
119 buf.truncate(buf.len() - 2);
121
122 if client.dispatch_event(&buf) {
125 continue;
126 }
127
128 match buf.starts_with(b"error") {
131 true => match Error::decode(&buf) {
132 Ok(err) => {
133 let _ = read_tx.send((Vec::new(), err)).await;
134 }
135 Err(err) => {
136 client.handle_error(err);
137 }
138 },
139 false => {
140 let resp = buf.clone();
142
143 buf.clear();
145 if let Err(err) = reader.read_until(b'\r', &mut buf).await {
146 client.handle_error(Error(err.into()));
147 continue;
148 }
149
150 match Error::decode(&buf) {
151 Ok(err) => {
152 let _ = read_tx.send((resp, err)).await;
153 }
154 Err(err) => {
155 client.handle_error(err);
156 }
157 }
158 }
159 }
160 }
161 });
162
163 spawn(async move {
165 while let Some(cmd) = rx.recv().await {
166 if let Err(err) = writer.write_all(&cmd.bytes).await {
168 let _ = cmd.resp.send(Err(Error(err.into())));
169 continue;
170 }
171
172 if let Err(err) = writer.write_all(&[b'\n']).await {
174 let _ = cmd.resp.send(Err(Error(err.into())));
175 continue;
176 }
177
178 let (resp, err) = read_rx.recv().await.unwrap();
180
181 let _ = cmd.resp.send(match err.ok() {
184 true => Ok(resp),
185 false => Err(err),
186 });
187 }
188 });
189
190 let tx2 = client.tx.clone();
192 spawn(async move {
193 loop {
194 let tx = tx2.clone();
195 sleep(Duration::from_secs(60)).await;
196 {
197 let (resp_tx, _) = oneshot::channel();
198 if let Err(_) = tx
199 .send(Cmd {
200 bytes: Bytes::from_static("version".as_bytes()),
201 resp: resp_tx,
202 })
203 .await
204 {}
205 }
206 }
207 });
208
209 Ok(client)
210 }
211
212 pub fn set_event_handler<H: EventHandler + 'static>(&self, handler: H) {
213 let mut data = self.inner.write().unwrap();
214 data.handler = Arc::new(handler);
215 }
216
217 pub async fn send<T, R>(&self, request: R) -> Result<T>
219 where
220 T: Decode,
221 T::Error: Into<Error>,
222 R: Into<Request>,
223 {
224 self.send_inner(request.into()).await
225 }
226
227 async fn send_inner<T>(&self, request: Request) -> Result<T>
228 where
229 T: Decode,
230 T::Error: Into<Error>,
231 {
232 let tx = self.tx.clone();
233
234 let (resp_tx, resp_rx) = oneshot::channel();
236
237 match tx
238 .send(Cmd {
239 bytes: Bytes::from(request.buf.into_bytes()),
240 resp: resp_tx,
241 })
242 .await
243 {
244 Ok(_) => {
245 let resp = resp_rx.await.unwrap()?;
246 let val = T::decode(&resp).map_err(|e| e.into())?;
247 Ok(val)
248 }
249 Err(_) => Err(Error(ErrorKind::SendError)),
250 }
251 }
252
253 pub(crate) fn handle_error<E>(&self, error: E)
254 where
255 E: Into<Error>,
256 {
257 let inner = self.inner.read().unwrap();
258 inner.handler.error(self.clone(), error.into());
259 }
260}
261
262impl Client {
264 pub async fn apikeyadd(
268 &self,
269 scope: ApiKeyScope,
270 lifetime: Option<u64>,
271 cldbid: Option<ClientDatabaseId>,
272 ) -> Result<ApiKey> {
273 let mut req = RequestBuilder::new("apikeyadd").arg("scope", scope);
274 if let Some(lifetime) = lifetime {
275 req = req.arg("lifetime", lifetime);
276 }
277 if let Some(cldbid) = cldbid {
278 req = req.arg("cldbid", cldbid);
279 }
280
281 self.send(req.build()).await
282 }
283
284 pub async fn apikeydel(&self, id: u64) -> Result<()> {
287 let req = RequestBuilder::new("apikeydel").arg("id", id);
288 self.send(req.build()).await
289 }
290
291 pub async fn apikeylist(
294 &self,
295 cldbid: Option<(ClientDatabaseId, bool)>,
296 start: Option<u64>,
297 duration: Option<u64>,
298 count: bool,
299 ) -> Result<List<ApiKey, Pipe>> {
300 let mut req = RequestBuilder::new("apikeylist");
301 if let Some((cldbid, all)) = cldbid {
302 if all {
303 req = req.arg("cldbid", "*");
304 } else {
305 req = req.arg("cldbid", cldbid);
306 }
307 }
308 if let Some(start) = start {
309 req = req.arg("start", start);
310 }
311 if let Some(duration) = duration {
312 req = req.arg("duration", duration);
313 }
314
315 if count {
316 req = req.flag("-count");
317 }
318
319 self.send(req).await
320 }
321
322 pub async fn banadd(
325 &self,
326 ip: Option<&str>,
327 name: Option<&str>,
328 uid: Option<&str>,
329 mytsid: Option<&str>,
330 time: Option<u64>,
331 banreason: Option<&str>,
332 lastnickname: Option<&str>,
333 ) -> Result<()> {
334 let mut req = RequestBuilder::new("banadd");
335
336 if let Some(ip) = ip {
337 req = req.arg("ip", ip);
338 }
339 if let Some(name) = name {
340 req = req.arg("name", name);
341 }
342 if let Some(uid) = uid {
343 req = req.arg("uid", uid);
344 }
345 if let Some(mytsid) = mytsid {
346 req = req.arg("mytsid", mytsid);
347 }
348 if let Some(time) = time {
349 req = req.arg("time", time);
350 }
351 if let Some(banreason) = banreason {
352 req = req.arg("banreason", banreason);
353 }
354 if let Some(lastnickname) = lastnickname {
355 req = req.arg("lastnickname", lastnickname);
356 }
357
358 self.send(req).await
359 }
360
361 pub async fn gm(&self, msg: &str) -> Result<()> {
364 let req = RequestBuilder::new("gm").arg("msg", msg);
365 self.send(req).await
366 }
367
368 pub async fn login(&self, username: &str, password: &str) -> Result<()> {
370 let req = RequestBuilder::new("login")
371 .arg("client_login_name", username)
372 .arg("client_login_password", password);
373 self.send(req).await
374 }
375
376 pub async fn logout(&self) -> Result<()> {
378 let req = RequestBuilder::new("logout");
379 self.send(req).await
380 }
381
382 pub async fn quit(&self) -> Result<()> {
384 let req = RequestBuilder::new("quit");
385 self.send(req).await
386 }
387
388 pub async fn sendtextmessage(&self, target: TextMessageTarget, msg: &str) -> Result<()> {
389 let req = RequestBuilder::new("sendtextmessage")
390 .arg("targetmode", target)
391 .arg("msg", msg);
392 self.send(req).await
393 }
394
395 pub async fn servergroupaddclient(
398 &self,
399 sgid: ServerGroupId,
400 cldbid: ClientDatabaseId,
401 ) -> Result<()> {
402 let req = RequestBuilder::new("servergroupaddclient")
403 .arg("sgid", sgid)
404 .arg("cldbid", cldbid);
405 self.send(req).await
406 }
407
408 pub async fn servergroupdelclient(
411 &self,
412 sgid: ServerGroupId,
413 cldbid: ClientDatabaseId,
414 ) -> Result<()> {
415 let req = RequestBuilder::new("servergroupdelclient")
416 .arg("sgid", sgid)
417 .arg("cldbid", cldbid);
418 self.send(req).await
419 }
420
421 pub async fn servernotifyregister(&self, event: ServerNotifyRegister) -> Result<()> {
429 let req = RequestBuilder::new("servernotifyregister").arg("event", event);
430 self.send(req).await
431 }
432
433 pub async fn serverstart<T>(&self, sid: T) -> Result<()>
437 where
438 T: Into<ServerId>,
439 {
440 let req = RequestBuilder::new("serverstart").arg("sid", sid.into());
441 self.send(req).await
442 }
443
444 pub async fn serverstop<T>(&self, sid: T) -> Result<()>
449 where
450 T: Into<ServerId>,
451 {
452 let req = RequestBuilder::new("serverstop").arg("sid", sid.into());
453 self.send(req).await
454 }
455
456 pub async fn use_sid<T>(&self, sid: T) -> Result<()>
458 where
459 T: Into<ServerId>,
460 {
461 let req = RequestBuilder::new("use").arg("sid", sid.into());
462 self.send(req).await
463 }
464
465 pub async fn use_port(&self, port: u16) -> Result<()> {
467 let req = RequestBuilder::new("use").arg("port", port);
468 self.send(req).await
469 }
470
471 pub async fn version(&self) -> Result<Version> {
473 let req = RequestBuilder::new("version");
474 self.send(req).await
475 }
476
477 pub async fn whoami(&self) -> Result<Whoami> {
479 let req = RequestBuilder::new("whoami");
480 self.send(req).await
481 }
482}