1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use super::{Connection, PendingRequestFlags};
use crate::{event::Event, util::cycled_zeroes};
use core::iter;
use tinyvec::TinyVec;
/// Value of a packet's first byte that marks the packet as an error.
const TYPE_ERROR: u8 = 0;
/// Value of a packet's first byte that marks the packet as a reply.
const TYPE_REPLY: u8 = 1;
/// Record describing a request that is still pending.
///
/// NOTE(review): nothing in the visible part of this file constructs or reads
/// this type, so the field descriptions below are inferred from the names
/// only - confirm against the code that uses it.
#[derive(Debug)]
pub(crate) struct PendingRequest {
    /// Presumably the number of the first request covered by this record.
    first_request: u64,
    /// Presumably the number of the last request covered by this record.
    last_request: u64,
    /// Flags attached to the pending request.
    flags: PendingRequestFlags,
}
impl<Conn: Connection> super::Display<Conn> {
    /// Dispatches one complete packet according to its first byte.
    ///
    /// * `TYPE_REPLY` - the packet is moved onto the heap and stored in
    ///   `pending_replies`, keyed by the native-endian `u16` held in bytes 2
    ///   and 3 of the packet.
    /// * `TYPE_ERROR` - the packet is never stored; it always surfaces as an
    ///   `Err` (see below).
    /// * any other value - the packet is decoded with `Event::from_bytes` and
    ///   appended to the back of `event_queue`.
    ///
    /// # Errors
    ///
    /// * `BreadError::ClosedConnection` when an error packet consists solely
    ///   of zero bytes.
    /// * The value built by `BreadError::from_x_error` for any other error
    ///   packet.
    /// * Whatever `Event::from_bytes` reports when an event fails to decode.
    ///
    /// # Panics
    ///
    /// Panics if `bytes` is empty, or if a reply packet is shorter than four
    /// bytes.
    #[inline]
    fn process_bytes(&mut self, mut bytes: TinyVec<[u8; 32]>) -> crate::Result {
        log::trace!("Found response bytes: {}", &bytes);

        if bytes[0] == TYPE_REPLY {
            log::debug!("Received bytes of type REPLY");

            // The sequence number is only needed to key the stored reply.
            let sequence = u16::from_ne_bytes([bytes[2], bytes[3]]);

            // Force heap storage so the buffer can be handed over as a boxed
            // slice; after this call the `Inline` variant cannot occur.
            bytes.move_to_the_heap();
            let reply = match bytes {
                TinyVec::Heap(heap) => heap.into_boxed_slice(),
                TinyVec::Inline(_) => unreachable!(),
            };
            self.pending_replies.insert(sequence, reply);
        } else if bytes[0] == TYPE_ERROR {
            // A packet made up entirely of zeroes is reported as a closed
            // connection rather than as a protocol error.
            if bytes.iter().all(|&byte| byte == 0) {
                return Err(crate::BreadError::ClosedConnection);
            }
            return Err(crate::BreadError::from_x_error(bytes));
        } else {
            log::debug!("Received bytes of type EVENT");
            let event = Event::from_bytes(bytes)?;
            self.event_queue.push_back(event);
        }

        Ok(())
    }

    /// Registers interest in the reply to a request.
    ///
    /// Currently a no-op: both arguments are ignored.
    // `self` is unused because the body is empty; the lint is silenced so the
    // method can keep its receiver.
    #[allow(clippy::unused_self)]
    #[inline]
    pub(crate) fn expect_reply(&mut self, _req: u64, _flags: PendingRequestFlags) {}

    /// Reads one packet from the connection and hands it to `process_bytes`.
    ///
    /// A fixed 32-byte header is read first. If it is a reply that announces
    /// a non-zero additional length (counted in 4-byte units), the buffer is
    /// grown by that many zeroed bytes and the remainder is read into the
    /// new tail.
    ///
    /// # Errors
    ///
    /// Propagates any error from `read_packet` and from `process_bytes`.
    #[inline]
    pub(crate) fn wait(&mut self) -> crate::Result {
        log::debug!("Running wait cycle");

        let mut bytes: TinyVec<[u8; 32]> = cycled_zeroes(32);
        self.connection.read_packet(&mut bytes)?;

        if let Some(ab) = additional_bytes(&bytes[..8]) {
            if ab != 0 {
                let extra = ab * 4;
                bytes.extend(iter::repeat(0).take(extra));
                log::debug!("Waiting for {} additional bytes", extra);
                self.connection.read_packet(&mut bytes[32..])?;
                log::debug!("Ending wait with {} additional bytes", extra);
            }
        }

        self.process_bytes(bytes)
    }

    /// Asynchronous counterpart of `wait`.
    ///
    /// Reads the 32-byte header, then - only when a reply announces a
    /// non-zero additional length - grows the buffer and reads the remainder
    /// before handing the packet to `process_bytes`.
    ///
    /// # Errors
    ///
    /// Propagates any error from `read_packet_async` and from
    /// `process_bytes`.
    #[cfg(feature = "async")]
    #[inline]
    pub(crate) async fn wait_async(&mut self) -> crate::Result {
        let mut bytes: TinyVec<[u8; 32]> = cycled_zeroes(32);
        self.connection.read_packet_async(&mut bytes).await?;

        if let Some(ab) = additional_bytes(&bytes[..8]) {
            // Mirror `wait`: never issue a read into an empty slice when the
            // reply carries no extra payload.
            if ab != 0 {
                bytes.extend(iter::repeat(0).take(ab * 4));
                self.connection.read_packet_async(&mut bytes[32..]).await?;
            }
        }

        self.process_bytes(bytes)
    }
}
/// Extracts the additional-length field from the start of a packet.
///
/// Returns `None` unless the first byte equals `TYPE_REPLY`. For a reply,
/// bytes 4 through 7 are decoded as a native-endian `u32` and returned as a
/// `usize`. The callers in this file multiply the value by four to obtain a
/// byte count.
///
/// # Panics
///
/// Panics if `bytes` is empty, or if it is a reply shorter than eight bytes.
#[inline]
fn additional_bytes(bytes: &[u8]) -> Option<usize> {
    // Only replies carry a length field.
    if bytes[0] != TYPE_REPLY {
        return None;
    }

    let field = &bytes[4..8];
    let word = [field[0], field[1], field[2], field[3]];
    Some(u32::from_ne_bytes(word) as usize)
}