std_modrpc/role_impls/
byte_stream_sender.rs

1use std::{
2    cell::Cell,
3    rc::Rc,
4};
5
6use modrpc::RoleSetup;
7
8use crate::{
9    ByteStreamInitState,
10    ByteStreamSenderConfig,
11    ByteStreamSenderHooks,
12    ByteStreamSenderStubs,
13};
14
15struct State {
16    hooks: ByteStreamSenderHooks,
17    send_cursor: Cell<u64>,
18}
19
20#[derive(Clone)]
21pub struct ByteStreamSender {
22    state: Rc<State>,
23}
24
25pub struct ByteStreamSenderBuilder {
26    state: Rc<State>,
27}
28
29impl ByteStreamSenderBuilder {
30    pub fn new(
31        _name: &'static str,
32        hooks: ByteStreamSenderHooks,
33        _stubs: ByteStreamSenderStubs,
34        _config: &ByteStreamSenderConfig,
35        _init: ByteStreamInitState,
36    ) -> Self {
37        let state = Rc::new(State {
38            hooks: hooks.clone(),
39            send_cursor: Cell::new(0),
40        });
41        Self { state }
42    }
43
44    pub fn create_handle(
45        &self,
46        _setup: &RoleSetup,
47    ) -> ByteStreamSender {
48        ByteStreamSender {
49            state: self.state.clone(),
50        }
51    }
52
53    pub fn build(
54        self,
55        _setup: &RoleSetup,
56    ) {
57    }
58}
59
60impl ByteStreamSender {
61    pub async fn send(&self, bytes: &[u8]) -> u64 {
62        let start_index = self.state.send_cursor.get();
63        self.state.send_cursor.set(start_index + bytes.len() as u64);
64
65        self.state.hooks.blob.send_raw(8 + bytes.len(), |write_buf| {
66            write_buf[..8].copy_from_slice(&start_index.to_le_bytes());
67            write_buf[8..].copy_from_slice(bytes);
68        })
69        .await;
70
71        start_index
72    }
73
74    /// SAFETY: You must have exclusive ownership of the buffer and there must be enough headroom
75    /// for modrpc::TransmitPacket::BASE_LEN + 8 bytes
76    pub async unsafe fn send_buffer(&self, buffer: modrpc::BufferPtr) -> u64 {
77        let headroom = <modrpc::TransmitPacket as mproto::BaseLen>::BASE_LEN;
78        let payload_len =
79            modrpc::WriterFlushSender::get_complete_buffer_len(buffer) as usize - headroom - 8;
80
81        let start_index = self.state.send_cursor.get();
82        self.state.send_cursor.set(start_index + payload_len as u64);
83
84        // Write the start index
85        let headroom = <modrpc::TransmitPacket as mproto::BaseLen>::BASE_LEN;
86        let start_index_buf = unsafe { buffer.slice_mut(headroom..headroom + 8) };
87        start_index_buf.copy_from_slice(&start_index.to_le_bytes());
88
89        unsafe { self.state.hooks.blob.send_buffer(buffer).await; }
90
91        start_index
92    }
93
94    pub async fn wait_consumed(&self, _cursor: u64) {
95        // TODO
96    }
97}