1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
use crate::{BufferWriter, HeapBufferPool, buffer::BufferPtr, packet::Packet};
pub struct Framer {
buffer_writer: BufferWriter,
frame_start: usize,
}
impl Framer {
pub fn new(buffer_pool: HeapBufferPool) -> Self {
Self {
buffer_writer: BufferWriter::new(buffer_pool),
frame_start: 0,
}
}
#[inline]
pub async fn write(&mut self) -> &mut [u8] {
self.buffer_writer.write().await
}
#[inline]
pub fn try_write(&mut self) -> Option<&mut [u8]> {
self.buffer_writer.try_write()
}
#[inline]
pub fn remaining_on_buffer(&self) -> usize {
self.buffer_writer.remaining_on_buffer()
}
#[inline]
pub fn commit(&mut self, len: usize) {
self.buffer_writer.commit(len)
}
#[inline]
pub fn finish_frame(&mut self) -> Option<Packet> {
let (buffer, written_len) = self.buffer_writer.state()?;
if written_len != self.frame_start {
let packet = Self::produce_packet(buffer, self.frame_start, written_len, false);
self.frame_start = written_len;
Some(packet)
} else {
None
}
}
#[inline]
pub fn next_buffer(&mut self) -> Option<Packet> {
let (buffer, written_len) = self.buffer_writer.next_buffer()?;
if written_len != self.frame_start {
let packet_start = self.frame_start;
self.frame_start = 0;
Some(Self::produce_packet(
buffer,
packet_start,
written_len,
true,
))
} else {
// No new messages were written since the last call to `finish_frame` - decrement the
// reference count on the current buffer.
if self.frame_start == 0 {
// No messages were written on this buffer at all, so the reference count was never
// initialized.
unsafe {
buffer.initialize_rc(1, 0, 0);
}
}
unsafe {
buffer.release_ref(1);
}
None
}
}
#[inline]
fn produce_packet(
buffer: BufferPtr,
packet_start: usize,
packet_end: usize,
buffer_done: bool,
) -> Packet {
// Four scenarios to handle when updating the buffer's reference count:
// 1. It's the first and only message on the buffer - set the reference count to 1.
// 2. It's the first message of potentially multiple on the buffer - set the reference count
// to 2. One for the new Packet and one for us since the buffer can still be written
// to and so can't be freed.
// 3. It's not the first message on the buffer and we aren't switching buffers yet -
// increment the reference count by 1.
// 4. It's the last of multiple messages on the buffer - don't modify the reference count.
// The decrement we'd do since we are switching buffers cancels out with the increment
// for the new message.
if packet_start == 0 {
if buffer_done {
// Scenario 1
unsafe {
buffer.initialize_rc(1, 0, 0);
}
} else {
// Scenario 2
unsafe {
buffer.initialize_rc(2, 0, 0);
}
}
} else if !buffer_done {
// Scenario 3
unsafe {
buffer.take_ref(1);
}
} else {
// Scenario 4 - do nothing
}
unsafe { Packet::new(buffer, packet_start, packet_end - packet_start) }
}
}
impl Drop for Framer {
fn drop(&mut self) {
// If we're holding onto a buffer, release it.
self.next_buffer();
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_framer() {
use crate::HeapBufferPool;
let buffer_pool = HeapBufferPool::new(16, 4, 4);
let mut framer = Framer::new(buffer_pool);
for _ in 0..32 {
{
let buf: &mut [u8] = pollster::block_on(framer.write());
buf[..5].copy_from_slice(b"hello");
}
framer.commit(5);
let message = framer.next_buffer().unwrap();
assert_eq!(&*message, b"hello");
}
}
}