1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use bincode::deserialize;
use buffett_metrics::counter::Counter;
use log::Level;
use crate::packet::{to_blobs, Packets, SharedPackets};
use rayon::prelude::*;
use crate::request::Request;
use request_processor::RequestProcessor;
use crate::result::{Error, Result};
use crate::service::Service;
use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::Arc;
use std::thread::{self, Builder, JoinHandle};
use std::time::Instant;
use crate::streamer::{self, BlobReceiver, BlobSender};
use buffett_timing::timing;
use buffett_metrics::sub_new_counter_info;
/// Pipeline stage that consumes batches of request packets, processes them,
/// and emits response blobs on a channel. Owns the worker thread handle.
pub struct RequestStage {
// Join handle for the background loop spawned in `RequestStage::new`.
thread_hdl: JoinHandle<()>,
// Shared with the worker thread; exposed so callers can inspect/drive the processor.
pub request_processor: Arc<RequestProcessor>,
}
impl RequestStage {
    /// Deserializes every packet in `p` in parallel (rayon) into a `Request`
    /// paired with the sender's address. Packets that fail to deserialize map
    /// to `None`; the output preserves packet order and length.
    pub fn deserialize_requests(p: &Packets) -> Vec<Option<(Request, SocketAddr)>> {
        p.packets
            .par_iter()
            .map(|x| {
                deserialize(&x.data[0..x.meta.size])
                    .map(|req| (req, x.meta.addr()))
                    .ok()
            }).collect()
    }

    /// Receives one batch of packets, deserializes and processes the requests,
    /// and sends any resulting response blobs on `blob_sender`.
    ///
    /// # Errors
    /// Propagates receive errors from `streamer::recv_batch` (including
    /// timeout/disconnect), blob construction errors from `to_blobs`, and
    /// channel send failures.
    pub fn process_request_packets(
        request_processor: &RequestProcessor,
        packet_receiver: &Receiver<SharedPackets>,
        blob_sender: &BlobSender,
    ) -> Result<()> {
        let (batch, batch_len, _recv_time) = streamer::recv_batch(packet_receiver)?;
        debug!(
            "@{:?} request_stage: processing: {}",
            timing::timestamp(),
            batch_len
        );
        let mut reqs_len = 0;
        let proc_start = Instant::now();
        for msgs in batch {
            // Drop packets that failed to deserialize; keep the valid requests.
            let reqs: Vec<_> = Self::deserialize_requests(&msgs.read().unwrap())
                .into_iter()
                .flatten()
                .collect();
            reqs_len += reqs.len();
            let rsps = request_processor.process_requests(reqs);
            let blobs = to_blobs(rsps)?;
            if !blobs.is_empty() {
                info!("process: sending blobs: {}", blobs.len());
                blob_sender.send(blobs)?;
            }
        }
        // Sample the elapsed time once so the seconds and milliseconds figures
        // describe the same interval (calling elapsed() twice yields two
        // slightly different durations, making the reqs/s metric inconsistent
        // with the reported time_ms).
        let elapsed = proc_start.elapsed();
        let total_time_s = timing::duration_in_seconds(&elapsed);
        let total_time_ms = timing::duration_in_milliseconds(&elapsed);
        sub_new_counter_info!("request_stage-time_ms", total_time_ms as usize);
        debug!(
            "@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}",
            timing::timestamp(),
            batch_len,
            total_time_ms,
            reqs_len,
            (reqs_len as f32) / (total_time_s)
        );
        Ok(())
    }

    /// Spawns the request-processing loop on a dedicated thread.
    ///
    /// Returns the stage handle (carrying the shared `RequestProcessor`) and
    /// the receiving end of the response-blob channel. The loop exits cleanly
    /// when the packet channel disconnects, ignores receive timeouts, and logs
    /// any other error without stopping.
    pub fn new(
        request_processor: RequestProcessor,
        packet_receiver: Receiver<SharedPackets>,
    ) -> (Self, BlobReceiver) {
        let request_processor = Arc::new(request_processor);
        let request_processor_ = request_processor.clone();
        let (blob_sender, blob_receiver) = channel();
        let thread_hdl = Builder::new()
            .name("bitconch-request-stage".to_string())
            .spawn(move || loop {
                if let Err(e) = Self::process_request_packets(
                    &request_processor_,
                    &packet_receiver,
                    &blob_sender,
                ) {
                    match e {
                        // Upstream hung up: shut the loop down.
                        Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
                        // No packets this tick: just poll again.
                        Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
                        _ => error!("{:?}", e),
                    }
                }
            }).unwrap();
        (
            RequestStage {
                thread_hdl,
                request_processor,
            },
            blob_receiver,
        )
    }
}
impl Service for RequestStage {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}