1use crate::{
2 service::{CapnpService, Service},
3 Error, Result,
4};
5use async_std::{
6 io::{ReadExt, WriteExt},
7 net::{TcpStream, ToSocketAddrs},
8};
9use capnp_rpc::{rpc_twoparty_capnp::Side, RpcSystem};
10
11pub struct Client<T, S>
12where
13 T: capnp::capability::FromServer<S> + Clone,
14 S: Clone,
15{
16 inner: Service<T, S>,
17 reader: Option<TcpStream>,
18 writer: Option<TcpStream>,
19}
20
21impl<T, S> Client<T, S>
22where
23 T: capnp::capability::FromServer<S> + Clone,
24 S: Clone,
25{
26 pub fn new(inner: Service<T, S>) -> Self {
27 Self {
28 inner,
29 reader: None,
30 writer: None,
31 }
32 }
33
34 pub async fn connect<A: ToSocketAddrs>(&mut self, addr: A) -> Result<T> {
35 let stream = TcpStream::connect(addr)
37 .await
38 .expect("error getting stream");
39
40 let (mut reader, mut writer) = (stream.clone(), stream);
41
42 writer
45 .write_all(self.inner.id.to_string().as_bytes())
46 .await
47 .expect("");
48
49 writer.flush().await.expect("");
50
51 let mut buf = vec![0u8; 1024];
52
53 let size = reader
54 .read(&mut buf)
55 .await
56 .expect("error reading at Client::connect");
57
58 let s = String::from_utf8(buf[..size].to_vec()).unwrap();
59
60 if s.eq("ok") {
61 self.reader = Some(reader);
62 self.writer = Some(writer);
63 println!("oooooook");
64
65 let (rpc_system, t) = self.bootstrap();
66
67 async_std::task::Builder::new().local(rpc_system).expect("");
68
69 Ok(t)
70 } else {
71 Err(Error::ServerNotReady)
72 }
73 }
74
75 pub fn rpc_system(&self) -> RpcSystem<Side> {
76 assert!(self.reader.is_some());
77 assert!(self.writer.is_some());
78
79 self.inner.rpc_system(
80 self.reader.clone().expect(""),
81 self.writer.clone().expect(""),
82 Side::Client,
83 )
84 }
85
86 pub fn bootstrap(&mut self) -> (RpcSystem<Side>, T) {
87 let mut rpc_system = self.rpc_system();
88 let t = rpc_system.bootstrap(Side::Server);
89
90 (rpc_system, t)
91 }
92}