1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
use zlo::{serialize_into, Infinite};
use private;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::fs;
use std::io::{BufWriter, Write};
use std::marker::PhantomData;
use std::path::{Path, PathBuf};

#[inline]
fn u32tou8abe(v: u32) -> [u8; 4] {
    [v as u8, (v >> 8) as u8, (v >> 24) as u8, (v >> 16) as u8]
}

#[derive(Debug)]
/// The 'send' side of hopper, similar to
/// [`std::sync::mpsc::Sender`](https://doc.rust-lang.org/std/sync/mpsc/struct.
/// Sender.html).
pub struct Sender<T> {
    name: String,
    root: PathBuf, // directory we store our queues in
    path: PathBuf, // active fp filename
    seq_num: usize,
    max_bytes: usize,
    fs_lock: private::FSLock<T>,
    resource_type: PhantomData<T>,
}

impl<'de, T> Clone for Sender<T>
where
    T: Serialize + Deserialize<'de>,
{
    fn clone(&self) -> Sender<T> {
        use std::sync::Arc;
        Sender::new(
            self.name.clone(),
            &self.root,
            self.max_bytes,
            Arc::clone(&self.fs_lock),
        ).expect("COULD NOT CLONE")
    }
}

impl<T> Sender<T>
where
    T: Serialize,
{
    #[doc(hidden)]
    pub fn new<S>(
        name: S,
        data_dir: &Path,
        max_bytes: usize,
        fs_lock: private::FSLock<T>,
    ) -> Result<Sender<T>, super::Error>
    where
        S: Into<String> + fmt::Display,
    {
        use std::sync::Arc;
        let init_fs_lock = Arc::clone(&fs_lock);
        let mut syn = init_fs_lock.lock().expect("Sender fs_lock poisoned");
        if !data_dir.is_dir() {
            return Err(super::Error::NoSuchDirectory);
        }
        let seq_num = match fs::read_dir(data_dir)
            .unwrap()
            .map(|de| {
                de.unwrap()
                    .path()
                    .file_name()
                    .unwrap()
                    .to_str()
                    .unwrap()
                    .parse::<usize>()
                    .unwrap()
            })
            .max()
        {
            Some(sn) => sn,
            None => 0,
        };
        let log = data_dir.join(format!("{}", seq_num));
        match fs::OpenOptions::new().append(true).create(true).open(&log) {
            Ok(fp) => {
                syn.sender_fp = Some(BufWriter::new(fp));
                (*syn).sender_seq_num = seq_num;
                Ok(Sender {
                    name: name.into(),
                    root: data_dir.to_path_buf(),
                    path: log,
                    seq_num: seq_num,
                    max_bytes: max_bytes,
                    fs_lock: fs_lock,
                    resource_type: PhantomData,
                })
            }
            Err(e) => panic!("[Sender] failed to start {:?}", e),
        }
    }

    /// send writes data out in chunks, like so:
    ///
    ///  u32: payload_size
    ///  [u8] payload
    ///
    pub fn send(&mut self, event: T) {
        let mut syn = self.fs_lock.lock().expect("Sender fs_lock poisoned");
        let fslock = &mut (*syn);

        if fslock.sender_idx < fslock.in_memory_idx {
            fslock.mem_buffer.push_back(event);
        } else {
            fslock.disk_buffer.push_back(event);
            if fslock.disk_buffer.len() >= fslock.in_memory_idx {
                while let Some(ev) = fslock.disk_buffer.pop_front() {
                    let mut pyld = Vec::with_capacity(64);
                    serialize_into(&mut pyld, &ev, Infinite).expect("could not serialize");
                    // NOTE The conversion of t.len to u32 and usize is _only_
                    // safe when u32 <= usize. That's very likely to hold true
                    // for machines--for now?--that hopper will run on. However!
                    let pyld_sz_bytes: [u8; 4] = u32tou8abe(pyld.len() as u32);
                    let mut t = vec![0, 0, 0, 0];
                    t[0] = pyld_sz_bytes[3];
                    t[1] = pyld_sz_bytes[2];
                    t[2] = pyld_sz_bytes[1];
                    t[3] = pyld_sz_bytes[0];
                    t.append(&mut pyld);
                    // If the individual sender writes enough to go over the max
                    // we mark the file read-only--which will help the receiver
                    // to decide it has hit the end of its log file--and create
                    // a new log file.
                    let bytes_written = fslock.bytes_written + t.len();
                    if (bytes_written > self.max_bytes) || (self.seq_num != fslock.sender_seq_num)
                        || fslock.sender_fp.is_none()
                    {
                        // Once we've gone over the write limit for our current
                        // file or find that we've gotten behind the current
                        // queue file we need to seek forward to find our place
                        // in the space of queue files. We mark our current file
                        // read-only--there's some possibility that this will be
                        // done redundantly, but that's okay--and then read the
                        // current sender_seq_num to get up to date.
                        let _ = fs::metadata(&self.path).map(|p| {
                            let mut permissions = p.permissions();
                            permissions.set_readonly(true);
                            let _ = fs::set_permissions(&self.path, permissions);
                        });
                        if fslock.sender_fp.is_some() {
                            if self.seq_num != fslock.sender_seq_num {
                                // This thread is behind the leader. We've got to
                                // set our current notion of seq_num forward and
                                // then open the corresponding file.
                                self.seq_num = fslock.sender_seq_num;
                            } else {
                                // This thread is the leader. We reset the
                                // sender_seq_num and bytes written and open the
                                // next queue file. All follower threads will hit
                                // the branch above this one.
                                fslock.sender_seq_num = self.seq_num.wrapping_add(1);
                                self.seq_num = fslock.sender_seq_num;
                                fslock.bytes_written = 0;
                            }
                        }
                        self.path = self.root.join(format!("{}", self.seq_num));
                        match fs::OpenOptions::new()
                            .append(true)
                            .create(true)
                            .open(&self.path)
                        {
                            Ok(fp) => fslock.sender_fp = Some(BufWriter::new(fp)),
                            Err(e) => panic!("FAILED TO OPEN {:?} WITH {:?}", &self.path, e),
                        }
                    }

                    assert!(fslock.sender_fp.is_some());
                    if let Some(ref mut fp) = fslock.sender_fp {
                        match fp.write(&t[..]) {
                            Ok(written) => fslock.bytes_written += written,
                            Err(e) => panic!("Write error: {}", e),
                        }
                        fslock.disk_writes_to_read += 1;
                    }
                }
                assert!(fslock.sender_fp.is_some());
                if let Some(ref mut fp) = fslock.sender_fp {
                    fp.flush().expect("unable to flush");
                }
            }
        }
        fslock.writes_to_read += 1;
        if (fslock.sender_captured_recv_id != fslock.receiver_read_id)
            || fslock.write_bound.is_none()
        {
            fslock.sender_captured_recv_id = fslock.receiver_read_id;
            fslock.write_bound = Some(fslock.sender_idx);
        }
        fslock.sender_idx += 1;
    }

    /// Return the sender's name
    pub fn name(&self) -> &str {
        &self.name
    }
}