1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
use std::net::SocketAddr;
use std::sync::Arc;
use async_trait::async_trait;
use workflow_websocket::server::WebSocketHandler;
use crate::asynchronous::message::*;
use crate::asynchronous::error::RpcResponseError;
use tokio::sync::mpsc::*;
use workflow_log::*;
use workflow_websocket::server::{
    WebSocketServer, Result as WebSocketResult
};
use tungstenite::Message;
use borsh::BorshSerialize;


/// Serializes `resp` with Borsh and wraps the resulting bytes in `Ok(Some(..))`.
///
/// # Errors
///
/// Returns [`RpcResponseError::RespSerialize`] if Borsh serialization fails;
/// the underlying serialization error is discarded.
pub fn result<Resp>(resp: Resp) -> Result<Option<Vec<u8>>, RpcResponseError>
where
    Resp: BorshSerialize,
{
    resp.try_to_vec()
        .map(Some)
        .map_err(|_| RpcResponseError::RespSerialize)
}

/// Per-connection state created when a WebSocket peer connects.
///
/// Derives `Debug` and `Clone` so the context can be logged and copied;
/// its only field, a [`SocketAddr`], supports both.
#[derive(Debug, Clone)]
pub struct RpcContext {
    /// Socket address of the connected peer.
    pub peer: SocketAddr,
}


/// Application-side handler for decoded RPC requests.
///
/// `Ops` is the operation (opcode) type the implementor understands.
#[async_trait]
pub trait RpcHandler<Ops>: Send + Sync + 'static
where
    Ops: Send + Sync + 'static,
{
    /// Handles a single request.
    ///
    /// * `op` - the operation to perform.
    /// * `data` - the raw request payload bytes.
    ///
    /// Returns the response payload bytes on success, or an
    /// [`RpcResponseError`] describing the failure.
    async fn handle_request(
        self: Arc<Self>,
        op: Ops,
        data: &[u8],
    ) -> Result<Vec<u8>, RpcResponseError>;
}

/// WebSocket-facing adapter that forwards incoming requests to an
/// [`RpcHandler`].
pub struct RpcWebSocketHandler<Ops>
where
    Ops: Send + Sync + TryFrom<u32> + 'static,
{
    /// Shared handler that executes the requests.
    rpc_handler: Arc<dyn RpcHandler<Ops>>,
}

// `Clone` is implemented by hand rather than derived: `#[derive(Clone)]`
// would add an `Ops: Clone` bound, which is unnecessary because cloning
// only bumps the reference count of the inner `Arc`.
impl<Ops> Clone for RpcWebSocketHandler<Ops>
where
    Ops: Send + Sync + TryFrom<u32> + 'static,
{
    fn clone(&self) -> Self {
        Self {
            rpc_handler: Arc::clone(&self.rpc_handler),
        }
    }
}

impl<Ops> RpcWebSocketHandler<Ops>
where
    Ops: Send + Sync + TryFrom<u32> + 'static
{
    pub fn new(rpc_handler : Arc<dyn RpcHandler<Ops>>) -> Self {
        Self {
            rpc_handler
        }
    }
}

#[async_trait]
impl<Ops> WebSocketHandler for RpcWebSocketHandler<Ops>
where 
    Ops: Send + Sync + TryFrom<u32> + 'static,
    <Ops as TryFrom<u32>>::Error: Sync + Send + 'static
{
    type Context = Arc<RpcContext>;

    async fn connect(self : &Arc<Self>, peer: SocketAddr) -> WebSocketResult<Self::Context> {
        let ctx = RpcContext { peer };
        Ok(Arc::new(ctx))
    }

    async fn handshake(self : &Arc<Self>, _ctx : &Self::Context, _msg : Message, _sink : &UnboundedSender<tungstenite::Message>) -> WebSocketResult<()> {
        Ok(())
    }

    async fn message(self : &Arc<Self>, _ctx : &Self::Context, msg : Message, sink : &UnboundedSender<tungstenite::Message>) -> WebSocketResult<()> {

        if !msg.is_binary() {
            return Ok(())
        }

        let data = &msg.into_data();
        let req : ReqMessage = data.try_into().expect("invalid message!");

        let op = Ops::try_from(req.op); 
        match op {
            Ok(op) => {
                let result = self.rpc_handler.clone().handle_request(op,req.data).await;
                match result {
                    Ok(data) => {
                        if let Ok(msg) = RespMessage::new(req.id, 0, &data).try_to_vec() {
                            match sink.send(msg.into()) {
                                Ok(_) => {},
                                Err(e) => { log_trace!("Sink error: {:?}", e); }
                            }
                        }
                    },
                    Err(err) => {
                        log_trace!("RPC server error: {:?}", err);
                        if let Ok(err_vec) = err.try_to_vec() {
                            if let Ok(msg) = RespMessage::new(req.id, 1, &err_vec).try_to_vec() {
                                match sink.send(msg.into()) {
                                    Ok(_) => {},
                                    Err(e) => { log_trace!("Sink error: {:?}", e); }
                                }
                            }
                        }
                    }
                }
            },
            Err(_) => {
                log_error!("invalid request opcode {}", req.op);                
            }
        }

        Ok(())
    }
}

/// RPC server that exposes an `RpcHandler` over a WebSocket server.
///
/// `Ops` is the operation type; incoming `u32` opcodes are converted to it
/// via `TryFrom<u32>`.
pub struct RpcServer<Ops>
where
    Ops: Send + Sync + TryFrom<u32> + 'static,
    <Ops as TryFrom<u32>>::Error: Sync + Send + 'static,
{
    /// Underlying WebSocket server driving the RPC handler.
    ws_server: Arc<WebSocketServer<RpcWebSocketHandler<Ops>>>,
}

impl<Ops> RpcServer<Ops>
where
    Ops: Send + Sync + TryFrom<u32> + 'static,
    <Ops as TryFrom<u32>>::Error: Sync + Send + 'static,
{
    /// Builds a server that dispatches incoming requests to `rpc_handler`.
    ///
    /// The handler is wrapped in an [`RpcWebSocketHandler`], which is then
    /// handed to a new `WebSocketServer`.
    pub fn new(rpc_handler : Arc<dyn RpcHandler<Ops>>) -> Arc<RpcServer<Ops>> {
        let ws_server = WebSocketServer::new(Arc::new(RpcWebSocketHandler::new(rpc_handler)));
        Arc::new(Self { ws_server })
    }

    /// Starts listening on `addr` by delegating to the underlying WebSocket
    /// server.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the WebSocket server's `listen`.
    pub async fn listen(self : &Arc<Self>, addr : &str) -> WebSocketResult<()> {
        self.ws_server.listen(addr).await?;
        Ok(())
    }
}