std_modrpc/role_impls/
byte_stream_sender.rs1use std::{
2 cell::Cell,
3 rc::Rc,
4};
5
6use modrpc::RoleSetup;
7
8use crate::{
9 ByteStreamInitState,
10 ByteStreamSenderConfig,
11 ByteStreamSenderHooks,
12 ByteStreamSenderStubs,
13};
14
15struct State {
16 hooks: ByteStreamSenderHooks,
17 send_cursor: Cell<u64>,
18}
19
20#[derive(Clone)]
21pub struct ByteStreamSender {
22 state: Rc<State>,
23}
24
25pub struct ByteStreamSenderBuilder {
26 state: Rc<State>,
27}
28
29impl ByteStreamSenderBuilder {
30 pub fn new(
31 _name: &'static str,
32 hooks: ByteStreamSenderHooks,
33 _stubs: ByteStreamSenderStubs,
34 _config: &ByteStreamSenderConfig,
35 _init: ByteStreamInitState,
36 ) -> Self {
37 let state = Rc::new(State {
38 hooks: hooks.clone(),
39 send_cursor: Cell::new(0),
40 });
41 Self { state }
42 }
43
44 pub fn create_handle(
45 &self,
46 _setup: &RoleSetup,
47 ) -> ByteStreamSender {
48 ByteStreamSender {
49 state: self.state.clone(),
50 }
51 }
52
53 pub fn build(
54 self,
55 _setup: &RoleSetup,
56 ) {
57 }
58}
59
60impl ByteStreamSender {
61 pub async fn send(&self, bytes: &[u8]) -> u64 {
62 let start_index = self.state.send_cursor.get();
63 self.state.send_cursor.set(start_index + bytes.len() as u64);
64
65 self.state.hooks.blob.send_raw(8 + bytes.len(), |write_buf| {
66 write_buf[..8].copy_from_slice(&start_index.to_le_bytes());
67 write_buf[8..].copy_from_slice(bytes);
68 })
69 .await;
70
71 start_index
72 }
73
74 pub async unsafe fn send_buffer(&self, buffer: modrpc::BufferPtr) -> u64 {
77 let headroom = <modrpc::TransmitPacket as mproto::BaseLen>::BASE_LEN;
78 let payload_len =
79 modrpc::WriterFlushSender::get_complete_buffer_len(buffer) as usize - headroom - 8;
80
81 let start_index = self.state.send_cursor.get();
82 self.state.send_cursor.set(start_index + payload_len as u64);
83
84 let headroom = <modrpc::TransmitPacket as mproto::BaseLen>::BASE_LEN;
86 let start_index_buf = unsafe { buffer.slice_mut(headroom..headroom + 8) };
87 start_index_buf.copy_from_slice(&start_index.to_le_bytes());
88
89 unsafe { self.state.hooks.blob.send_buffer(buffer).await; }
90
91 start_index
92 }
93
94 pub async fn wait_consumed(&self, _cursor: u64) {
95 }
97}