1use log::warn;
2
3use std::net::{TcpListener, TcpStream, ToSocketAddrs};
4use std::sync::Arc;
5use std::panic;
6
7use thrift::protocol::{
8 TInputProtocol, TInputProtocolFactory, TOutputProtocol, TOutputProtocolFactory,
9};
10use thrift::transport::{TIoChannel, TReadTransportFactory, TTcpChannel, TWriteTransportFactory};
11
12use thrift::server::{
13 TProcessor
14};
15use thrift::TransportErrorKind;
16
17use crate::interfaces::{Uint64};
18use crate::interfaces::TApplySyncClient;
19
// Declaration only: `native_apply` is defined outside this file and is
// resolved at link time. Because the compiler cannot check the external
// definition against this signature, every call site must be `unsafe`.
// NOTE(review): the meaning of the three `u64` arguments is not visible
// from here; the names mirror the parameters of the apply-request handler.
extern "Rust" {
    fn native_apply(receiver: u64, first_receiver: u64, action: u64);
}
23
24pub struct IPCServer<PRC, RTF, IPF, WTF, OPF>
25where
26 PRC: TProcessor + Send + Sync + 'static,
27 RTF: TReadTransportFactory + 'static,
28 IPF: TInputProtocolFactory + 'static,
29 WTF: TWriteTransportFactory + 'static,
30 OPF: TOutputProtocolFactory + 'static,
31{
32 r_trans_factory: RTF,
33 i_proto_factory: IPF,
34 w_trans_factory: WTF,
35 o_proto_factory: OPF,
36 processor: Arc<PRC>,
37 pub cnn: Option<IncomingConnection<PRC>>,
38 }
40
41impl<PRC, RTF, IPF, WTF, OPF> IPCServer<PRC, RTF, IPF, WTF, OPF>
42where
43 PRC: TProcessor + Send + Sync + 'static,
44 RTF: TReadTransportFactory + 'static,
45 IPF: TInputProtocolFactory + 'static,
46 WTF: TWriteTransportFactory + 'static,
47 OPF: TOutputProtocolFactory + 'static,
48{
49 pub fn new(
57 read_transport_factory: RTF,
58 input_protocol_factory: IPF,
59 write_transport_factory: WTF,
60 output_protocol_factory: OPF,
61 processor: PRC,
62 ) -> IPCServer<PRC, RTF, IPF, WTF, OPF> {
63 IPCServer {
64 r_trans_factory: read_transport_factory,
65 i_proto_factory: input_protocol_factory,
66 w_trans_factory: write_transport_factory,
67 o_proto_factory: output_protocol_factory,
68 processor: Arc::new(processor),
69 cnn: None,
70 }
72 }
73
74 pub fn listen<A: ToSocketAddrs>(&mut self, listen_address: A) -> thrift::Result<()> {
83 let listener = TcpListener::bind(listen_address)?;
84 let stream = listener.accept();
85
86 match stream {
87 Ok((s, _addr)) => {
88 let (mut i_prot, mut o_prot) = self.new_protocols_for_connection(s)?;
89 let processor = self.processor.clone();
90 match handle_incoming_connection(processor, &mut i_prot, &mut o_prot) {
91 Ok(()) => {},
92 Err(err) => {
93 return Err(err)
94 }
95 }
96 }
97 Err(e) => {
98 warn!("failed to accept remote connection with error {:?}", e);
99 }
100 }
101
102 Ok(())
103 }
104
105 pub fn accept<A: ToSocketAddrs>(&mut self, listen_address: A) -> thrift::Result<()> {
106 let listener = TcpListener::bind(listen_address)?;
107 let stream = listener.accept();
108 match stream {
109 Ok((s, _addr)) => {
110 let (i_prot, o_prot) = self.new_protocols_for_connection(s)?;
111 let processor = self.processor.clone();
112 self.cnn = Some(IncomingConnection {
113 processor: processor,
114 i_prot: i_prot,
115 o_prot: o_prot,
116 end_loop: false,
117 });
118 Ok(())
119 }
120 Err(e) => {
121 warn!("failed to accept remote connection with error {:?}", e);
122 Err(
123 thrift::Error::Application(
124 thrift::ApplicationError::new(
125 thrift::ApplicationErrorKind::InternalError,
126 format!("{}", e)
127 )
128 )
129 )
130 }
131 }
132 }
133
134 pub fn handle_apply_request(&mut self) -> thrift::Result<()> {
135 let mut cnn = self.cnn.as_mut().unwrap();
136 handle_incoming_connection_ex(&mut cnn)
137 }
138
139 pub fn end_loop(&mut self) {
140 self.cnn.as_mut().unwrap().end_loop = true;
141 }
142
143 fn new_protocols_for_connection(
144 &mut self,
145 stream: TcpStream,
146 ) -> thrift::Result<(
147 Box<dyn TInputProtocol + Send>,
148 Box<dyn TOutputProtocol + Send>,
149 )> {
150 let channel = TTcpChannel::with_stream(stream);
152
153 let (r_chan, w_chan) = channel.split()?;
156
157 let r_tran = self.r_trans_factory.create(Box::new(r_chan));
159 let i_prot = self.i_proto_factory.create(r_tran);
160
161 let w_tran = self.w_trans_factory.create(Box::new(w_chan));
163 let o_prot = self.o_proto_factory.create(w_tran);
164
165 Ok((i_prot, o_prot))
166 }
167}
168
169pub struct IncomingConnection<PRC>
170where
171 PRC: TProcessor,
172{
173 pub processor: Arc<PRC>,
174 pub i_prot: Box<dyn TInputProtocol + Send>,
175 pub o_prot: Box<dyn TOutputProtocol + Send>,
176 pub end_loop: bool,
177}
178
179impl<PRC> IncomingConnection<PRC>
180where
181 PRC: TProcessor,
182{
183 pub fn end_loop(&mut self) {
184 self.end_loop = true;
185 }
186}
187
188fn handle_incoming_connection<PRC>(
189 processor: Arc<PRC>,
190 i_prot: &mut Box<dyn TInputProtocol + Send>,
191 o_prot: &mut Box<dyn TOutputProtocol + Send>,
192) -> thrift::Result<()>
193where
194 PRC: TProcessor,
195{
196 let i_prot = i_prot;
197 let o_prot = o_prot;
198 loop {
199 let ret = processor.process(&mut *i_prot, &mut *o_prot);
200 match ret {
201 Ok(()) => {}
202 Err(ref err) => {
203 match err {
204 thrift::Error::Transport(ref transport_err)
205 if transport_err.kind == TransportErrorKind::EndOfFile => {
206 }
207 other => {
208 warn!("processor completed with error: {:?}", other);
209 }
210 }
211 return ret;
212 }
213 }
214 }
216}
217
218
219fn handle_incoming_connection_ex<PRC>(cnn: &mut IncomingConnection<PRC>) -> thrift::Result<()>
220where
221 PRC: TProcessor,
222{
223 cnn.end_loop = false;
224 let i_prot = &mut cnn.i_prot;
225 let o_prot = &mut cnn.o_prot;
226 loop {
227 if END_APPLY.lock().unwrap().get_value() {
228 END_APPLY.lock().unwrap().set_value(false);
229 return Ok(())
230 }
231 let ret = cnn.processor.clone().process(&mut *i_prot, &mut *o_prot);
232 match ret {
233 Ok(()) => {}
234 Err(ref err) => {
235 match err {
236 thrift::Error::Transport(ref transport_err)
237 if transport_err.kind == TransportErrorKind::EndOfFile => {
238 }
239 other => {
240 warn!("processor completed with error: {:?}", other);
241 }
242 }
243 return ret;
244 }
245 }
246 }
248}
249
250use thrift::protocol::{
251 TBinaryInputProtocolFactory,
252 TBinaryOutputProtocolFactory,
253};
254
255
256use thrift::transport::{
257 TBufferedReadTransportFactory, TBufferedWriteTransportFactory,
258};
259
260use crate::interfaces::{ApplyRequestSyncHandler, ApplyRequestSyncProcessor};
261
262use lazy_static::lazy_static; use std::sync::{
265 Mutex,
266 MutexGuard
267};
268
269
/// Boolean flag wrapper; the default value is `false`.
#[derive(Debug, Default)]
pub struct EndApply {
    value: bool,
}

impl EndApply {
    /// Overwrites the stored flag with `value`.
    pub fn set_value(&mut self, value: bool) {
        self.value = value;
    }

    /// Returns the stored flag.
    ///
    /// Takes `&self`: reading the flag does not need exclusive access.
    pub fn get_value(&self) -> bool {
        self.value
    }
}
283
284lazy_static! {
285 pub static ref END_APPLY: Mutex<EndApply> = Mutex::new(EndApply{value: false});
286}
287
288lazy_static! {
289 static ref APPLY_REQUEST_SERVER: Mutex<ApplyRequestServer> = Mutex::new(ApplyRequestServer::new());
290}
291
292
/// Stateless handler type for the apply-request service.
///
/// `Default` is derived: the struct has no fields, so a hand-written
/// implementation adds nothing.
#[derive(Default)]
struct ApplyRequestHandler {}
303
304impl ApplyRequestSyncHandler for ApplyRequestHandler {
305 fn handle_apply_request(&self, receiver: Uint64, first_receiver: Uint64, action: Uint64) -> thrift::Result<i32> {
306 let _receiver = receiver.into();
307 let _first_receiver = first_receiver.into();
308 let _action = action.into();
309
310 let result = panic::catch_unwind(|| {
311 unsafe {
312 native_apply(_receiver, _first_receiver, _action);
313 }
314 });
315 match result {
316 Ok(()) => {
317
318 }
319 Err(err) => {
320 println!("{:?}", err);
321 }
322 }
323 crate::get_vm_api_client().end_apply().unwrap();
324 Ok(1)
325 }
326
327 fn handle_apply_end(&self) -> thrift::Result<i32> {
328 END_APPLY.lock().unwrap().set_value(true);
329 Ok(1)
330 }
331}
332
333pub struct ApplyRequestServer {
334 server: IPCServer<ApplyRequestSyncProcessor<ApplyRequestHandler>, TBufferedReadTransportFactory, TBinaryInputProtocolFactory, TBufferedWriteTransportFactory, TBinaryOutputProtocolFactory>,
335}
336
337impl ApplyRequestServer {
338 pub fn new() -> Self {
339 let i_tran_fact = TBufferedReadTransportFactory::new();
342 let i_prot_fact = TBinaryInputProtocolFactory::new();
343
344 let o_tran_fact = TBufferedWriteTransportFactory::new();
345 let o_prot_fact = TBinaryOutputProtocolFactory::new();
346
347 let processor = ApplyRequestSyncProcessor::new(ApplyRequestHandler {
349 ..Default::default()
350 });
351
352 Self {
354 server: IPCServer::new(
355 i_tran_fact,
356 i_prot_fact,
357 o_tran_fact,
358 o_prot_fact,
359 processor,
360 )}
361 }
362}
363
364pub fn run_apply_request_server() -> thrift::Result<()> {
365 get_apply_request_server().server.handle_apply_request()
366}
367
368pub fn get_apply_request_server() -> MutexGuard<'static, ApplyRequestServer> {
369 let mut ret = APPLY_REQUEST_SERVER.lock().unwrap();
370 if ret.server.cnn.is_none() {
371 println!("apply_request server: waiting for debugger connection");
372 let host = crate::get_debugger_config().apply_request_server_address.clone();
373 let port = crate::get_debugger_config().apply_request_server_port;
374 let address = format!("{}:{}", host, port);
375 ret.server.accept(address).unwrap();
376 println!("apply_request server: debugger connected");
377 }
378 return ret;
379}