1use crate::errors::Error;
2
3use crate::packet::{IpcPacket, Packet};
4use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
5use ipc_channel::ipc::{self, IpcSender};
6use ipc_channel::ipc::{IpcReceiverSet, IpcSelectionResult};
7use log::*;
8use std::sync::Arc;
9
10#[derive(Debug)]
11pub struct Client {
12 receiver: CrossbeamReceiver<Option<Vec<Arc<Packet>>>>,
13 available: Vec<Arc<Packet>>,
14 is_closed: bool,
15}
16
17fn process_selection_result(
18 msg_tx: &CrossbeamSender<Option<Vec<Arc<Packet>>>>,
19 result: IpcSelectionResult,
20) -> bool {
21 let mut closed = false;
22 match result {
23 IpcSelectionResult::MessageReceived(_id, message) => {
24 let opt_packets = match message.to::<Option<Vec<Packet>>>() {
25 Err(e) => {
26 error!("Failed to convert message to packets: {:?}", e);
27 None
28 }
29 Ok(opt_packets) => opt_packets.map(|packets| {
30 let packets: Vec<_> = packets.into_iter().map(|p| Arc::new(p)).collect();
31 packets
32 }),
33 };
34 closed = opt_packets.is_none();
35 if let Err(e) = msg_tx.send(opt_packets) {
36 error!("Failed to send message: {:?}", e);
37 closed = true;
38 }
39 }
40 IpcSelectionResult::ChannelClosed(_id) => {
41 if let Err(e) = msg_tx.send(None) {
42 error!("Failed to send message: {:?}", e);
43 closed = true;
44 }
45 }
46 }
47 closed
48}
49
50impl Client {
51 pub fn new(server_name: String) -> Result<Client, Error> {
53 Self::new_with_size(server_name, None)
54 }
55 pub fn new_with_size(
57 server_name: String,
58 channel_size: Option<usize>,
59 ) -> Result<Client, Error> {
60 let (ipc_tx, ipc_rx) = ipc::channel::<IpcSender<Vec<IpcPacket>>>().map_err(Error::Io)?;
61 let server_sender = IpcSender::connect(server_name).map_err(Error::Io)?;
62 server_sender.send(ipc_tx).map_err(Error::Bincode)?;
63
64 let mut receiver = IpcReceiverSet::new().map_err(Error::Io)?;
65 receiver.add_opaque(ipc_rx.to_opaque()).map_err(Error::Io)?;
66
67 let (msg_tx, msg_rx) = match channel_size {
68 Some(channel_size) => crossbeam_channel::bounded(channel_size),
69 None => crossbeam_channel::unbounded(),
70 };
71
72 std::thread::spawn(move || {
73 let mut closed = false;
74 while !closed {
75 match receiver.select() {
76 Err(e) => {
77 error!("Failed to receive packets: {:?}", e);
78 closed = true;
79 }
80 Ok(results) => {
81 for result in results.into_iter() {
82 closed = closed || process_selection_result(&msg_tx, result);
83 }
84 }
85 }
86 }
87 });
88 Ok(Client {
89 receiver: msg_rx,
90 available: vec![],
91 is_closed: false,
92 })
93 }
94
95 pub fn take(&mut self, size: usize) -> Vec<Arc<Packet>> {
96 let packets_to_take = usize::min(size, self.available.len());
97 let mut rem = self.available.split_off(packets_to_take);
98 std::mem::swap(&mut self.available, &mut rem);
99 rem
100 }
101
102 pub fn recv(&mut self, size: usize) -> Result<Option<Vec<Arc<Packet>>>, Error> {
103 if self.is_closed {
104 if self.available.is_empty() {
105 Ok(None)
106 } else {
107 Ok(Some(self.take(size)))
108 }
109 } else if self.available.len() >= size {
110 Ok(Some(self.take(size)))
111 } else {
112 let opt_packets = self.receiver.recv().map_err(Error::Recv)?;
113 if let Some(packets) = opt_packets {
114 self.available.extend(packets);
115 } else {
116 self.is_closed = true;
117 if self.available.is_empty() {
118 return Ok(None);
119 }
120 }
121 Ok(Some(self.take(size)))
122 }
123 }
124}