1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
use crate::errors::Error; use crate::packet::{IpcPacket, Packet}; use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender}; use ipc_channel::ipc::{self, IpcSender}; use ipc_channel::ipc::{IpcReceiverSet, IpcSelectionResult}; use log::*; use std::sync::Arc; #[derive(Debug)] pub struct Client { receiver: CrossbeamReceiver<Option<Vec<Arc<Packet>>>>, available: Vec<Arc<Packet>>, is_closed: bool, } fn process_selection_result( msg_tx: &CrossbeamSender<Option<Vec<Arc<Packet>>>>, result: IpcSelectionResult, ) -> bool { let mut closed = false; match result { IpcSelectionResult::MessageReceived(_id, message) => { let opt_packets = match message.to::<Option<Vec<Packet>>>() { Err(e) => { error!("Failed to convert message to packets: {:?}", e); None } Ok(opt_packets) => opt_packets.map(|packets| { let packets: Vec<_> = packets.into_iter().map(|p| Arc::new(p)).collect(); packets }), }; closed = opt_packets.is_none(); if let Err(e) = msg_tx.send(opt_packets) { error!("Failed to send message: {:?}", e); closed = true; } } IpcSelectionResult::ChannelClosed(_id) => { if let Err(e) = msg_tx.send(None) { error!("Failed to send message: {:?}", e); closed = true; } } } closed } impl Client { pub fn new(server_name: String) -> Result<Client, Error> { let (ipc_tx, ipc_rx) = ipc::channel::<IpcSender<Vec<IpcPacket>>>().map_err(Error::Io)?; let server_sender = IpcSender::connect(server_name).map_err(Error::Io)?; server_sender.send(ipc_tx).map_err(Error::Bincode)?; let mut receiver = IpcReceiverSet::new().map_err(Error::Io)?; receiver.add_opaque(ipc_rx.to_opaque()).map_err(Error::Io)?; let (msg_tx, msg_rx) = crossbeam_channel::unbounded(); std::thread::spawn(move || { let mut closed = false; while !closed { match receiver.select() { Err(e) => { error!("Failed to receive packets: {:?}", e); closed = true; } Ok(results) => { for result in results.into_iter() { closed = closed || process_selection_result(&msg_tx, result); } } } } }); Ok(Client { receiver: msg_rx, available: vec![], is_closed: false, }) } pub fn take(&mut self, size: usize) -> Vec<Arc<Packet>> { let packets_to_take = usize::min(size, self.available.len()); let mut rem = self.available.split_off(packets_to_take); std::mem::swap(&mut self.available, &mut rem); rem } pub fn recv(&mut self, size: usize) -> Result<Option<Vec<Arc<Packet>>>, Error> { if self.is_closed { if self.available.is_empty() { Ok(None) } else { Ok(Some(self.take(size))) } } else if self.available.len() >= size { Ok(Some(self.take(size))) } else { let opt_packets = self.receiver.recv().map_err(Error::Recv)?; if let Some(packets) = opt_packets { self.available.extend(packets); } else { self.is_closed = true; if self.available.is_empty() { return Ok(None); } } Ok(Some(self.take(size))) } } }