kumoko/client/
mod.rs

1//! Module for Client functionality. Enable the client feature to use it.
2
3use std::{io, time::Duration};
4use tokio::{net::{ToSocketAddrs, TcpStream}, sync::mpsc};
5use crate::{Message, instance, event::{Origin, Event}};
6
7pub use tokio::sync::mpsc::error::TryRecvError;
8
9#[derive(Debug)]
10/// A Client with a full duplex connection to a Server. Can be .into_split()
11/// into an Emitter and Collector for async operations.
12pub struct Client<Req: Message, Res: Message>{
13    collector: Collector<Res>,
14    emitter: Emitter<Req>,
15}
16
17impl<Req: Message, Res: Message> Client<Req, Res>{
18    /// Connects to the server with the default Config.
19    pub async fn connect<A: ToSocketAddrs>(ip: A) -> io::Result<Client<Req, Res>> {
20        Self::connect_with_config(ip, Config::default()).await
21    }
22
23    /// Connects to the server with a custom Config.
24    pub async fn connect_with_config<A: ToSocketAddrs>(ip: A, config: Config) -> io::Result<Client<Req, Res>> {
25        let stream = TcpStream::connect(ip).await?;
26        let (read, write) = stream.into_split();
27    
28        let (sx, rx) = mpsc::channel(config.emitter_buffer);
29        instance::Collector::spawn_on_task(read, sx, Origin::OnClient, config.timeout);
30        let collector = Collector{rx};
31    
32        let (sx, rx) = mpsc::channel(config.collector_buffer);
33        instance::Emitter::spawn_on_task(write, rx);
34        let emitter = Emitter{sx};
35        
36        Ok(Client{collector, emitter})
37    }
38
39    /// Gets the next event if one is available, otherwise it waits until it is.
40    /// 
41    /// Will return `None` once the connection has ended.
42    pub async fn get_event(&mut self) -> Option<Event<Res>> {
43        self.collector.get_event().await
44    }
45
46    /// Convenience method for applications which only care about responses.
47    /// 
48    /// Will return `None` once the connection has ended.
49    pub async fn get_response(&mut self) -> Option<Res> {
50        self.collector.get_response().await
51    }
52
53    pub fn try_get_event(&mut self) -> Result<Event<Res>, TryRecvError> {
54        self.collector.try_get_event()
55    }
56
57    pub fn try_get_response(&mut self) -> Result<Res, TryRecvError> {
58        self.collector.try_get_response()
59    }
60
61    /// Default method for streaming to the Server.
62    pub async fn emit_request(&self, req: Req) {
63        self.emitter.emit_request(req).await
64    }
65
66    pub fn try_emit(&self, req: Req) {
67        self.emitter.try_emit(req)
68    }
69
70    /// Splits the Client into Collector and Emitter. The Emitter can be 
71    /// cloned for async operations.
72    pub fn into_split(self) -> (Collector<Res>, Emitter<Req>) {
73        (self.collector, self.emitter)
74    }
75}
76
77#[derive(Debug)]
78pub struct Collector<Res: Message>{
79    rx: mpsc::Receiver<(Event<Res>, Origin)>
80}
81
82impl<Res: Message> Collector<Res> {
83    /// Gets the next event if one is available, otherwise it waits until it is.
84    /// 
85    /// Will return `None` once the connection has ended.
86    pub async fn get_event(&mut self) -> Option<Event<Res>> {
87        match self.rx.recv().await {
88            Some((msg, _)) => Some(msg),
89            None => None,
90        }
91    }
92
93    /// Convenience method for applications which only care about responses.
94    /// 
95    /// Will return `None` once the connection has ended.
96    pub async fn get_response(&mut self) -> Option<Res> {
97        loop{
98            match self.get_event().await{
99                Some(Event::Message(res)) => return Some(res),
100                Some(_) => continue,
101                None => return None,
102            }
103        }
104    }
105
106    pub fn try_get_event(&mut self) -> Result<Event<Res>, TryRecvError> {
107        match self.rx.try_recv() {
108            Ok((msg, _)) => Ok(msg),
109            Err(e) => Err(e),
110        }
111    }
112
113    pub fn try_get_response(&mut self) -> Result<Res, TryRecvError> {
114        loop{
115            match self.try_get_event(){
116                Ok(Event::Message(res)) => return Ok(res),
117                Ok(_) => continue,
118                Err(e) => return Err(e),
119            }
120        }
121    }
122}
123
124#[derive(Debug, Clone)]
125pub struct Emitter<Req: Message>{
126    sx: mpsc::Sender<Req>
127}
128
129impl<Req: Message> Emitter<Req> {
130    /// Default method for streaming to the Server.
131    pub async fn emit_request(&self, req: Req) {
132        match self.sx.send(req).await {
133            Ok(_) => (),
134            Err(_) => unreachable!(),
135        }
136    }
137
138    pub fn try_emit(&self, req: Req) {
139        match self.sx.try_send(req){
140            Ok(_) => (),
141            Err(e) => panic!("{}", e),
142        }
143    }
144}
145
146/// Config for the Client
147pub struct Config{
148    /// If no new Responses appear within this duration, we drop the collector.
149    pub timeout: Duration,
150    /// The size of the channel buffer for the Emitter.
151    pub emitter_buffer: usize,
152    /// the size of the channel buffer for the Collector.
153    pub collector_buffer: usize,
154}
155
156impl Default for Config{
157    fn default() -> Config {
158        Config { timeout: Duration::MAX, emitter_buffer: 3, collector_buffer: 3 }
159    }
160}