use bincode::serialize_into;
use byteorder::{BigEndian, WriteBytesExt};
use flate2::write::DeflateEncoder;
use flate2::Compression;
use serde::{Deserialize, Serialize};
use std::fs;
use std::io::{BufWriter, Write};
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, MutexGuard};

use deque;
use deque::BackGuardInner;
use private;
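
/// Each on-disk record is framed as a big-endian `u32` payload length
/// followed by that many bytes of deflate-compressed, bincode-serialized
/// event; `PAYLOAD_LEN_BYTES` is the size of that length prefix.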
const PAYLOAD_LEN_BYTES: usize = ::std::mem::size_of::<u32>();
/// The 'send' side of hopper, similar to `std::sync::mpsc::Sender`.
#[derive(Debug)]
pub struct Sender<T> {
    name: String,
    root: PathBuf, // directory we store our queues in
    max_disk_bytes: usize, // maximum size of a single queue file
    mem_buffer: private::Queue<T>,
    resource_type: PhantomData<T>,
    disk_files_capacity: Arc<AtomicUsize>, // queue files we may still create
}
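
/// Sender-side state shared behind the deque's back-guard lock.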
#[derive(Default, Debug)]
pub struct SenderSync {
    pub sender_fp: Option<BufWriter<fs::File>>, // active queue file, if open
    pub bytes_written: usize,     // bytes written to the active queue file
    pub sender_seq_num: usize,    // sequence number of the active queue file
    pub total_disk_writes: usize, // disk writes not yet signaled to the receiver
    pub path: PathBuf,            // active fp filename
}
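
// `Clone` is implemented by hand so that `T` itself need not be `Clone`:
// cloning a `Sender` shares the same in-memory queue and disk capacity.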
impl<'de, T> Clone for Sender<T>
where
T: Serialize + Deserialize<'de>,
{
fn clone(&self) -> Sender<T> {
Sender {
name: self.name.clone(),
root: self.root.clone(),
max_disk_bytes: self.max_disk_bytes,
mem_buffer: self.mem_buffer.clone(),
resource_type: self.resource_type,
disk_files_capacity: Arc::clone(&self.disk_files_capacity),
}
}
}
impl<T> Sender<T>
where
T: Serialize,
{
#[doc(hidden)]
pub fn new<S>(
name: S,
data_dir: &Path,
max_disk_bytes: usize,
mem_buffer: private::Queue<T>,
max_disk_files: Arc<AtomicUsize>,
) -> Result<Sender<T>, super::Error>
where
S: Into<String>,
{
let setup_mem_buffer = mem_buffer.clone(); // clone is cheeeeeap
let mut guard = setup_mem_buffer.lock_back();
if !data_dir.is_dir() {
return Err(super::Error::NoSuchDirectory);
}
match private::read_seq_num(data_dir) {
Ok(seq_num) => {
let log = data_dir.join(format!("{}", seq_num));
match fs::OpenOptions::new().append(true).create(true).open(&log) {
Ok(fp) => {
(*guard).inner.sender_fp = Some(BufWriter::new(fp));
(*guard).inner.sender_seq_num = seq_num;
(*guard).inner.path = log;
                        Ok(Sender {
                            name: name.into(),
                            root: data_dir.to_path_buf(),
                            max_disk_bytes,
                            mem_buffer,
                            resource_type: PhantomData,
                            disk_files_capacity: max_disk_files,
                        })
}
Err(e) => Err(super::Error::IoError(e)),
}
}
Err(e) => Err(super::Error::IoError(e)),
}
}
fn write_to_disk(
&self,
event: T,
guard: &mut MutexGuard<BackGuardInner<SenderSync>>,
) -> Result<(), (T, super::Error)> {
        let mut e = DeflateEncoder::new(Vec::with_capacity(64), Compression::fast());
        serialize_into(&mut e, &event).expect("could not serialize");
        let buf: Vec<u8> = e.finish().expect("deflate finish failed");
let payload_len = buf.len();
// If the individual sender writes enough to go over the max we mark the
// file read-only--which will help the receiver to decide it has hit the
// end of its log file--and create a new log file.
let bytes_written = (*guard).inner.bytes_written + payload_len + PAYLOAD_LEN_BYTES;
if (bytes_written > self.max_disk_bytes) || (*guard).inner.sender_fp.is_none() {
// Once we've gone over the write limit for our current file or find
// that we've gotten behind the current queue file we need to seek
// forward to find our place in the space of queue files. We mark
// our current file read-only and then bump sender_seq_num to get up
// to date.
let _ = fs::metadata(&(*guard).inner.path).map(|p| {
let mut permissions = p.permissions();
permissions.set_readonly(true);
let _ = fs::set_permissions(&(*guard).inner.path, permissions);
});
(*guard).inner.sender_seq_num = (*guard).inner.sender_seq_num.wrapping_add(1);
(*guard).inner.path = self.root.join(format!("{}", (*guard).inner.sender_seq_num));
let disk_files_capacity = self.disk_files_capacity.load(Ordering::Acquire);
if disk_files_capacity == 0 {
return Err((event, super::Error::Full));
} else {
match fs::OpenOptions::new()
.append(true)
.create(true)
.open(&(*guard).inner.path)
{
Ok(fp) => {
self.disk_files_capacity.fetch_sub(1, Ordering::Release);
(*guard).inner.sender_fp = Some(BufWriter::new(fp));
(*guard).inner.bytes_written = 0;
}
Err(e) => {
return Err((event, super::Error::IoError(e)));
}
}
}
}
assert!((*guard).inner.sender_fp.is_some());
let mut bytes_written = 0;
if let Some(ref mut fp) = (*guard).inner.sender_fp {
match fp.write_u32::<BigEndian>(payload_len as u32) {
Ok(()) => bytes_written += PAYLOAD_LEN_BYTES,
Err(e) => {
return Err((event, super::Error::IoError(e)));
}
};
            // `write_all` retries on short writes; a bare `write` may
            // legitimately write fewer than `payload_len` bytes.
            match fp.write_all(&buf[..]) {
                Ok(()) => {
                    bytes_written += payload_len;
                }
                Err(e) => {
                    return Err((event, super::Error::IoError(e)));
                }
            }
}
(*guard).inner.bytes_written += bytes_written;
Ok(())
}
    /// Attempt to flush any outstanding disk writes to the deque
    ///
    /// This function flushes the active queue file and signals any
    /// outstanding disk writes to the receiver, failing with `NoFlush` if
    /// the in-memory buffer is full. It is useful when traffic patterns are
    /// bursty, as a disk write may otherwise end up stranded in limbo for a
    /// good spell.
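    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a `Sender<u64>` obtained from one of this
    /// crate's channel constructors:
    ///
    /// ```no_run
    /// # fn demo(snd: &mut hopper::Sender<u64>) {
    /// let _ = snd.send(10); // may have landed on disk...
    /// snd.flush().expect("could not flush"); // ...so nudge it into the deque
    /// # }
    /// ```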
pub fn flush(&mut self) -> Result<(), super::Error> {
let mut back_guard = self.mem_buffer.lock_back();
if (*back_guard).inner.total_disk_writes != 0 {
// disk mode
assert!((*back_guard).inner.sender_fp.is_some());
if let Some(ref mut fp) = (*back_guard).inner.sender_fp {
fp.flush().expect("unable to flush");
} else {
unreachable!()
}
match self.mem_buffer.push_back(
private::Placement::Disk((*back_guard).inner.total_disk_writes),
&mut back_guard,
) {
Ok(must_wake_receiver) => {
(*back_guard).inner.total_disk_writes = 0;
if must_wake_receiver {
let front_guard = self.mem_buffer.lock_front();
self.mem_buffer.notify_not_empty(&front_guard);
drop(front_guard);
}
}
Err(_) => {
return Err(super::Error::NoFlush);
}
}
}
Ok(())
}
    /// Send an event into the queue
    ///
    /// This function will fail with an IO error if the underlying queue
    /// files are temporarily exhausted -- say, due to a lack of file
    /// descriptors -- or with `Full` if there is no more space in the
    /// in-memory buffer _or_ on disk, as per the `max_disk_files` setting
    /// from `channel_with_explicit_capacity`. Ownership of the event is
    /// returned to the caller on failure.
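    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a `Sender<u64>` obtained from one of this
    /// crate's channel constructors:
    ///
    /// ```no_run
    /// # fn demo(snd: &mut hopper::Sender<u64>) {
    /// match snd.send(42) {
    ///     Ok(()) => {}
    ///     // ownership of the event comes back to us on failure
    ///     Err((event, _err)) => {
    ///         let _ = snd.send(event);
    ///     }
    /// }
    /// # }
    /// ```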
pub fn send(&mut self, event: T) -> Result<(), (T, super::Error)> {
// Welcome. Let me tell you about the time I fell off the toilet, hit my
        // head and when I woke up I saw this! ~passes napkin drawing of the
// flux capacitor over to you~
//
// This function pushes `event` into an in-memory deque unless that
// deque is full. Previous versions of hopper had a series of
// complicated flags and a monolock to coordinate with the Receiver to
// make sure that the `event` would eventually make it to disk or be
// stuffed into memory and order would be preserved throughout. This was
// goofy.
//
// What struck me, noodling about how to keep order, was that if I
// dropped all the flags and just used the data itself to signal where
// and in what order we needed to read we'd be in business. Every
// `event` gets wrapped in a `private::Placement` that indicates to the
// Receiver if it's in-memory -- in which case, it's right there -- or
// how many things are on disk needing to be read.
//
// Dang!
//
// We start off this function assuming that we can place the event into
// memory and, failing that, write the value to disk and flip into 'disk
// mode'. In disk mode every event is written to disk, then we attempt
        // to write a disk placement into the deque. If that succeeds we move
        // back to memory-mode for the next write, in which we attempt to write
        // to the deque first. In this way order is preserved, a couple of
// things go to disk and we're capped on memory. Or, more specifically:
//
// There are two sending modes: in-memory and to-disk. We detect that
// we're in to-disk mode by the value of `total_disk_writes`. If it's
// non-zero we default to writing to disk, then attempt a
// `placement::Disk(total_disk_writes)` push_back. If that is a success
// we're in in-memory mode. If that's a failure we're still in
// to-disk. Similar story for flipping from in-memory to to-disk.
let mut back_guard = self.mem_buffer.lock_back();
if (*back_guard).inner.total_disk_writes == 0 {
// in-memory mode
let placed_event = private::Placement::Memory(event);
match self.mem_buffer.push_back(placed_event, &mut back_guard) {
Ok(must_wake_receiver) => {
if must_wake_receiver {
let front_guard = self.mem_buffer.lock_front();
self.mem_buffer.notify_not_empty(&front_guard);
drop(front_guard);
}
}
Err(deque::Error::Full(placed_event)) => {
self.write_to_disk(placed_event.extract().unwrap(), &mut back_guard)?;
(*back_guard).inner.total_disk_writes += 1;
}
}
} else {
// disk mode
self.write_to_disk(event, &mut back_guard)?;
(*back_guard).inner.total_disk_writes += 1;
assert!((*back_guard).inner.sender_fp.is_some());
if let Some(ref mut fp) = (*back_guard).inner.sender_fp {
fp.flush().expect("unable to flush");
} else {
unreachable!()
}
if let Ok(must_wake_receiver) = self.mem_buffer.push_back(
private::Placement::Disk((*back_guard).inner.total_disk_writes),
&mut back_guard,
) {
(*back_guard).inner.total_disk_writes = 0;
if must_wake_receiver {
let front_guard = self.mem_buffer.lock_front();
self.mem_buffer.notify_not_empty(&front_guard);
drop(front_guard);
}
}
}
Ok(())
}
/// Return the sender's name
pub fn name(&self) -> &str {
&self.name
}
}
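
// Illustrative round-trip of the on-disk record framing used by
// `write_to_disk` above: a big-endian u32 length prefix followed by a
// deflate-compressed bincode payload. This is a documentation sketch that
// exercises only the framing, not `Sender` itself.
#[cfg(test)]
mod framing_sketch {
    use bincode::{deserialize_from, serialize_into};
    use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
    use flate2::read::DeflateDecoder;
    use flate2::write::DeflateEncoder;
    use flate2::Compression;
    use std::io::{Cursor, Read};

    #[test]
    fn length_prefixed_deflate_round_trip() {
        let event: (u64, String) = (42, "hopper".to_string());

        // Encode as `write_to_disk` does: deflate-compressed bincode,
        // preceded by a big-endian u32 payload length.
        let mut e = DeflateEncoder::new(Vec::new(), Compression::fast());
        serialize_into(&mut e, &event).expect("could not serialize");
        let payload = e.finish().expect("deflate finish failed");
        let mut record = Vec::new();
        record
            .write_u32::<BigEndian>(payload.len() as u32)
            .expect("could not write length prefix");
        record.extend_from_slice(&payload);

        // Decode: the length prefix tells a reader how many bytes belong
        // to this record; inflate those bytes and deserialize.
        let mut src = Cursor::new(record);
        let len = u64::from(src.read_u32::<BigEndian>().expect("no prefix"));
        let decoded: (u64, String) =
            deserialize_from(DeflateDecoder::new(src.take(len)))
                .expect("could not deserialize");
        assert_eq!(event, decoded);
    }
}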