ts3/
client.rs

1// Required for ts3_derive macro.
2#[allow(unused_imports)]
3use crate as ts3;
4use crate::request::{Request, RequestBuilder, ServerNotifyRegister, TextMessageTarget};
5use crate::response::Whoami;
6use crate::shared::list::Pipe;
7
8pub use async_trait::async_trait;
9
10use crate::shared::{ClientDatabaseId, List, ServerGroupId, ServerId};
11use crate::{
12    event::{EventHandler, Handler},
13    response::{ApiKey, Version},
14    shared::ApiKeyScope,
15    Decode, Error, ErrorKind,
16};
17use bytes::Bytes;
18use std::{
19    convert::From,
20    result,
21    sync::{Arc, RwLock},
22    time::Duration,
23};
24use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
25use tokio::{
26    net::{TcpStream, ToSocketAddrs},
27    sync::{mpsc, oneshot},
28    task::spawn,
29    time::sleep,
30};
31
/// Result type used throughout the client; the error side is always the
/// crate's [`Error`].
pub type Result<T> = result::Result<T, Error>;
33
34impl Error {
35    fn ok(&self) -> bool {
36        use ErrorKind::*;
37
38        match &self.0 {
39            TS3 { id, msg: _ } => *id == 0,
40            _ => false,
41        }
42    }
43}
44
/// A command queued for the write task.
struct Cmd {
    /// Raw command bytes, written to the socket followed by a `'\n'`.
    bytes: Bytes,
    /// One-shot channel on which the outcome of the command is reported back
    /// to the caller that queued it.
    resp: oneshot::Sender<Result<Vec<u8>>>,
}
49
/// State shared between all clones of a [`Client`].
pub(crate) struct ClientInner {
    /// Handler that receives events and errors; replaced through
    /// `Client::set_event_handler`.
    pub(crate) handler: Arc<dyn EventHandler>,
}
53
54impl ClientInner {
55    fn new() -> ClientInner {
56        ClientInner {
57            handler: Arc::new(Handler),
58        }
59    }
60}
61
/// A Client used to send commands to the serverquery interface.
///
/// Cloning a `Client` is cheap: every clone shares the same command channel
/// and the same inner state.
#[derive(Clone)]
pub struct Client {
    /// Sending half of the queue consumed by the write task.
    tx: mpsc::Sender<Cmd>,
    /// Shared state (currently the event handler) behind a read/write lock.
    pub(crate) inner: Arc<RwLock<ClientInner>>,
}
68
69impl Client {
70    /// Create a new connection
71    pub async fn connect<A: ToSocketAddrs>(addr: A) -> Result<Client> {
72        let (tx, mut rx) = mpsc::channel::<Cmd>(32);
73
74        let stream = TcpStream::connect(addr)
75            .await
76            .map_err(|e| Error(e.into()))?;
77
78        let (reader, mut writer) = stream.into_split();
79        let mut reader = BufReader::new(reader);
80
81        // Read initial welcome message
82        {
83            let mut buf = Vec::new();
84            reader
85                .read_until(b'\r', &mut buf)
86                .await
87                .map_err(|e| Error(e.into()))?;
88            buf.clear();
89            reader
90                .read_until(b'\r', &mut buf)
91                .await
92                .map_err(|e| Error(e.into()))?;
93        }
94
95        // read_tx and read_rx are used to communicate between the read and the write
96        // thread
97        let (read_tx, mut read_rx) = mpsc::channel(32);
98
99        // Create a new inner client
100        let client = Client {
101            tx,
102            // handler: Arc::new(RwLock::new()),
103            inner: Arc::new(RwLock::new(ClientInner::new())),
104        };
105
106        // Read task
107        let client2 = client.clone();
108        spawn(async move {
109            loop {
110                let client = client2.clone();
111
112                // Read from the buffer until a '\r' indicating the end of a line
113                let mut buf = Vec::new();
114                if let Err(err) = reader.read_until(b'\r', &mut buf).await {
115                    client.handle_error(Error(err.into()));
116                    continue;
117                }
118
119                // Remove the last two bytes '\n' and '\r'
120                buf.truncate(buf.len() - 2);
121
122                // If the received data is an event dispatch it to the correct handler and wait for
123                // the next line.
124                if client.dispatch_event(&buf) {
125                    continue;
126                }
127
128                // Query commands return 2 lines, the first being the response data while the sencond
129                // contains the error code. Other commands only return an error.
130                match buf.starts_with(b"error") {
131                    true => match Error::decode(&buf) {
132                        Ok(err) => {
133                            let _ = read_tx.send((Vec::new(), err)).await;
134                        }
135                        Err(err) => {
136                            client.handle_error(err);
137                        }
138                    },
139                    false => {
140                        // Clone the current buffer, which contains the response data
141                        let resp = buf.clone();
142
143                        // Read next line for the error
144                        buf.clear();
145                        if let Err(err) = reader.read_until(b'\r', &mut buf).await {
146                            client.handle_error(Error(err.into()));
147                            continue;
148                        }
149
150                        match Error::decode(&buf) {
151                            Ok(err) => {
152                                let _ = read_tx.send((resp, err)).await;
153                            }
154                            Err(err) => {
155                                client.handle_error(err);
156                            }
157                        }
158                    }
159                }
160            }
161        });
162
163        // Write Task
164        spawn(async move {
165            while let Some(cmd) = rx.recv().await {
166                // Write the command string
167                if let Err(err) = writer.write_all(&cmd.bytes).await {
168                    let _ = cmd.resp.send(Err(Error(err.into())));
169                    continue;
170                }
171
172                // Write a '\n' to send the command
173                if let Err(err) = writer.write_all(&[b'\n']).await {
174                    let _ = cmd.resp.send(Err(Error(err.into())));
175                    continue;
176                }
177
178                // Wait for the response from the reader task
179                let (resp, err) = read_rx.recv().await.unwrap();
180
181                // Write the response to the channel sent with the request. resp is None when
182                // an error occured.
183                let _ = cmd.resp.send(match err.ok() {
184                    true => Ok(resp),
185                    false => Err(err),
186                });
187            }
188        });
189
190        // Keepalive loop
191        let tx2 = client.tx.clone();
192        spawn(async move {
193            loop {
194                let tx = tx2.clone();
195                sleep(Duration::from_secs(60)).await;
196                {
197                    let (resp_tx, _) = oneshot::channel();
198                    if let Err(_) = tx
199                        .send(Cmd {
200                            bytes: Bytes::from_static("version".as_bytes()),
201                            resp: resp_tx,
202                        })
203                        .await
204                    {}
205                }
206            }
207        });
208
209        Ok(client)
210    }
211
212    pub fn set_event_handler<H: EventHandler + 'static>(&self, handler: H) {
213        let mut data = self.inner.write().unwrap();
214        data.handler = Arc::new(handler);
215    }
216
217    /// Sends a [`Request`] to the server.
218    pub async fn send<T, R>(&self, request: R) -> Result<T>
219    where
220        T: Decode,
221        T::Error: Into<Error>,
222        R: Into<Request>,
223    {
224        self.send_inner(request.into()).await
225    }
226
227    async fn send_inner<T>(&self, request: Request) -> Result<T>
228    where
229        T: Decode,
230        T::Error: Into<Error>,
231    {
232        let tx = self.tx.clone();
233
234        // Create a new channel for receiving the response
235        let (resp_tx, resp_rx) = oneshot::channel();
236
237        match tx
238            .send(Cmd {
239                bytes: Bytes::from(request.buf.into_bytes()),
240                resp: resp_tx,
241            })
242            .await
243        {
244            Ok(_) => {
245                let resp = resp_rx.await.unwrap()?;
246                let val = T::decode(&resp).map_err(|e| e.into())?;
247                Ok(val)
248            }
249            Err(_) => Err(Error(ErrorKind::SendError)),
250        }
251    }
252
253    pub(crate) fn handle_error<E>(&self, error: E)
254    where
255        E: Into<Error>,
256    {
257        let inner = self.inner.read().unwrap();
258        inner.handler.error(self.clone(), error.into());
259    }
260}
261
262// TS3 Commands go here
263impl Client {
264    /// Creates a new apikey using the specified scope, for the invoking user. The default
265    /// lifetime of a token is 14 days, a zero lifetime means no expiration. It is possible
266    ///  to create apikeys for other users using `b_virtualserver_apikey_manage.`
267    pub async fn apikeyadd(
268        &self,
269        scope: ApiKeyScope,
270        lifetime: Option<u64>,
271        cldbid: Option<ClientDatabaseId>,
272    ) -> Result<ApiKey> {
273        let mut req = RequestBuilder::new("apikeyadd").arg("scope", scope);
274        if let Some(lifetime) = lifetime {
275            req = req.arg("lifetime", lifetime);
276        }
277        if let Some(cldbid) = cldbid {
278            req = req.arg("cldbid", cldbid);
279        }
280
281        self.send(req.build()).await
282    }
283
284    /// Delete an apikey. Any apikey owned by the current user can always be deleted. Deleting
285    /// apikeys from another user requires `b_virtualserver_apikey_manage`.
286    pub async fn apikeydel(&self, id: u64) -> Result<()> {
287        let req = RequestBuilder::new("apikeydel").arg("id", id);
288        self.send(req.build()).await
289    }
290
291    /// Lists all apikeys owned by the user, or of all users using `cldbid`=`(0, true).` Usage
292    /// of `cldbid`=... requires `b_virtualserver_apikey_manage`.
293    pub async fn apikeylist(
294        &self,
295        cldbid: Option<(ClientDatabaseId, bool)>,
296        start: Option<u64>,
297        duration: Option<u64>,
298        count: bool,
299    ) -> Result<List<ApiKey, Pipe>> {
300        let mut req = RequestBuilder::new("apikeylist");
301        if let Some((cldbid, all)) = cldbid {
302            if all {
303                req = req.arg("cldbid", "*");
304            } else {
305                req = req.arg("cldbid", cldbid);
306            }
307        }
308        if let Some(start) = start {
309            req = req.arg("start", start);
310        }
311        if let Some(duration) = duration {
312            req = req.arg("duration", duration);
313        }
314
315        if count {
316            req = req.flag("-count");
317        }
318
319        self.send(req).await
320    }
321
322    /// Add a new ban rule on the selected virtual server. One of `ip`, `name`, `uid`
323    /// and `mytsid` must not be `None`.
324    pub async fn banadd(
325        &self,
326        ip: Option<&str>,
327        name: Option<&str>,
328        uid: Option<&str>,
329        mytsid: Option<&str>,
330        time: Option<u64>,
331        banreason: Option<&str>,
332        lastnickname: Option<&str>,
333    ) -> Result<()> {
334        let mut req = RequestBuilder::new("banadd");
335
336        if let Some(ip) = ip {
337            req = req.arg("ip", ip);
338        }
339        if let Some(name) = name {
340            req = req.arg("name", name);
341        }
342        if let Some(uid) = uid {
343            req = req.arg("uid", uid);
344        }
345        if let Some(mytsid) = mytsid {
346            req = req.arg("mytsid", mytsid);
347        }
348        if let Some(time) = time {
349            req = req.arg("time", time);
350        }
351        if let Some(banreason) = banreason {
352            req = req.arg("banreason", banreason);
353        }
354        if let Some(lastnickname) = lastnickname {
355            req = req.arg("lastnickname", lastnickname);
356        }
357
358        self.send(req).await
359    }
360
361    /// Sends a text message to all clients on all virtual servers in the TeamSpeak 3
362    /// Server instance.
363    pub async fn gm(&self, msg: &str) -> Result<()> {
364        let req = RequestBuilder::new("gm").arg("msg", msg);
365        self.send(req).await
366    }
367
368    /// Authenticate with the given data.
369    pub async fn login(&self, username: &str, password: &str) -> Result<()> {
370        let req = RequestBuilder::new("login")
371            .arg("client_login_name", username)
372            .arg("client_login_password", password);
373        self.send(req).await
374    }
375
376    /// Deselects the active virtual server and logs out from the server instance.
377    pub async fn logout(&self) -> Result<()> {
378        let req = RequestBuilder::new("logout");
379        self.send(req).await
380    }
381
382    /// Send a quit command, disconnecting the client and closing the TCP connection
383    pub async fn quit(&self) -> Result<()> {
384        let req = RequestBuilder::new("quit");
385        self.send(req).await
386    }
387
388    pub async fn sendtextmessage(&self, target: TextMessageTarget, msg: &str) -> Result<()> {
389        let req = RequestBuilder::new("sendtextmessage")
390            .arg("targetmode", target)
391            .arg("msg", msg);
392        self.send(req).await
393    }
394
395    /// Adds one or more clients to the server group specified with sgid. Please note that a
396    /// client cannot be added to default groups or template groups.
397    pub async fn servergroupaddclient(
398        &self,
399        sgid: ServerGroupId,
400        cldbid: ClientDatabaseId,
401    ) -> Result<()> {
402        let req = RequestBuilder::new("servergroupaddclient")
403            .arg("sgid", sgid)
404            .arg("cldbid", cldbid);
405        self.send(req).await
406    }
407
408    /// Removes one or more clients specified with cldbid from the server group specified with
409    /// sgid.  
410    pub async fn servergroupdelclient(
411        &self,
412        sgid: ServerGroupId,
413        cldbid: ClientDatabaseId,
414    ) -> Result<()> {
415        let req = RequestBuilder::new("servergroupdelclient")
416            .arg("sgid", sgid)
417            .arg("cldbid", cldbid);
418        self.send(req).await
419    }
420
421    /// Registers for a specified category of events on a virtual server to receive
422    /// notification messages. Depending on the notifications you've registered for,
423    /// the server will send you a message on every event in the view of your
424    /// ServerQuery client (e.g. clients joining your channel, incoming text
425    /// messages, server configuration changes, etc). The event source is declared by
426    /// the event parameter while id can be used to limit the notifications to a
427    /// specific channel.  
428    pub async fn servernotifyregister(&self, event: ServerNotifyRegister) -> Result<()> {
429        let req = RequestBuilder::new("servernotifyregister").arg("event", event);
430        self.send(req).await
431    }
432
433    /// Starts the virtual server specified with sid. Depending on your permissions,
434    /// you're able to start either your own virtual server only or all virtual
435    /// servers in the server instance.  
436    pub async fn serverstart<T>(&self, sid: T) -> Result<()>
437    where
438        T: Into<ServerId>,
439    {
440        let req = RequestBuilder::new("serverstart").arg("sid", sid.into());
441        self.send(req).await
442    }
443
444    /// Stops the virtual server specified with sid. Depending on your permissions,
445    /// you're able to stop either your own virtual server only or all virtual
446    /// servers in the server instance. The reasonmsg parameter specifies a
447    /// text message that is sent to the clients before the client disconnects.
448    pub async fn serverstop<T>(&self, sid: T) -> Result<()>
449    where
450        T: Into<ServerId>,
451    {
452        let req = RequestBuilder::new("serverstop").arg("sid", sid.into());
453        self.send(req).await
454    }
455
456    /// Switch to the virtualserver (voice) with the given server id
457    pub async fn use_sid<T>(&self, sid: T) -> Result<()>
458    where
459        T: Into<ServerId>,
460    {
461        let req = RequestBuilder::new("use").arg("sid", sid.into());
462        self.send(req).await
463    }
464
465    /// Like `use_sid` but instead use_port uses the voice port to connect to the virtualserver
466    pub async fn use_port(&self, port: u16) -> Result<()> {
467        let req = RequestBuilder::new("use").arg("port", port);
468        self.send(req).await
469    }
470
471    /// Returns information about the server version
472    pub async fn version(&self) -> Result<Version> {
473        let req = RequestBuilder::new("version");
474        self.send(req).await
475    }
476
477    /// Returns information about the query client connected
478    pub async fn whoami(&self) -> Result<Whoami> {
479        let req = RequestBuilder::new("whoami");
480        self.send(req).await
481    }
482}