is_not_capnp/
client.rs

1use crate::{
2	service::{CapnpService, Service},
3	Error, Result,
4};
5use async_std::{
6	io::{ReadExt, WriteExt},
7	net::{TcpStream, ToSocketAddrs},
8};
9use capnp_rpc::{rpc_twoparty_capnp::Side, RpcSystem};
10
11pub struct Client<T, S>
12where
13	T: capnp::capability::FromServer<S> + Clone,
14	S: Clone,
15{
16	inner: Service<T, S>,
17	reader: Option<TcpStream>,
18	writer: Option<TcpStream>,
19}
20
21impl<T, S> Client<T, S>
22where
23	T: capnp::capability::FromServer<S> + Clone,
24	S: Clone,
25{
26	pub fn new(inner: Service<T, S>) -> Self {
27		Self {
28			inner,
29			reader: None,
30			writer: None,
31		}
32	}
33
34	pub async fn connect<A: ToSocketAddrs>(&mut self, addr: A) -> Result<T> {
35		// pub async fn connect<A: ToSocketAddrs>(&mut self, addr: A) -> Result<Service<T, S>> {
36		let stream = TcpStream::connect(addr)
37			.await
38			.expect("error getting stream");
39
40		let (mut reader, mut writer) = (stream.clone(), stream);
41
42		//BUG:
43		//PERFORMANCE: u8 shouldn't be parsed to string before be parsed to bytes
44		writer
45			.write_all(self.inner.id.to_string().as_bytes())
46			.await
47			.expect("");
48
49		writer.flush().await.expect("");
50
51		let mut buf = vec![0u8; 1024];
52
53		let size = reader
54			.read(&mut buf)
55			.await
56			.expect("error reading at Client::connect");
57
58		let s = String::from_utf8(buf[..size].to_vec()).unwrap();
59
60		if s.eq("ok") {
61			self.reader = Some(reader);
62			self.writer = Some(writer);
63			println!("oooooook");
64
65			let (rpc_system, t) = self.bootstrap();
66
67			async_std::task::Builder::new().local(rpc_system).expect("");
68
69			Ok(t)
70		} else {
71			Err(Error::ServerNotReady)
72		}
73	}
74
75	pub fn rpc_system(&self) -> RpcSystem<Side> {
76		assert!(self.reader.is_some());
77		assert!(self.writer.is_some());
78
79		self.inner.rpc_system(
80			self.reader.clone().expect(""),
81			self.writer.clone().expect(""),
82			Side::Client,
83		)
84	}
85
86	pub fn bootstrap(&mut self) -> (RpcSystem<Side>, T) {
87		let mut rpc_system = self.rpc_system();
88		let t = rpc_system.bootstrap(Side::Server);
89
90		(rpc_system, t)
91	}
92}