1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#![allow(deprecated)]
use common::{ControlMessage, Interest};
use data::Sample;
use mio_extras::channel;
use mio_extras::channel::TrySendError;
use mpmc::Queue;
use std::hash::Hash;
use std::io;
use std::sync::Arc;
/// Producer-side handle that collects `Sample`s into batches and hands the
/// batches to a data channel.
///
/// Note that the derived `Clone` also copies whatever samples are sitting in
/// `buffer` at the time of the clone.
#[derive(Clone)]
pub struct Sender<T> {
    /// Number of buffered samples at which `send` tries to push the batch
    /// onto `data_tx`.
    batch_size: usize,
    /// Batch currently being filled. Wrapped in `Option` so the vector can be
    /// moved out by value when a batch is handed to the channel.
    buffer: Option<Vec<Sample<T>>>,
    /// Channel carrying `ControlMessage`s (interest additions and removals).
    control_tx: channel::SyncSender<ControlMessage<T>>,
    /// Channel carrying completed batches of samples.
    data_tx: channel::SyncSender<Vec<Sample<T>>>,
    /// Shared queue that `send` pops a replacement buffer from after a batch
    /// has been sent (presumably refilled by the consumer of `data_tx` --
    /// confirm against the receiving side).
    rx_queue: Arc<Queue<Vec<Sample<T>>>>,
}
impl<T: Hash + Eq + Send + Clone> Sender<T> {
pub fn new(
rx_queue: Arc<Queue<Vec<Sample<T>>>>,
data_tx: channel::SyncSender<Vec<Sample<T>>>,
control_tx: channel::SyncSender<ControlMessage<T>>,
batch_size: usize,
) -> Sender<T> {
let buffer = Vec::with_capacity(batch_size);
Sender {
batch_size: batch_size,
buffer: Some(buffer),
data_tx: data_tx,
control_tx: control_tx,
rx_queue: rx_queue,
}
}
#[inline]
pub fn send(&mut self, sample: Sample<T>) -> Result<(), io::Error> {
let mut buffer = self.buffer.take().unwrap();
buffer.push(sample);
if buffer.len() >= self.batch_size {
match self.data_tx.try_send(buffer) {
Ok(_) => {
if let Some(b) = self.rx_queue.pop() {
self.buffer = Some(b);
} else {
self.buffer = Some(Vec::with_capacity(self.batch_size));
}
Ok(())
}
Err(e) => {
match e {
TrySendError::Io(e) => {
error!("io error: {}", e);
Err(e)
}
TrySendError::Full(buffer) |
TrySendError::Disconnected(buffer) => {
self.buffer = Some(buffer);
Ok(())
}
}
}
}
} else {
self.buffer = Some(buffer);
Ok(())
}
}
pub fn add_interest(&mut self, interest: Interest<T>) {
let _ = self.control_tx.send(ControlMessage::AddInterest(interest));
}
pub fn remove_interest(&mut self, interest: Interest<T>) {
let _ = self.control_tx.send(
ControlMessage::RemoveInterest(interest),
);
}
pub fn set_batch_size(&mut self, batch_size: usize) {
self.batch_size = batch_size;
}
#[inline]
pub fn try_send(&mut self, sample: Sample<T>) -> Result<(), (Sample<T>)> {
let mut buffer = self.buffer.take().unwrap();
if buffer.len() < self.batch_size - 1 {
buffer.push(sample);
self.buffer = Some(buffer);
Ok(())
} else {
Err(sample)
}
}
}