Skip to main content

modrpc_hub/
local.rs

1use modrpc::{
2    WriterConfig,
3    TransportBuilder,
4    TransportContext,
5    TransportHandle,
6};
7
8use crate::{
9    broadcaster::InPacket,
10    BroadcasterHandle,
11    ChannelId,
12    TransportIndex,
13};
14
15pub enum LocalHubTransport {
16    Static {
17        buffer_pool: bab::HeapBufferPool,
18        broadcaster_handle: BroadcasterHandle,
19        channel_ids: Vec<u32>,
20    },
21    Dynamic {
22        buffer_pool: bab::HeapBufferPool,
23        broadcaster_handle: BroadcasterHandle,
24        broadcaster_transport: TransportIndex,
25    },
26}
27
28impl TransportBuilder for LocalHubTransport {
29    async fn start_transport(self, cx: TransportContext<'_>) -> TransportHandle {
30        let Some(worker_cx) = cx.rt.local_worker_context() else {
31            panic!("modrpc runtime must have a local worker to create a LocalHubTransport.");
32        };
33
34        let (buffer_pool, broadcaster_handle, broadcaster_transport) = match self {
35            Self::Static { buffer_pool, broadcaster_handle, channel_ids } => {
36                let broadcaster_transport =
37                    broadcaster_handle.add_local(worker_cx.local_packet_tx().clone()).await;
38
39                for channel_id in channel_ids {
40                    broadcaster_handle.add_next_hop_to_channels(
41                        broadcaster_transport,
42                        vec![
43                            (ChannelId { channel_id }, ChannelId { channel_id }),
44                        ],
45                    )
46                    .await;
47                }
48
49                (buffer_pool, broadcaster_handle, broadcaster_transport)
50            }
51            Self::Dynamic { buffer_pool, broadcaster_handle, broadcaster_transport } => {
52                (buffer_pool, broadcaster_handle, broadcaster_transport)
53            }
54        };
55
56        let shutdown_signal = bab::SignalTree::new();
57        let broadcaster_sender = broadcaster_handle.in_packet_sender().clone();
58
59        let (writer_flush_sender, mut writer_flush_receiver) = bab::new_writer_flusher();
60        let writer_config = WriterConfig::LocalFlush {
61            writer_flush_sender: writer_flush_sender.clone(),
62        };
63
64        // Spawn task to flush egress packets to the broadcaster
65        worker_cx.spawn_traced("local-hub-transport-flush", core::time::Duration::from_millis(1000), {
66            let buffer_pool = buffer_pool.clone();
67            let shutdown_signal = shutdown_signal.clone();
68            let broadcaster_transport = broadcaster_transport;
69            async move |tracer| {
70                let _buffer_pool_thread_guard = buffer_pool.register_thread();
71                futures_lite::future::or(
72                    async {
73                        loop {
74                            for flush in writer_flush_receiver.flush().await {
75                                let result = tracer.trace_future(async {
76                                    probius::trace_metric("buffer_len", flush.len() as i64);
77                                    let in_packet = InPacket {
78                                        transport: broadcaster_transport,
79                                        channel_id: flush.writer_id() as u32,
80                                        packet: flush.into(),
81                                    };
82                                    probius::trace_branch_start();
83                                    if let Err(e) = broadcaster_sender.send(in_packet).await {
84                                        probius::trace_label("send_fail_abort");
85                                        probius::trace_branch_end();
86                                        return Err(e);
87                                    }
88                                    probius::trace_label("send_success");
89                                    probius::trace_branch_end();
90
91                                    Ok(())
92                                })
93                                .await;
94
95                                if result.is_err() {
96                                    break;
97                                }
98                            }
99                        }
100                    },
101                    shutdown_signal.wait_owned(),
102                )
103                .await
104            }
105        });
106
107        TransportHandle {
108            buffer_pool,
109            writer_config,
110            shutdown_signal,
111        }
112    }
113}