1use std::{io, time::Duration};
4use tokio::{net::{ToSocketAddrs, TcpStream}, sync::mpsc};
5use crate::{Message, instance, event::{Origin, Event}};
6
7pub use tokio::sync::mpsc::error::TryRecvError;
8
/// Client-side handle that bundles a [`Collector`] (incoming events carrying
/// `Res`) with an [`Emitter`] (outgoing requests of type `Req`).
///
/// The two halves can be separated with `into_split`.
#[derive(Debug)]
pub struct Client<Req: Message, Res: Message>{
    /// Receiving half: yields `Event<Res>` values read from an mpsc channel.
    collector: Collector<Res>,
    /// Sending half: pushes `Req` values into an mpsc channel.
    emitter: Emitter<Req>,
}
16
17impl<Req: Message, Res: Message> Client<Req, Res>{
18 pub async fn connect<A: ToSocketAddrs>(ip: A) -> io::Result<Client<Req, Res>> {
20 Self::connect_with_config(ip, Config::default()).await
21 }
22
23 pub async fn connect_with_config<A: ToSocketAddrs>(ip: A, config: Config) -> io::Result<Client<Req, Res>> {
25 let stream = TcpStream::connect(ip).await?;
26 let (read, write) = stream.into_split();
27
28 let (sx, rx) = mpsc::channel(config.emitter_buffer);
29 instance::Collector::spawn_on_task(read, sx, Origin::OnClient, config.timeout);
30 let collector = Collector{rx};
31
32 let (sx, rx) = mpsc::channel(config.collector_buffer);
33 instance::Emitter::spawn_on_task(write, rx);
34 let emitter = Emitter{sx};
35
36 Ok(Client{collector, emitter})
37 }
38
39 pub async fn get_event(&mut self) -> Option<Event<Res>> {
43 self.collector.get_event().await
44 }
45
46 pub async fn get_response(&mut self) -> Option<Res> {
50 self.collector.get_response().await
51 }
52
53 pub fn try_get_event(&mut self) -> Result<Event<Res>, TryRecvError> {
54 self.collector.try_get_event()
55 }
56
57 pub fn try_get_response(&mut self) -> Result<Res, TryRecvError> {
58 self.collector.try_get_response()
59 }
60
61 pub async fn emit_request(&self, req: Req) {
63 self.emitter.emit_request(req).await
64 }
65
66 pub fn try_emit(&self, req: Req) {
67 self.emitter.try_emit(req)
68 }
69
70 pub fn into_split(self) -> (Collector<Res>, Emitter<Req>) {
73 (self.collector, self.emitter)
74 }
75}
76
/// Receiving half of a client: wraps the receiver end of an mpsc channel
/// whose items are `(Event<Res>, Origin)` pairs.
#[derive(Debug)]
pub struct Collector<Res: Message>{
    /// Channel end the events are read from. The accessors in this file
    /// return only the `Event` and drop the accompanying `Origin`.
    rx: mpsc::Receiver<(Event<Res>, Origin)>
}
81
82impl<Res: Message> Collector<Res> {
83 pub async fn get_event(&mut self) -> Option<Event<Res>> {
87 match self.rx.recv().await {
88 Some((msg, _)) => Some(msg),
89 None => None,
90 }
91 }
92
93 pub async fn get_response(&mut self) -> Option<Res> {
97 loop{
98 match self.get_event().await{
99 Some(Event::Message(res)) => return Some(res),
100 Some(_) => continue,
101 None => return None,
102 }
103 }
104 }
105
106 pub fn try_get_event(&mut self) -> Result<Event<Res>, TryRecvError> {
107 match self.rx.try_recv() {
108 Ok((msg, _)) => Ok(msg),
109 Err(e) => Err(e),
110 }
111 }
112
113 pub fn try_get_response(&mut self) -> Result<Res, TryRecvError> {
114 loop{
115 match self.try_get_event(){
116 Ok(Event::Message(res)) => return Ok(res),
117 Ok(_) => continue,
118 Err(e) => return Err(e),
119 }
120 }
121 }
122}
123
/// Sending half of a client: wraps the sender end of an mpsc channel that
/// carries `Req` values.
///
/// The type derives `Clone`; a cloned tokio `Sender` feeds the same channel.
#[derive(Debug, Clone)]
pub struct Emitter<Req: Message>{
    /// Channel end that requests are pushed into.
    sx: mpsc::Sender<Req>
}
128
129impl<Req: Message> Emitter<Req> {
130 pub async fn emit_request(&self, req: Req) {
132 match self.sx.send(req).await {
133 Ok(_) => (),
134 Err(_) => unreachable!(),
135 }
136 }
137
138 pub fn try_emit(&self, req: Req) {
139 match self.sx.try_send(req){
140 Ok(_) => (),
141 Err(e) => panic!("{}", e),
142 }
143 }
144}
145
/// Settings consumed by `Client::connect_with_config`.
///
/// `Debug` and `Clone` are derived so a configuration can be logged and
/// reused for several connections.
#[derive(Debug, Clone)]
pub struct Config {
    /// Timeout forwarded to the collector task spawned on connect.
    ///
    /// NOTE(review): its exact meaning is defined by
    /// `instance::Collector::spawn_on_task` — confirm there.
    pub timeout: Duration,
    /// Capacity of a bounded mpsc channel created on connect; by its name,
    /// the one queueing outgoing requests for the emitter. Must be non-zero,
    /// since tokio's `mpsc::channel` panics on a capacity of `0`.
    pub emitter_buffer: usize,
    /// Capacity of a bounded mpsc channel created on connect; by its name,
    /// the one queueing incoming events from the collector. Must be non-zero,
    /// since tokio's `mpsc::channel` panics on a capacity of `0`.
    pub collector_buffer: usize,
}
155
156impl Default for Config{
157 fn default() -> Config {
158 Config { timeout: Duration::MAX, emitter_buffer: 3, collector_buffer: 3 }
159 }
160}