// Double-buffered queue of outgoing, length-prefixed messages.
//
// Producers append encoded messages to a back buffer; a flusher swaps the
// back and front buffers and writes the front buffer to the output stream.
use std::cell::{Cell, RefCell};
use std::io::{self, Cursor, Write};
use std::time::{Duration, SystemTime};

use crate::error::Error;
use crate::fiber::{reschedule, Cond};

/// Queue of encoded outgoing messages backed by two swappable byte buffers.
///
/// `send` appends messages to the back buffer, while `flush_to_stream` swaps the
/// two buffers and writes the front one to a stream. All state lives in
/// `Cell`/`RefCell`, so every method takes `&self`.
pub struct SendQueue {
    /// `true` from construction until `close` is called; `flush_to_stream`
    /// returns an error once it observes `false`.
    is_active: Cell<bool>,
    /// Last value handed out by `next_sync` (starts at 0, so the first sync is 1).
    sync: Cell<u64>,
    /// Buffer that `flush_to_stream` writes to the stream and then clears.
    front_buffer: RefCell<Cursor<Vec<u8>>>,
    /// Buffer that `send` appends newly encoded messages to.
    back_buffer: RefCell<Cursor<Vec<u8>>>,
    /// Signalled by `send` and `close`; awaited by `flush_to_stream` while the
    /// back buffer is empty.
    swap_cond: Cond,
    /// Back-buffer position (in bytes) at or above which `send` signals `swap_cond`
    /// before appending.
    buffer_limit: u64,
    /// Longest time, measured from the start of a `flush_to_stream` call, during
    /// which the flush keeps yielding while the back buffer is still growing.
    flush_interval: Duration,
}

impl SendQueue {
    /// Creates an active queue with two empty buffers and a sync counter of 0.
    ///
    /// * `buffer_size` - initial capacity (bytes) reserved for each of the two buffers.
    /// * `buffer_limit` - back-buffer fill level (bytes) at which `send` signals the
    ///   swap condition before appending.
    /// * `flush_interval` - how long, counted from the start of a `flush_to_stream`
    ///   call, the flush may keep yielding while new data keeps arriving.
    pub fn new(buffer_size: usize, buffer_limit: usize, flush_interval: Duration) -> Self {
        SendQueue {
            is_active: Cell::new(true),
            sync: Cell::new(0),
            front_buffer: RefCell::new(Cursor::new(Vec::with_capacity(buffer_size))),
            back_buffer: RefCell::new(Cursor::new(Vec::with_capacity(buffer_size))),
            swap_cond: Cond::new(),
            buffer_limit: buffer_limit as u64,
            flush_interval,
        }
    }

    /// Encodes one message into the back buffer and returns the sync number assigned to it.
    ///
    /// `payload_producer` receives the buffer and the freshly allocated sync value and
    /// must write the message payload at the buffer's current position.
    ///
    /// # Errors
    ///
    /// Returns whatever error `write_to_buffer` reports. In that case the back buffer is
    /// restored to the state it had before the call: both the cursor position and the
    /// underlying vector length are rolled back, so no partially written message can be
    /// picked up by a later flush. The sync number stays consumed.
    pub fn send<F>(&self, payload_producer: F) -> Result<u64, Error>
    where
        F: FnOnce(&mut Cursor<Vec<u8>>, u64) -> Result<(), Error>,
    {
        let sync = self.next_sync();

        // the buffer is already at/over the limit: signal the swap condition
        if self.back_buffer.borrow().position() >= self.buffer_limit {
            self.swap_cond.signal();
        }

        let offset = {
            let buffer = &mut *self.back_buffer.borrow_mut();

            let offset = buffer.position();
            if let Err(err) = write_to_buffer(buffer, sync, payload_producer) {
                // Roll back on error. Resetting the position alone is not enough:
                // `flush_to_stream` writes the whole underlying vector, so the bytes of
                // the failed message must be dropped as well.
                buffer.set_position(offset);
                // `offset` was read from this in-memory buffer; if it were ever larger
                // than the vector length, `truncate` is simply a no-op.
                buffer.get_mut().truncate(offset as usize);
                return Err(err);
            }
            offset
        };

        // trigger swap condition if buffer was empty before
        if offset == 0 {
            self.swap_cond.signal();
        }

        Ok(sync)
    }

    /// Increments the sync counter and returns the new value.
    pub fn next_sync(&self) -> u64 {
        let sync = self.sync.get() + 1;
        self.sync.set(sync);
        sync
    }

    /// Waits until the back buffer holds data, swaps the buffers and writes the
    /// accumulated bytes to `stream`.
    ///
    /// While the back buffer is empty the call waits on the swap condition. Once data is
    /// present, it keeps yielding via `reschedule` as long as the buffer has grown since
    /// the previous check and no more than `flush_interval` has elapsed since the call
    /// started; then the buffers are swapped and the front buffer is written out and
    /// cleared.
    ///
    /// # Errors
    ///
    /// * `io::ErrorKind::TimedOut` if the queue is found closed (see `close`).
    /// * Any error returned by `stream.write_all`; in that case the front buffer is
    ///   left as it was (not cleared).
    pub fn flush_to_stream(&self, stream: &mut impl Write) -> io::Result<()> {
        let start_ts = SystemTime::now();
        let mut prev_data_size = 0u64;

        loop {
            if !self.is_active.get() {
                return Err(io::Error::from(io::ErrorKind::TimedOut));
            }

            let data_size = self.back_buffer.borrow().position();
            if data_size == 0 {
                // await for data (if buffer is empty)
                self.swap_cond.wait();
                continue;
            }

            // If the elapsed time cannot be computed, fall through and flush right away.
            if let Ok(elapsed) = start_ts.elapsed() {
                if data_size > prev_data_size && elapsed <= self.flush_interval {
                    // more data arrived since the last check: give producers another turn
                    prev_data_size = data_size;
                    reschedule();
                    continue;
                }
            }

            self.back_buffer.swap(&self.front_buffer);
            break;
        }

        // write front buffer contents to stream + clear front buffer
        let mut buffer = self.front_buffer.borrow_mut();
        stream.write_all(buffer.get_ref())?;
        buffer.set_position(0);
        buffer.get_mut().clear();
        Ok(())
    }

    /// Marks the queue as inactive and signals the swap condition so that a
    /// `flush_to_stream` call waiting for data can observe the change and return.
    pub fn close(&self) {
        self.is_active.set(false);
        self.swap_cond.signal();
    }
}

/// Appends one size-prefixed message to `buffer` at its current position.
///
/// A `u32` placeholder is written first, then `payload_producer` is invoked with the
/// buffer and `sync` to emit the payload, and finally the placeholder is overwritten
/// with the number of bytes the payload occupies. On success the cursor is left just
/// past the payload.
///
/// # Errors
///
/// Propagates encoding errors from writing the size prefix and any error returned by
/// `payload_producer`. The cursor is not rewound on failure; callers that need the
/// buffer restored must do so themselves.
pub fn write_to_buffer<F>(
    buffer: &mut Cursor<Vec<u8>>,
    sync: u64,
    payload_producer: F,
) -> Result<(), Error>
where
    F: FnOnce(&mut Cursor<Vec<u8>>, u64) -> Result<(), Error>,
{
    // Reserve the size prefix; its real value is only known once the payload exists.
    let prefix_pos = buffer.position();
    rmp::encode::write_u32(buffer, 0)?;

    // Let the caller emit the payload and note where it begins and ends.
    let body_begin = buffer.position();
    payload_producer(buffer, sync)?;
    let body_end = buffer.position();

    // Jump back, patch the prefix with the payload length, then return to the end.
    buffer.set_position(prefix_pos);
    let body_len = (body_end - body_begin) as u32;
    rmp::encode::write_u32(buffer, body_len)?;
    buffer.set_position(body_end);

    Ok(())
}