eosio_chaintester/
server.rs

1use log::warn;
2
3use std::net::{TcpListener, TcpStream, ToSocketAddrs};
4use std::sync::Arc;
5use std::panic;
6
7use thrift::protocol::{
8    TInputProtocol, TInputProtocolFactory, TOutputProtocol, TOutputProtocolFactory,
9};
10use thrift::transport::{TIoChannel, TReadTransportFactory, TTcpChannel, TWriteTransportFactory};
11
12use thrift::server::{
13    TProcessor
14};
15use thrift::TransportErrorKind;
16
17use crate::interfaces::{Uint64};
18use crate::interfaces::TApplySyncClient;
19
// Apply entry point invoked by `ApplyRequestHandler::handle_apply_request`.
// Only the signature is declared here; the definition has to be provided by
// another Rust crate at link time, which is why every call must be `unsafe`.
extern "Rust" {
    /// Called once per apply request with the three 64-bit values received
    /// over the wire (receiver, first receiver, action).
    fn native_apply(
        receiver: u64,
        first_receiver: u64,
        action: u64,
    );
}
23
24pub struct IPCServer<PRC, RTF, IPF, WTF, OPF>
25where
26    PRC: TProcessor + Send + Sync + 'static,
27    RTF: TReadTransportFactory + 'static,
28    IPF: TInputProtocolFactory + 'static,
29    WTF: TWriteTransportFactory + 'static,
30    OPF: TOutputProtocolFactory + 'static,
31{
32    r_trans_factory: RTF,
33    i_proto_factory: IPF,
34    w_trans_factory: WTF,
35    o_proto_factory: OPF,
36    processor: Arc<PRC>,
37    pub cnn: Option<IncomingConnection<PRC>>,
38    // worker_pool: ThreadPool,
39}
40
41impl<PRC, RTF, IPF, WTF, OPF> IPCServer<PRC, RTF, IPF, WTF, OPF>
42where
43    PRC: TProcessor + Send + Sync + 'static,
44    RTF: TReadTransportFactory + 'static,
45    IPF: TInputProtocolFactory + 'static,
46    WTF: TWriteTransportFactory + 'static,
47    OPF: TOutputProtocolFactory + 'static,
48{
49    /// Create a `IPCServer`.
50    ///
51    /// Each accepted connection has an input and output half, each of which
52    /// requires a `TTransport` and `TProtocol`. `IPCServer` uses
53    /// `read_transport_factory` and `input_protocol_factory` to create
54    /// implementations for the input, and `write_transport_factory` and
55    /// `output_protocol_factory` to create implementations for the output.
56    pub fn new(
57        read_transport_factory: RTF,
58        input_protocol_factory: IPF,
59        write_transport_factory: WTF,
60        output_protocol_factory: OPF,
61        processor: PRC,
62    ) -> IPCServer<PRC, RTF, IPF, WTF, OPF> {
63        IPCServer {
64            r_trans_factory: read_transport_factory,
65            i_proto_factory: input_protocol_factory,
66            w_trans_factory: write_transport_factory,
67            o_proto_factory: output_protocol_factory,
68            processor: Arc::new(processor),
69            cnn: None,
70            // worker_pool: ThreadPool::with_name("Thrift service processor".to_owned(), num_workers),
71        }
72    }
73
74    /// Listen for incoming connections on `listen_address`.
75    ///
76    /// `listen_address` should implement `ToSocketAddrs` trait.
77    ///
78    /// Return `()` if successful.
79    ///
80    /// Return `Err` when the server cannot bind to `listen_address` or there
81    /// is an unrecoverable error.
82    pub fn listen<A: ToSocketAddrs>(&mut self, listen_address: A) -> thrift::Result<()> {
83        let listener = TcpListener::bind(listen_address)?;
84        let stream = listener.accept();
85
86        match stream {
87            Ok((s, _addr)) => {
88                let (mut i_prot, mut o_prot) = self.new_protocols_for_connection(s)?;
89                let processor = self.processor.clone();
90                match handle_incoming_connection(processor, &mut i_prot, &mut o_prot) {
91                    Ok(()) => {},
92                    Err(err) => {
93                        return Err(err)
94                    }
95                }
96            }
97            Err(e) => {
98                warn!("failed to accept remote connection with error {:?}", e);
99            }
100        }
101
102        Ok(())
103    }
104
105    pub fn accept<A: ToSocketAddrs>(&mut self, listen_address: A) -> thrift::Result<()> {
106        let listener = TcpListener::bind(listen_address)?;
107        let stream = listener.accept();
108        match stream {
109            Ok((s, _addr)) => {
110                let (i_prot, o_prot) = self.new_protocols_for_connection(s)?;
111                let processor = self.processor.clone();
112                self.cnn = Some(IncomingConnection {
113                    processor: processor,
114                    i_prot: i_prot,
115                    o_prot: o_prot,
116                    end_loop: false,
117                });
118                Ok(())
119            }
120            Err(e) => {
121                warn!("failed to accept remote connection with error {:?}", e);
122                Err(
123                    thrift::Error::Application(
124                      thrift::ApplicationError::new(
125                        thrift::ApplicationErrorKind::InternalError,
126                        format!("{}", e)
127                      )
128                    )
129                )
130            }
131        }
132    }
133
134    pub fn handle_apply_request(&mut self) -> thrift::Result<()> {
135        let mut cnn = self.cnn.as_mut().unwrap();
136        handle_incoming_connection_ex(&mut cnn)
137    }
138
139    pub fn end_loop(&mut self) {
140        self.cnn.as_mut().unwrap().end_loop = true;
141    }
142
143    fn new_protocols_for_connection(
144        &mut self,
145        stream: TcpStream,
146    ) -> thrift::Result<(
147        Box<dyn TInputProtocol + Send>,
148        Box<dyn TOutputProtocol + Send>,
149    )> {
150        // create the shared tcp stream
151        let channel = TTcpChannel::with_stream(stream);
152
153        // split it into two - one to be owned by the
154        // input tran/proto and the other by the output
155        let (r_chan, w_chan) = channel.split()?;
156
157        // input protocol and transport
158        let r_tran = self.r_trans_factory.create(Box::new(r_chan));
159        let i_prot = self.i_proto_factory.create(r_tran);
160
161        // output protocol and transport
162        let w_tran = self.w_trans_factory.create(Box::new(w_chan));
163        let o_prot = self.o_proto_factory.create(w_tran);
164
165        Ok((i_prot, o_prot))
166    }
167}
168
169pub struct IncomingConnection<PRC>
170where
171    PRC: TProcessor,
172{
173    pub processor: Arc<PRC>,
174    pub i_prot: Box<dyn TInputProtocol + Send>,
175    pub o_prot: Box<dyn TOutputProtocol + Send>,
176    pub end_loop: bool,
177}
178
179impl<PRC> IncomingConnection<PRC>
180where
181    PRC: TProcessor,
182{
183    pub fn end_loop(&mut self) {
184        self.end_loop = true;
185    }
186}
187
188fn handle_incoming_connection<PRC>(
189    processor: Arc<PRC>,
190    i_prot: &mut Box<dyn TInputProtocol + Send>,
191    o_prot: &mut Box<dyn TOutputProtocol + Send>,
192) -> thrift::Result<()>
193where
194    PRC: TProcessor,
195{
196    let i_prot = i_prot;
197    let o_prot = o_prot;
198    loop {
199        let ret = processor.process(&mut *i_prot, &mut *o_prot);
200        match ret {
201            Ok(()) => {}
202            Err(ref err) => {
203                match err {
204                    thrift::Error::Transport(ref transport_err)
205                        if transport_err.kind == TransportErrorKind::EndOfFile => {
206                        }
207                    other => {
208                        warn!("processor completed with error: {:?}", other);
209                    }
210                }
211                return ret;
212            }
213        }
214        // return ret;
215    }
216}
217
218
219fn handle_incoming_connection_ex<PRC>(cnn: &mut IncomingConnection<PRC>) -> thrift::Result<()>
220where
221    PRC: TProcessor,
222{
223    cnn.end_loop = false;
224    let i_prot = &mut cnn.i_prot;
225    let o_prot = &mut cnn.o_prot;
226    loop {
227        if END_APPLY.lock().unwrap().get_value() {
228            END_APPLY.lock().unwrap().set_value(false);
229            return Ok(())
230        }
231        let ret = cnn.processor.clone().process(&mut *i_prot, &mut *o_prot);
232        match ret {
233            Ok(()) => {}
234            Err(ref err) => {
235                match err {
236                    thrift::Error::Transport(ref transport_err)
237                        if transport_err.kind == TransportErrorKind::EndOfFile => {
238                        }
239                    other => {
240                        warn!("processor completed with error: {:?}", other);
241                    }
242                }
243                return ret;
244            }
245        }
246        // return ret;
247    }
248}
249
250use thrift::protocol::{
251    TBinaryInputProtocolFactory,
252    TBinaryOutputProtocolFactory,
253};
254
255
256use thrift::transport::{
257    TBufferedReadTransportFactory, TBufferedWriteTransportFactory,
258};
259
260use crate::interfaces::{ApplyRequestSyncHandler, ApplyRequestSyncProcessor};
261
262use lazy_static::lazy_static; // 1.4.0
263
264use std::sync::{
265    Mutex,
266    MutexGuard
267};
268
269
/// Boolean flag stored behind the `END_APPLY` mutex.
pub struct EndApply {
    value: bool,
}

impl EndApply {
    /// Stores `value` as the new state of the flag.
    pub fn set_value(&mut self, value: bool) {
        self.value = value;
    }

    /// Returns the current state of the flag.
    ///
    /// Reading does not modify anything, so a shared reference is enough.
    pub fn get_value(&self) -> bool {
        self.value
    }
}
283
284lazy_static! {
285    pub static ref END_APPLY: Mutex<EndApply> = Mutex::new(EndApply{value: false});
286}
287
288lazy_static! {
289    static ref APPLY_REQUEST_SERVER: Mutex<ApplyRequestServer> = Mutex::new(ApplyRequestServer::new());
290}
291
292
/// Handles incoming ChainTester service calls.
///
/// The handler carries no state; `Default` yields the only possible value.
#[derive(Default)]
struct ApplyRequestHandler {}
303
304impl ApplyRequestSyncHandler for ApplyRequestHandler {
305    fn handle_apply_request(&self, receiver: Uint64, first_receiver: Uint64, action: Uint64) -> thrift::Result<i32> {
306        let _receiver = receiver.into();
307        let _first_receiver = first_receiver.into();
308        let _action = action.into();
309
310        let result = panic::catch_unwind(|| {
311            unsafe {
312                native_apply(_receiver, _first_receiver, _action);
313            }
314        });
315        match result {
316            Ok(()) => {
317
318            }
319            Err(err) => {
320                println!("{:?}", err);
321            }
322        }
323        crate::get_vm_api_client().end_apply().unwrap();
324        Ok(1)
325    }
326
327    fn handle_apply_end(&self) -> thrift::Result<i32> {
328        END_APPLY.lock().unwrap().set_value(true);
329        Ok(1)
330    }
331}
332
333pub struct ApplyRequestServer {
334    server: IPCServer<ApplyRequestSyncProcessor<ApplyRequestHandler>, TBufferedReadTransportFactory, TBinaryInputProtocolFactory, TBufferedWriteTransportFactory, TBinaryOutputProtocolFactory>,
335}
336
337impl ApplyRequestServer {
338    pub fn new() -> Self {
339        // println!("binding to {}", listen_address);
340    
341        let i_tran_fact = TBufferedReadTransportFactory::new();
342        let i_prot_fact = TBinaryInputProtocolFactory::new();
343    
344        let o_tran_fact = TBufferedWriteTransportFactory::new();
345        let o_prot_fact = TBinaryOutputProtocolFactory::new();
346    
347        // demux incoming messages
348        let processor = ApplyRequestSyncProcessor::new(ApplyRequestHandler {
349            ..Default::default()
350        });
351    
352        // create the server and start listening
353        Self {
354            server: IPCServer::new(
355                i_tran_fact,
356                i_prot_fact,
357                o_tran_fact,
358                o_prot_fact,
359                processor,
360        )}
361    }
362}
363
364pub fn run_apply_request_server()  -> thrift::Result<()> {
365    get_apply_request_server().server.handle_apply_request()
366}
367
368pub fn get_apply_request_server() -> MutexGuard<'static, ApplyRequestServer> {
369    let mut ret = APPLY_REQUEST_SERVER.lock().unwrap();
370    if ret.server.cnn.is_none() {
371        println!("apply_request server: waiting for debugger connection");
372        let host = crate::get_debugger_config().apply_request_server_address.clone();
373        let port = crate::get_debugger_config().apply_request_server_port;
374        let address = format!("{}:{}", host, port);
375        ret.server.accept(address).unwrap();
376        println!("apply_request server: debugger connected");
377    }
378    return ret;
379}