1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
use super::*;
use crate::flowgger::config::Config;
use crate::flowgger::decoder::Decoder;
use crate::flowgger::encoder::Encoder;
#[cfg(feature = "capnp-recompile")]
use crate::flowgger::splitter::CapnpSplitter;
use crate::flowgger::splitter::{LineSplitter, NulSplitter, Splitter, SyslenSplitter};
use std::io::BufReader;
use std::net::{TcpListener, TcpStream};
use std::sync::mpsc::SyncSender;
use std::thread;
use std::time::Duration;

/// Input that listens on a TCP socket and hands every accepted connection
/// to its own thread.
pub struct TcpInput {
    /// Address given to [`TcpListener::bind`].
    listen: String,
    /// Per-connection settings; cloned for each client. Its `framing` field
    /// selects the splitter used to cut the stream into messages.
    tcp_config: TcpConfig,
    /// Read timeout applied to every accepted client socket.
    timeout: Option<Duration>,
}

impl TcpInput {
    /// Builds a `TcpInput` from the values returned by `config_parse`.
    ///
    /// The parsed timeout is interpreted as a number of seconds.
    pub fn new(config: &Config) -> TcpInput {
        let (tcp_config, listen, timeout) = config_parse(config);
        TcpInput {
            listen,
            tcp_config,
            timeout: Some(Duration::from_secs(timeout)),
        }
    }
}

impl Input for TcpInput {
    /// Binds the listening socket and serves clients until the listener stops
    /// yielding connections, spawning one thread per accepted client.
    ///
    /// Each client thread receives its own clone of `tx`, of the TCP
    /// configuration, and of the decoder and encoder.
    ///
    /// # Panics
    ///
    /// Panics if the listening address cannot be bound.
    fn accept(
        &self,
        tx: SyncSender<Vec<u8>>,
        decoder: Box<dyn Decoder + Send>,
        encoder: Box<dyn Encoder + Send>,
    ) {
        // The trait signature leaves no way to report the failure to the
        // caller, so at least say which address could not be bound and why.
        let listener = TcpListener::bind(self.listen.as_str()).unwrap_or_else(|err| {
            panic!(
                "Unable to listen on TCP address [{}]: {}",
                self.listen, err
            )
        });
        // Connections that fail to be accepted are skipped.
        for client in listener.incoming().flatten() {
            // Best effort: a socket that rejects the timeout is still served.
            // NOTE(review): std documents that a zero `Duration` is rejected
            // here, so a configured timeout of 0 would leave reads without a
            // timeout -- confirm this is intended.
            let _ = client.set_read_timeout(self.timeout);
            let tx = tx.clone();
            let tcp_config = self.tcp_config.clone();
            let (decoder, encoder) = (decoder.clone_boxed(), encoder.clone_boxed());
            thread::spawn(move || {
                handle_client(client, tx, decoder, encoder, tcp_config);
            });
        }
    }
}

/// Returns a boxed `CapnpSplitter` for readers of type `T`.
#[cfg(feature = "capnp-recompile")]
pub fn get_capnp_splitter<T>() -> Box<dyn Splitter<T>>
where
    T: std::io::Read,
{
    Box::new(CapnpSplitter) as Box<dyn Splitter<_>>
}

/// Stand-in used when the `capnp-recompile` feature is disabled.
///
/// # Panics
///
/// Always panics: Cap'n Proto support is not compiled in.
#[cfg(not(feature = "capnp-recompile"))]
pub fn get_capnp_splitter() -> ! {
    panic!("Support for CapNProto is not compiled in")
}

/// Serves a single TCP client: announces the peer on stdout, picks the
/// splitter matching `tcp_config.framing`, and runs it over a buffered reader
/// wrapping the socket.
///
/// # Panics
///
/// Panics if the framing scheme is not one of `capnp`, `line`, `syslen` or
/// `nul`, or if it is `capnp` and Cap'n Proto support is not compiled in.
fn handle_client(
    client: TcpStream,
    tx: SyncSender<Vec<u8>>,
    decoder: Box<dyn Decoder>,
    encoder: Box<dyn Encoder>,
    tcp_config: TcpConfig,
) {
    // The peer address is only used for this message; failing to obtain it
    // does not prevent the connection from being served.
    if let Ok(peer_addr) = client.peer_addr() {
        println!("Connection over TCP from [{}]", peer_addr);
    }
    let reader = BufReader::new(client);
    let splitter = match tcp_config.framing.as_str() {
        "capnp" => get_capnp_splitter(),
        "line" => Box::new(LineSplitter) as Box<dyn Splitter<_>>,
        "syslen" => Box::new(SyslenSplitter) as Box<dyn Splitter<_>>,
        "nul" => Box::new(NulSplitter) as Box<dyn Splitter<_>>,
        _ => panic!("Unsupported framing scheme"),
    };
    splitter.run(reader, tx, decoder, encoder);
}