use std::fs::{create_dir_all, read_dir, File, OpenOptions};
use std::io::{self, Write};
use std::num::NonZeroU64;
use std::path::{Path, PathBuf};

use crate::error::TrySendError;
use crate::header::Header;
use crate::state::QueueState;
use crate::sync::{DeletionEvent, FileGuard};
use crate::version::check_queue_version;

use super::{segment_filename, HEADER_EOF};

/// The name of the sender lock in the queue folder.
pub(crate) fn send_lock_filename<P: AsRef<Path>>(base: P) -> PathBuf {
    base.as_ref().join("send.lock")
}

/// Tries to acquire the sender lock for a queue.
pub(crate) fn try_acquire_send_lock<P: AsRef<Path>>(base: P) -> io::Result<FileGuard> {
    FileGuard::try_lock(send_lock_filename(base.as_ref()))?.ok_or_else(|| {
        io::Error::new(
            io::ErrorKind::Other,
            format!(
                "queue `{}` sender side already in use",
                base.as_ref().to_string_lossy()
            ),
        )
    })
}

/// Acquires the sender lock for a queue, awaiting if it is already locked.
pub(crate) async fn acquire_send_lock<P: AsRef<Path>>(base: P) -> io::Result<FileGuard> {
    FileGuard::lock(send_lock_filename(base.as_ref())).await
}

pub(crate) struct QueueSize {
    pub(crate) in_bytes: u64,
    pub(crate) in_segments: u64,
}

/// Non-recursively computes the total size in bytes and the number of queue
/// segment (`.q`) files in a given directory.
pub(crate) fn get_queue_size<P: AsRef<Path>>(base: P) -> io::Result<QueueSize> {
    let mut in_bytes = 0;
    let mut in_segments = 0;

    for dir_entry in read_dir(base.as_ref())? {
        let dir_entry = dir_entry?;

        if let Some(extension) = dir_entry.path().extension() {
            if extension == "q" {
                in_bytes += dir_entry.metadata()?.len();
                in_segments += 1;
            }
        }
    }

    Ok(QueueSize {
        in_bytes,
        in_segments,
    })
}

/// A builder for the sender side of the queue. Use this if you want to have fine-grained control
/// over the configuration of the queue. The defaults should be fine for most applications.
pub struct SenderBuilder {
    /// The segment size in bytes that will trigger a new segment to be created. Segments can be
    /// bigger than this to accommodate the last element, but nothing beyond that (each segment
    /// must store at least one element).
    ///
    /// Default value: 4MB
    segment_size: NonZeroU64,

    /// The queue size that will block the sender from creating a new segment (until the receiver
    /// catches up, deleting old segments). The queue can get bigger than that, but only to
    /// accommodate the last segment (the queue must have at least one segment). Set this to `None`
    /// to create an unbounded queue.
    ///
    /// This value will be ignored if the queue has only one segment, since the queue would
    /// deadlock otherwise. It is recommended that you set `max_queue_size >> segment_size`.
    ///
    /// Default value: None
    max_queue_size: Option<NonZeroU64>,
}

impl Default for SenderBuilder {
    fn default() -> SenderBuilder {
        SenderBuilder {
            segment_size: NonZeroU64::new(1024 * 1024 * 4).expect("impossible"), // 4MB
            max_queue_size: None,
        }
    }
}

impl SenderBuilder {
    /// Creates a new sender builder. Finish building it by invoking [`SenderBuilder::open`].
    pub fn new() -> SenderBuilder {
        SenderBuilder::default()
    }

    /// The segment size in bytes that will trigger a new segment to be created. Segments can be
    /// bigger than this to accommodate the last element, but nothing beyond that (each segment
    /// must store at least one element).
    ///
    /// Default value: `4 * 1024 * 1024`, or 4MB.
    ///
    /// # Panics
    ///
    /// This function panics if `size` is zero.
    pub fn segment_size(mut self, size: u64) -> SenderBuilder {
        let size = NonZeroU64::new(size).expect("got segment_size=0");
        self.segment_size = size;
        self
    }

    /// The queue size that will block the sender from creating a new segment (until the receiver
    /// catches up, deleting old segments). The queue can get bigger than that, but only to
    /// accommodate the last segment (the queue must have at least one segment). Set this to `None`
    /// to create an unbounded queue.
    ///
    /// This value will be ignored if the queue has only one segment, since the queue would
    /// deadlock otherwise. It is recommended that you set `max_queue_size >> segment_size`.
    ///
    /// Default value: `None`
    ///
    /// # Panics
    ///
    /// This function panics if `size` is zero.
    pub fn max_queue_size(mut self, size: Option<u64>) -> SenderBuilder {
        let size = size.map(|s| NonZeroU64::new(s).expect("got max_queue_size=0"));
        self.max_queue_size = size;
        self
    }

    /// Opens a queue on a folder indicated by the `base` path for sending. The
    /// folder will be created if it does not already exist.
    ///
    /// # Errors
    ///
    /// This function will return an IO error if the queue is already in use for
    /// sending, which is indicated by a lock file. Also, any other IO error
    /// encountered while opening will be returned.
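    ///
    /// # Example
    ///
    /// A minimal sketch of configuring and opening a sender. The crate-root
    /// import and the queue path are illustrative assumptions:
    ///
    /// ```no_run
    /// use yaque::SenderBuilder;
    ///
    /// # fn main() -> std::io::Result<()> {
    /// let _sender = SenderBuilder::new()
    ///     .segment_size(1024 * 1024) // roll over to a new segment after ~1MB
    ///     .max_queue_size(Some(128 * 1024 * 1024)) // block sends past ~128MB
    ///     .open("data/my-queue")?;
    /// # Ok(())
    /// # }
    /// ```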
    pub fn open<P: AsRef<Path>>(self, base: P) -> io::Result<Sender> {
        // Guarantee that the queue exists:
        create_dir_all(base.as_ref())?;

        log::trace!("created queue directory");

        // Check the queue version (this should be lightning-fast, so we do it inline):
        check_queue_version(base.as_ref())?;

        // Acquire lock and guess state:
        let file_guard = try_acquire_send_lock(base.as_ref())?;
        let state = QueueState::for_send_metadata(base.as_ref())?;

        log::trace!("sender lock acquired. Sender state now is {:?}", state);

        // See the docs on OpenOptions::append for why the BufWriter here.
        let file = io::BufWriter::new(
            OpenOptions::new()
                .create(true)
                .append(true)
                .open(segment_filename(base.as_ref(), state.segment))?,
        );

        log::trace!("last segment opened for appending");

        Ok(Sender {
            segment_size: self.segment_size,
            max_queue_size: self.max_queue_size,
            _file_guard: file_guard,
            file,
            state,
            deletion_stream: None,
            base: PathBuf::from(base.as_ref()),
        })
    }
}

/// The sender part of the queue. This part is lock-free and therefore can be
/// used outside an asynchronous context.
pub struct Sender {
    segment_size: NonZeroU64,
    max_queue_size: Option<NonZeroU64>,
    _file_guard: FileGuard,
    file: io::BufWriter<File>,
    state: QueueState,
    deletion_stream: Option<DeletionEvent>, // lazily initialized!
    base: PathBuf,
}

impl Sender {
    /// Opens a queue on a folder indicated by the `base` path for sending. The
    /// folder will be created if it does not already exist.
    ///
    /// # Errors
    ///
    /// This function will return an IO error if the queue is already in use for
    /// sending, which is indicated by a lock file. Also, any other IO error
    /// encountered while opening will be returned.
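    ///
    /// # Example
    ///
    /// A minimal sketch, assuming the crate-root import `yaque::Sender` and an
    /// illustrative queue path:
    ///
    /// ```no_run
    /// use yaque::Sender;
    ///
    /// # fn main() -> std::io::Result<()> {
    /// let _sender = Sender::open("data/my-queue")?;
    /// # Ok(())
    /// # }
    /// ```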
    pub fn open<P: AsRef<Path>>(base: P) -> io::Result<Sender> {
        SenderBuilder::default().open(base)
    }

    /// Saves the sender queue state. You do not need to use this method in most
    /// circumstances, since it is automatically done on drop (yes, it will be
    /// called even if your thread panics). However, you can use this function to
    ///
    /// 1. Make periodic backups. Use an external timer implementation for this.
    ///
    /// 2. Handle possible IO errors in sending. The `drop` implementation will
    /// ignore (but log) any IO errors, which may lead to data loss in an
    /// unreliable filesystem. It was implemented this way because no errors
    /// are allowed to propagate on drop and panicking will abort the program if
    /// drop is called during a panic.
    #[deprecated(
        since = "0.5.0",
        note = "the sender state is now always inferred. There is no need to save anything"
    )]
    pub fn save(&mut self) -> io::Result<()> {
        Ok(())
    }

    /// Just writes to the internal buffer, but doesn't flush it.
    fn write(&mut self, data: &[u8]) -> io::Result<u64> {
        // Get the length of the data and write the header. The header stores the
        // length as a `u32`, so anything bigger cannot be represented:
        let len = data.len();
        assert!(len <= u32::MAX as usize, "data too big for a single send");
        let header = Header::new(len as u32).encode();

        // Write the header, then the payload, to the file:
        self.file.write_all(&header)?;
        self.file.write_all(data)?;

        // 4 bytes of header plus the payload length:
        Ok(4 + len as u64)
    }

    /// Tests whether the write position is past the end of the current segment.
    fn is_past_end(&self) -> bool {
        self.state.position > self.segment_size.get()
    }

    /// Caps off a segment by writing an EOF header and then moves on to a new
    /// segment. This function returns `Ok(true)` if it has created a new segment
    /// or `Ok(false)` if it has not (because the queue was too big).
    #[must_use = "you need to always check if a segment was created or not!"]
    fn try_cap_off_and_move(&mut self) -> io::Result<bool> {
        if let Some(max_queue_size) = self.max_queue_size {
            let queue_size = get_queue_size(&self.base)?;

            // Only enforce the limit when there is more than one segment. Otherwise,
            // the queue would deadlock: it must always keep at least one segment to
            // write to.
            if queue_size.in_bytes >= max_queue_size.get() && queue_size.in_segments > 1 {
                log::trace!(
                    "oops! Directory size is {}, but max queue size is {}",
                    queue_size.in_bytes,
                    max_queue_size.get()
                );
                return Ok(false);
            }
        }

        log::trace!("there is enough space for a new segment. Let's cap off and move on!");

        // Write EOF header:
        self.file.write_all(&HEADER_EOF)?;
        self.file.flush()?;

        // Preserves the already allocated buffer:
        *self.file.get_mut() = OpenOptions::new()
            .create(true)
            .append(true)
            .open(segment_filename(&self.base, self.state.advance_segment()))?;

        Ok(true)
    }

    /// Checks whether the current segment is full and, if so, tries to cap it
    /// off and move on to a new one. On failure, the item is given back inside
    /// the error so that the caller can retry.
    fn maybe_cap_off_and_move<T>(&mut self, item: T) -> Result<T, TrySendError<T>> {
        // See if we are past the end of the current segment:
        if self.is_past_end() {
            log::trace!("is past the segment end. Trying to cap off and move");

            // If so, create a new file, if you are able to:
            if !self.try_cap_off_and_move()? {
                log::trace!(
                    "could not cap off and move. The queue `{:?}` is full",
                    self.base
                );

                return Err(TrySendError::QueueFull {
                    item,
                    base: self.base.clone(),
                });
            }
        }

        Ok(item)
    }

    /// Lazily initializes the event that completes every time a file is deleted.
    fn deletion_stream(&mut self) -> &mut DeletionEvent {
        let base = &self.base;
        self.deletion_stream
            .get_or_insert_with(|| DeletionEvent::new(base))
    }

    /// Tries to send some data into the queue. If the queue is too big to
    /// insert (as set in `max_queue_size`), this returns
    /// [`TrySendError::QueueFull`]. One send is always atomic.
    ///
    /// # Errors
    ///
    /// This function returns any underlying errors encountered while writing or
    /// flushing the queue. Also, it returns [`TrySendError::QueueFull`] if the
    /// queue is too big.
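    ///
    /// # Example
    ///
    /// A sketch of the non-blocking send path; the import, path, and payload
    /// are illustrative:
    ///
    /// ```no_run
    /// use yaque::Sender;
    ///
    /// # fn main() -> std::io::Result<()> {
    /// let mut sender = Sender::open("data/my-queue")?;
    /// match sender.try_send(b"some data") {
    ///     Ok(()) => println!("sent"),
    ///     Err(_) => println!("queue full or IO error"),
    /// }
    /// # Ok(())
    /// # }
    /// ```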
    pub fn try_send<D: AsRef<[u8]>>(&mut self, data: D) -> Result<(), TrySendError<D>> {
        let data = self.maybe_cap_off_and_move(data)?;

        // Write to the queue and flush:
        let written = self.write(data.as_ref())?;
        self.file.flush()?; // guarantees the send is atomic. See `SenderBuilder::open`.
        self.state.advance_position(written);

        Ok(())
    }

    /// Sends some data into the queue. One send is always atomic. This function is
    /// `async` because the queue might be full and so we need to `.await` the
    /// receiver to consume enough segments to clear the queue.
    ///
    /// # Errors
    ///
    /// This function returns any underlying errors encountered while writing or
    /// flushing the queue.
    ///
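    /// # Example
    ///
    /// A sketch of a send that awaits on a full queue, assuming the `futures`
    /// crate for the executor; the path and payload are illustrative:
    ///
    /// ```no_run
    /// use yaque::Sender;
    ///
    /// # fn main() -> std::io::Result<()> {
    /// let mut sender = Sender::open("data/my-queue")?;
    /// futures::executor::block_on(sender.send(b"some data"))?;
    /// # Ok(())
    /// # }
    /// ```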
    pub async fn send<D: AsRef<[u8]>>(&mut self, mut data: D) -> io::Result<()> {
        loop {
            match self.try_send(data) {
                Ok(()) => break Ok(()),
                Err(TrySendError::Io(err)) => break Err(err),
                Err(TrySendError::QueueFull { item, .. }) => {
                    data = item; // the "unmove"!
                    self.deletion_stream().await // prevents a busy loop
                }
            }
        }
    }

    /// Tries to send all the contents of an iterable into the queue. If the
    /// queue is too big to insert (as set in `max_queue_size`), this returns
    /// [`TrySendError::QueueFull`]. Everything is buffered and sent atomically,
    /// in one flush operation. Since this operation is atomic, it does not
    /// create new segments during the iteration. Be mindful of that when using
    /// this method for large writes.
    ///
    /// # Errors
    ///
    /// This function returns any underlying errors encountered while writing or
    /// flushing the queue. Also, it returns [`TrySendError::QueueFull`] if the
    /// queue is too big.
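    ///
    /// # Example
    ///
    /// A sketch of an atomic batch write; the import, path, and payload are
    /// illustrative:
    ///
    /// ```no_run
    /// use yaque::Sender;
    ///
    /// # fn main() -> std::io::Result<()> {
    /// let mut sender = Sender::open("data/my-queue")?;
    /// if sender.try_send_batch(vec!["first", "second"]).is_err() {
    ///     println!("queue full or IO error");
    /// }
    /// # Ok(())
    /// # }
    /// ```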
    pub fn try_send_batch<I>(&mut self, it: I) -> Result<(), TrySendError<I>>
    where
        I: IntoIterator,
        I::Item: AsRef<[u8]>,
    {
        let it = self.maybe_cap_off_and_move(it)?;

        let mut written = 0;
        // Drain iterator into the buffer.
        for item in it {
            written += self.write(item.as_ref())?;
        }

        self.file.flush()?; // guarantees the send is atomic. See `SenderBuilder::open`.
        self.state.advance_position(written);

        Ok(())
    }

    /// Sends all the contents of an iterable into the queue. This function is
    /// `async` because the queue might be full and so we need to `.await` the
    /// receiver to consume enough segments to clear the queue. Everything is
    /// buffered and sent atomically, in one flush operation. Since this
    /// operation is atomic, it does not create new segments during the
    /// iteration. Be mindful of that when using this method for large writes.
    ///
    /// # Errors
    ///
    /// This function returns any underlying errors encountered while writing or
    /// flushing the queue.
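    ///
    /// # Example
    ///
    /// A sketch of an awaited atomic batch write, assuming the `futures` crate
    /// for the executor; the path and payload are illustrative:
    ///
    /// ```no_run
    /// use yaque::Sender;
    ///
    /// # fn main() -> std::io::Result<()> {
    /// let mut sender = Sender::open("data/my-queue")?;
    /// futures::executor::block_on(sender.send_batch(vec!["first", "second"]))?;
    /// # Ok(())
    /// # }
    /// ```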
    pub async fn send_batch<I>(&mut self, mut it: I) -> io::Result<()>
    where
        I: IntoIterator,
        I::Item: AsRef<[u8]>,
    {
        loop {
            match self.try_send_batch(it) {
                Ok(()) => break Ok(()),
                Err(TrySendError::Io(err)) => break Err(err),
                Err(TrySendError::QueueFull { item, .. }) => {
                    it = item; // the "unmove"!
                    self.deletion_stream().await // prevents a busy loop
                }
            }
        }
    }
}