packet_ipc/
client.rs

1use crate::errors::Error;
2
3use crate::packet::{IpcPacket, Packet};
4use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
5use ipc_channel::ipc::{self, IpcSender};
6use ipc_channel::ipc::{IpcReceiverSet, IpcSelectionResult};
7use log::*;
8use std::sync::Arc;
9
10#[derive(Debug)]
11pub struct Client {
12    receiver: CrossbeamReceiver<Option<Vec<Arc<Packet>>>>,
13    available: Vec<Arc<Packet>>,
14    is_closed: bool,
15}
16
17fn process_selection_result(
18    msg_tx: &CrossbeamSender<Option<Vec<Arc<Packet>>>>,
19    result: IpcSelectionResult,
20) -> bool {
21    let mut closed = false;
22    match result {
23        IpcSelectionResult::MessageReceived(_id, message) => {
24            let opt_packets = match message.to::<Option<Vec<Packet>>>() {
25                Err(e) => {
26                    error!("Failed to convert message to packets: {:?}", e);
27                    None
28                }
29                Ok(opt_packets) => opt_packets.map(|packets| {
30                    let packets: Vec<_> = packets.into_iter().map(|p| Arc::new(p)).collect();
31                    packets
32                }),
33            };
34            closed = opt_packets.is_none();
35            if let Err(e) = msg_tx.send(opt_packets) {
36                error!("Failed to send message: {:?}", e);
37                closed = true;
38            }
39        }
40        IpcSelectionResult::ChannelClosed(_id) => {
41            if let Err(e) = msg_tx.send(None) {
42                error!("Failed to send message: {:?}", e);
43                closed = true;
44            }
45        }
46    }
47    closed
48}
49
50impl Client {
51    /// Uses a unbounded channel to transfer data.
52    pub fn new(server_name: String) -> Result<Client, Error> {
53        Self::new_with_size(server_name, None)
54    }
55    /// new client with a choice of bounded or unbounded based on the channel_size bening Some(size) or None
56    pub fn new_with_size(
57        server_name: String,
58        channel_size: Option<usize>,
59    ) -> Result<Client, Error> {
60        let (ipc_tx, ipc_rx) = ipc::channel::<IpcSender<Vec<IpcPacket>>>().map_err(Error::Io)?;
61        let server_sender = IpcSender::connect(server_name).map_err(Error::Io)?;
62        server_sender.send(ipc_tx).map_err(Error::Bincode)?;
63
64        let mut receiver = IpcReceiverSet::new().map_err(Error::Io)?;
65        receiver.add_opaque(ipc_rx.to_opaque()).map_err(Error::Io)?;
66
67        let (msg_tx, msg_rx) = match channel_size {
68            Some(channel_size) => crossbeam_channel::bounded(channel_size),
69            None => crossbeam_channel::unbounded(),
70        };
71
72        std::thread::spawn(move || {
73            let mut closed = false;
74            while !closed {
75                match receiver.select() {
76                    Err(e) => {
77                        error!("Failed to receive packets: {:?}", e);
78                        closed = true;
79                    }
80                    Ok(results) => {
81                        for result in results.into_iter() {
82                            closed = closed || process_selection_result(&msg_tx, result);
83                        }
84                    }
85                }
86            }
87        });
88        Ok(Client {
89            receiver: msg_rx,
90            available: vec![],
91            is_closed: false,
92        })
93    }
94
95    pub fn take(&mut self, size: usize) -> Vec<Arc<Packet>> {
96        let packets_to_take = usize::min(size, self.available.len());
97        let mut rem = self.available.split_off(packets_to_take);
98        std::mem::swap(&mut self.available, &mut rem);
99        rem
100    }
101
102    pub fn recv(&mut self, size: usize) -> Result<Option<Vec<Arc<Packet>>>, Error> {
103        if self.is_closed {
104            if self.available.is_empty() {
105                Ok(None)
106            } else {
107                Ok(Some(self.take(size)))
108            }
109        } else if self.available.len() >= size {
110            Ok(Some(self.take(size)))
111        } else {
112            let opt_packets = self.receiver.recv().map_err(Error::Recv)?;
113            if let Some(packets) = opt_packets {
114                self.available.extend(packets);
115            } else {
116                self.is_closed = true;
117                if self.available.is_empty() {
118                    return Ok(None);
119                }
120            }
121            Ok(Some(self.take(size)))
122        }
123    }
124}