1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use crate::errors::Error;

use crate::packet::{IpcPacket, Packet};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use ipc_channel::ipc::{self, IpcSender};
use ipc_channel::ipc::{IpcReceiverSet, IpcSelectionResult};
use log::*;
use std::sync::Arc;

#[derive(Debug)]
pub struct Client {
    receiver: CrossbeamReceiver<Option<Vec<Arc<Packet>>>>,
    available: Vec<Arc<Packet>>,
    is_closed: bool,
}

fn process_selection_result(
    msg_tx: &CrossbeamSender<Option<Vec<Arc<Packet>>>>,
    result: IpcSelectionResult,
) -> bool {
    let mut closed = false;
    match result {
        IpcSelectionResult::MessageReceived(_id, message) => {
            let opt_packets = match message.to::<Option<Vec<Packet>>>() {
                Err(e) => {
                    error!("Failed to convert message to packets: {:?}", e);
                    None
                }
                Ok(opt_packets) => opt_packets.map(|packets| {
                    let packets: Vec<_> = packets.into_iter().map(|p| Arc::new(p)).collect();
                    packets
                }),
            };
            closed = opt_packets.is_none();
            if let Err(e) = msg_tx.send(opt_packets) {
                error!("Failed to send message: {:?}", e);
                closed = true;
            }
        }
        IpcSelectionResult::ChannelClosed(_id) => {
            if let Err(e) = msg_tx.send(None) {
                error!("Failed to send message: {:?}", e);
                closed = true;
            }
        }
    }
    closed
}

impl Client {
    /// Uses a unbounded channel to transfer data.
    pub fn new(server_name: String) -> Result<Client, Error> {
        Self::new_with_size(server_name, None)
    }
    /// new client with a choice of bounded or unbounded based on the channel_size bening Some(size) or None
    pub fn new_with_size(
        server_name: String,
        channel_size: Option<usize>,
    ) -> Result<Client, Error> {
        let (ipc_tx, ipc_rx) = ipc::channel::<IpcSender<Vec<IpcPacket>>>().map_err(Error::Io)?;
        let server_sender = IpcSender::connect(server_name).map_err(Error::Io)?;
        server_sender.send(ipc_tx).map_err(Error::Bincode)?;

        let mut receiver = IpcReceiverSet::new().map_err(Error::Io)?;
        receiver.add_opaque(ipc_rx.to_opaque()).map_err(Error::Io)?;

        let (msg_tx, msg_rx) = match channel_size {
            Some(channel_size) => crossbeam_channel::bounded(channel_size),
            None => crossbeam_channel::unbounded(),
        };

        std::thread::spawn(move || {
            let mut closed = false;
            while !closed {
                match receiver.select() {
                    Err(e) => {
                        error!("Failed to receive packets: {:?}", e);
                        closed = true;
                    }
                    Ok(results) => {
                        for result in results.into_iter() {
                            closed = closed || process_selection_result(&msg_tx, result);
                        }
                    }
                }
            }
        });
        Ok(Client {
            receiver: msg_rx,
            available: vec![],
            is_closed: false,
        })
    }

    pub fn take(&mut self, size: usize) -> Vec<Arc<Packet>> {
        let packets_to_take = usize::min(size, self.available.len());
        let mut rem = self.available.split_off(packets_to_take);
        std::mem::swap(&mut self.available, &mut rem);
        rem
    }

    pub fn recv(&mut self, size: usize) -> Result<Option<Vec<Arc<Packet>>>, Error> {
        if self.is_closed {
            if self.available.is_empty() {
                Ok(None)
            } else {
                Ok(Some(self.take(size)))
            }
        } else if self.available.len() >= size {
            Ok(Some(self.take(size)))
        } else {
            let opt_packets = self.receiver.recv().map_err(Error::Recv)?;
            if let Some(packets) = opt_packets {
                self.available.extend(packets);
            } else {
                self.is_closed = true;
                if self.available.is_empty() {
                    return Ok(None);
                }
            }
            Ok(Some(self.take(size)))
        }
    }
}