1use modrpc::{
2 WriterConfig,
3 TransportBuilder,
4 TransportContext,
5 TransportHandle,
6};
7
8use crate::{
9 broadcaster::InPacket,
10 BroadcasterHandle,
11 ChannelId,
12 TransportIndex,
13};
14
15pub enum LocalHubTransport {
16 Static {
17 buffer_pool: bab::HeapBufferPool,
18 broadcaster_handle: BroadcasterHandle,
19 channel_ids: Vec<u32>,
20 },
21 Dynamic {
22 buffer_pool: bab::HeapBufferPool,
23 broadcaster_handle: BroadcasterHandle,
24 broadcaster_transport: TransportIndex,
25 },
26}
27
28impl TransportBuilder for LocalHubTransport {
29 async fn start_transport(self, cx: TransportContext<'_>) -> TransportHandle {
30 let Some(worker_cx) = cx.rt.local_worker_context() else {
31 panic!("modrpc runtime must have a local worker to create a LocalHubTransport.");
32 };
33
34 let (buffer_pool, broadcaster_handle, broadcaster_transport) = match self {
35 Self::Static { buffer_pool, broadcaster_handle, channel_ids } => {
36 let broadcaster_transport =
37 broadcaster_handle.add_local(worker_cx.local_packet_tx().clone()).await;
38
39 for channel_id in channel_ids {
40 broadcaster_handle.add_next_hop_to_channels(
41 broadcaster_transport,
42 vec![
43 (ChannelId { channel_id }, ChannelId { channel_id }),
44 ],
45 )
46 .await;
47 }
48
49 (buffer_pool, broadcaster_handle, broadcaster_transport)
50 }
51 Self::Dynamic { buffer_pool, broadcaster_handle, broadcaster_transport } => {
52 (buffer_pool, broadcaster_handle, broadcaster_transport)
53 }
54 };
55
56 let shutdown_signal = bab::SignalTree::new();
57 let broadcaster_sender = broadcaster_handle.in_packet_sender().clone();
58
59 let (writer_flush_sender, mut writer_flush_receiver) = bab::new_writer_flusher();
60 let writer_config = WriterConfig::LocalFlush {
61 writer_flush_sender: writer_flush_sender.clone(),
62 };
63
64 worker_cx.spawn_traced("local-hub-transport-flush", core::time::Duration::from_millis(1000), {
66 let buffer_pool = buffer_pool.clone();
67 let shutdown_signal = shutdown_signal.clone();
68 let broadcaster_transport = broadcaster_transport;
69 async move |tracer| {
70 let _buffer_pool_thread_guard = buffer_pool.register_thread();
71 futures_lite::future::or(
72 async {
73 loop {
74 for flush in writer_flush_receiver.flush().await {
75 let result = tracer.trace_future(async {
76 probius::trace_metric("buffer_len", flush.len() as i64);
77 let in_packet = InPacket {
78 transport: broadcaster_transport,
79 channel_id: flush.writer_id() as u32,
80 packet: flush.into(),
81 };
82 probius::trace_branch_start();
83 if let Err(e) = broadcaster_sender.send(in_packet).await {
84 probius::trace_label("send_fail_abort");
85 probius::trace_branch_end();
86 return Err(e);
87 }
88 probius::trace_label("send_success");
89 probius::trace_branch_end();
90
91 Ok(())
92 })
93 .await;
94
95 if result.is_err() {
96 break;
97 }
98 }
99 }
100 },
101 shutdown_signal.wait_owned(),
102 )
103 .await
104 }
105 });
106
107 TransportHandle {
108 buffer_pool,
109 writer_config,
110 shutdown_signal,
111 }
112 }
113}