1use crate::state::State;
2use bytes::Buf;
3use std::sync::{Arc, Mutex};
4
5pub struct StreamData {
7 ptr: *const u8,
8 len: usize,
9 pos: usize,
10 state: Arc<Mutex<State>>,
11}
12
13impl StreamData {
14 pub(crate) fn new(s: &[u8], state: Arc<Mutex<State>>) -> StreamData {
15 StreamData {
16 ptr: s.as_ptr(),
17 len: s.len(),
18 pos: 0,
19 state,
20 }
21 }
22}
23
24unsafe impl std::marker::Send for StreamData {}
25
26impl Buf for StreamData {
27 fn remaining(&self) -> usize {
28 self.len - self.pos
29 }
30
31 fn bytes(&self) -> &[u8] {
32 unsafe { std::slice::from_raw_parts(self.ptr.add(self.pos), self.len - self.pos) }
33 }
34
35 fn advance(&mut self, cnt: usize) {
36 self.pos += cnt;
37 }
38}
39
40impl Drop for StreamData {
41 fn drop(&mut self) {
42 match self.state.lock() {
43 Ok(mut state) => {
44 state.is_current_stream_data_consumed = true;
45 if let Some(ref waker) = state.waker {
46 waker.clone().wake();
47 }
48 state.waker = None;
49 }
50 Err(err) => log::error!(
51 "{}: StreamData: Failed to update the drop state: {}",
52 env!("CARGO_PKG_NAME"),
53 err
54 ),
55 }
56 }
57}