stream_body/
data.rs

1use crate::state::State;
2use bytes::Buf;
3use std::sync::{Arc, Mutex};
4
/// The data chunk type produced by `StreamBody`.
///
/// A chunk is a raw (pointer, length) view over a byte slice plus a read
/// cursor. The view carries no lifetime, so the compiler does not tie it to
/// the slice it was created from.
pub struct StreamData {
    /// Address of the first byte of the slice passed to `new`.
    ptr: *const u8,
    /// Length in bytes of the slice passed to `new`.
    len: usize,
    /// Read cursor: number of bytes already consumed through `Buf::advance`.
    pos: usize,
    /// State shared through an `Arc<Mutex<_>>`; the `Drop` impl marks the
    /// current chunk as consumed in it and wakes any stored waker.
    state: Arc<Mutex<State>>,
}
12
13impl StreamData {
14    pub(crate) fn new(s: &[u8], state: Arc<Mutex<State>>) -> StreamData {
15        StreamData {
16            ptr: s.as_ptr(),
17            len: s.len(),
18            pos: 0,
19            state,
20        }
21    }
22}
23
24unsafe impl std::marker::Send for StreamData {}
25
26impl Buf for StreamData {
27    fn remaining(&self) -> usize {
28        self.len - self.pos
29    }
30
31    fn bytes(&self) -> &[u8] {
32        unsafe { std::slice::from_raw_parts(self.ptr.add(self.pos), self.len - self.pos) }
33    }
34
35    fn advance(&mut self, cnt: usize) {
36        self.pos += cnt;
37    }
38}
39
40impl Drop for StreamData {
41    fn drop(&mut self) {
42        match self.state.lock() {
43            Ok(mut state) => {
44                state.is_current_stream_data_consumed = true;
45                if let Some(ref waker) = state.waker {
46                    waker.clone().wake();
47                }
48                state.waker = None;
49            }
50            Err(err) => log::error!(
51                "{}: StreamData: Failed to update the drop state: {}",
52                env!("CARGO_PKG_NAME"),
53                err
54            ),
55        }
56    }
57}