1use crate::{router::Router, service::Service, Result};
2use async_std::{
3 io::{ReadExt, WriteExt},
4 net::{TcpListener, TcpStream, ToSocketAddrs},
5 stream::StreamExt,
6};
7use capnp_rpc::rpc_twoparty_capnp::Side;
8
9#[derive(Default)]
10pub struct Server {
11 router: Router,
12}
13
14impl Server {
15 pub fn new() -> Self {
16 Self {
17 router: Default::default(),
18 }
19 }
20
21 pub fn add<T: capnp::capability::FromServer<S> + Clone + 'static, S: Clone + 'static>(
22 &mut self,
23 service: Service<T, S>,
24 ) -> &mut Self {
25 self.router.insert(service.id, Box::new(service));
26 self
27 }
28
29 pub async fn serve<T: ToSocketAddrs>(&self, addr: T) -> Result<()> {
30 let listener = TcpListener::bind(addr).await.expect("");
31
32 let mut incoming = listener.incoming();
33
34 while let Some(stream) = incoming.next().await {
35 let stream = stream.expect("");
36
37 handle(self, stream).await.expect("error handling service");
38 }
39
40 Ok(())
41 }
42}
43
44async fn handle(server: &Server, stream: TcpStream) -> Result<()> {
45 let (mut reader, mut writer) = (stream.clone(), stream);
46
47 let service_id = service_id(&mut reader).await?;
48
49 if let Some(service) = server.router.get(&service_id) {
50 server_ready(&mut writer).await;
51
52 let rpc_system = service.rpc_system(reader, writer, Side::Server);
53
54 println!("inside handler before spawning");
55
56 rpc_system.await.expect("error spawning");
57 } else {
58 todo!()
60 }
61
62 Ok(())
63}
64
65async fn server_ready(writer: &mut TcpStream) {
66 writer.write_all(b"ok").await.expect("");
67 writer.flush().await.expect("");
68}
69
70async fn service_id(reader: &mut TcpStream) -> Result<u8> {
71 let mut buf = vec![0u8; 1024];
72
73 let size = reader
74 .read(&mut buf)
75 .await
76 .expect("error by reading stream at handle::service_id");
77
78 let s = String::from_utf8(buf[..size].to_vec()).unwrap();
79
80 println!("STRING IN service_id {s}");
81
82 let id = s.parse::<u8>().unwrap();
83
84 Ok(id)
85}