1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use bincode::deserialize;
use buffett_metrics::counter::Counter;
use log::Level;
use crate::packet::{to_blobs, Packets, SharedPackets};
use rayon::prelude::*;
use crate::request::Request;
use request_processor::RequestProcessor;
use crate::result::{Error, Result};
use crate::service::Service;
use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::Arc;
use std::thread::{self, Builder, JoinHandle};
use std::time::Instant;
use crate::streamer::{self, BlobReceiver, BlobSender};
use buffett_timing::timing;
use buffett_metrics::sub_new_counter_info;

pub struct RequestStage {
    thread_hdl: JoinHandle<()>,
    pub request_processor: Arc<RequestProcessor>,
}

impl RequestStage {
    pub fn deserialize_requests(p: &Packets) -> Vec<Option<(Request, SocketAddr)>> {
        p.packets
            .par_iter()
            .map(|x| {
                deserialize(&x.data[0..x.meta.size])
                    .map(|req| (req, x.meta.addr()))
                    .ok()
            }).collect()
    }

    pub fn process_request_packets(
        request_processor: &RequestProcessor,
        packet_receiver: &Receiver<SharedPackets>,
        blob_sender: &BlobSender,
    ) -> Result<()> {
        let (batch, batch_len, _recv_time) = streamer::recv_batch(packet_receiver)?;

        debug!(
            "@{:?} request_stage: processing: {}",
            timing::timestamp(),
            batch_len
        );

        let mut reqs_len = 0;
        let proc_start = Instant::now();
        for msgs in batch {
            let reqs: Vec<_> = Self::deserialize_requests(&msgs.read().unwrap())
                .into_iter()
                .filter_map(|x| x)
                .collect();
            reqs_len += reqs.len();

            let rsps = request_processor.process_requests(reqs);

            let blobs = to_blobs(rsps)?;
            if !blobs.is_empty() {
                info!("process: sending blobs: {}", blobs.len());
                //don't wake up the other side if there is nothing
                blob_sender.send(blobs)?;
            }
        }
        let total_time_s = timing::duration_in_seconds(&proc_start.elapsed());
        let total_time_ms = timing::duration_in_milliseconds(&proc_start.elapsed());
        sub_new_counter_info!("request_stage-time_ms", total_time_ms as usize);
        debug!(
            "@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}",
            timing::timestamp(),
            batch_len,
            total_time_ms,
            reqs_len,
            (reqs_len as f32) / (total_time_s)
        );
        Ok(())
    }
    pub fn new(
        request_processor: RequestProcessor,
        packet_receiver: Receiver<SharedPackets>,
    ) -> (Self, BlobReceiver) {
        let request_processor = Arc::new(request_processor);
        let request_processor_ = request_processor.clone();
        let (blob_sender, blob_receiver) = channel();
        let thread_hdl = Builder::new()
            .name("bitconch-request-stage".to_string())
            .spawn(move || loop {
                if let Err(e) = Self::process_request_packets(
                    &request_processor_,
                    &packet_receiver,
                    &blob_sender,
                ) {
                    match e {
                        Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
                        Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
                        _ => error!("{:?}", e),
                    }
                }
            }).unwrap();
        (
            RequestStage {
                thread_hdl,
                request_processor,
            },
            blob_receiver,
        )
    }
}

impl Service for RequestStage {
    type JoinReturnType = ();

    fn join(self) -> thread::Result<()> {
        self.thread_hdl.join()
    }
}