sof_gossip_tuning/application/
service.rs1use crate::{
4 application::ports::RuntimeTuningPort,
5 domain::{
6 constants::{
7 DEDICATED_GOSSIP_CHANNEL_CAPACITY, DEDICATED_GOSSIP_CHANNEL_CONSUME_CAPACITY,
8 DEDICATED_GOSSIP_THREADS, DEDICATED_SHRED_DEDUP_CAPACITY,
9 DEDICATED_SOCKET_CONSUME_CHANNEL_CAPACITY, DEDICATED_TVU_RECEIVE_SOCKETS,
10 DEFAULT_INGEST_QUEUE_CAPACITY, DEFAULT_RECEIVER_COALESCE_WAIT_MS,
11 DEFAULT_UDP_BATCH_SIZE, HOME_GOSSIP_CHANNEL_CAPACITY,
12 HOME_GOSSIP_CHANNEL_CONSUME_CAPACITY, HOME_GOSSIP_THREADS, HOME_INGEST_QUEUE_CAPACITY,
13 HOME_SHRED_DEDUP_CAPACITY, HOME_TVU_RECEIVE_SOCKETS, HOME_UDP_BATCH_SIZE,
14 VPS_GOSSIP_CHANNEL_CONSUME_CAPACITY, VPS_GOSSIP_RECEIVER_CHANNEL_CAPACITY,
15 VPS_GOSSIP_THREADS, VPS_SHRED_DEDUP_CAPACITY, VPS_SOCKET_CONSUME_CHANNEL_CAPACITY,
16 VPS_TVU_RECEIVE_SOCKETS, VPS_UDP_BATCH_SIZE,
17 },
18 model::{
19 GossipChannelTuning, GossipTuningProfile, HostProfilePreset, IngestQueueMode,
20 PendingGossipQueuePlan, ReceiverFanoutProfile, SofRuntimeTuning,
21 },
22 value_objects::{QueueCapacity, ReceiverCoalesceWindow, TvuReceiveSocketCount},
23 },
24};
25
/// Stateless entry point for building and applying gossip tuning profiles.
///
/// The type carries no data; all of its functionality is exposed through
/// associated functions.
#[derive(Debug, Default)]
#[derive(Clone, Copy)]
pub struct GossipTuningService;
29
30impl GossipTuningService {
31 #[must_use]
33 pub const fn preset_profile(preset: HostProfilePreset) -> GossipTuningProfile {
34 match preset {
35 HostProfilePreset::Home => GossipTuningProfile {
36 preset,
37 runtime: SofRuntimeTuning {
38 ingest_queue_mode: IngestQueueMode::Bounded,
39 ingest_queue_capacity: QueueCapacity::fixed(HOME_INGEST_QUEUE_CAPACITY),
40 udp_batch_size: HOME_UDP_BATCH_SIZE,
41 receiver_coalesce_window: ReceiverCoalesceWindow::fixed(
42 DEFAULT_RECEIVER_COALESCE_WAIT_MS,
43 ),
44 udp_receiver_core: None,
45 udp_receiver_pin_by_port: false,
46 tvu_receive_sockets: TvuReceiveSocketCount::fixed(HOME_TVU_RECEIVE_SOCKETS),
47 gossip_channel_consume_capacity: QueueCapacity::fixed(
48 HOME_GOSSIP_CHANNEL_CONSUME_CAPACITY,
49 ),
50 gossip_consume_threads: HOME_GOSSIP_THREADS,
51 gossip_listen_threads: HOME_GOSSIP_THREADS,
52 gossip_run_threads: HOME_GOSSIP_THREADS,
53 shred_dedup_capacity: HOME_SHRED_DEDUP_CAPACITY,
54 },
55 channels: GossipChannelTuning {
56 gossip_receiver_channel_capacity: QueueCapacity::fixed(
57 HOME_GOSSIP_CHANNEL_CAPACITY,
58 ),
59 socket_consume_channel_capacity: QueueCapacity::fixed(
60 HOME_GOSSIP_CHANNEL_CAPACITY,
61 ),
62 gossip_response_channel_capacity: QueueCapacity::fixed(
63 HOME_GOSSIP_CHANNEL_CAPACITY,
64 ),
65 },
66 fanout: ReceiverFanoutProfile::Conservative,
67 },
68 HostProfilePreset::Vps => GossipTuningProfile {
69 preset,
70 runtime: SofRuntimeTuning {
71 ingest_queue_mode: IngestQueueMode::Lockfree,
72 ingest_queue_capacity: QueueCapacity::fixed(DEFAULT_INGEST_QUEUE_CAPACITY),
73 udp_batch_size: VPS_UDP_BATCH_SIZE,
74 receiver_coalesce_window: ReceiverCoalesceWindow::fixed(
75 DEFAULT_RECEIVER_COALESCE_WAIT_MS,
76 ),
77 udp_receiver_core: None,
78 udp_receiver_pin_by_port: false,
79 tvu_receive_sockets: TvuReceiveSocketCount::fixed(VPS_TVU_RECEIVE_SOCKETS),
80 gossip_channel_consume_capacity: QueueCapacity::fixed(
81 VPS_GOSSIP_CHANNEL_CONSUME_CAPACITY,
82 ),
83 gossip_consume_threads: VPS_GOSSIP_THREADS,
84 gossip_listen_threads: VPS_GOSSIP_THREADS,
85 gossip_run_threads: VPS_GOSSIP_THREADS,
86 shred_dedup_capacity: VPS_SHRED_DEDUP_CAPACITY,
87 },
88 channels: GossipChannelTuning {
89 gossip_receiver_channel_capacity: QueueCapacity::fixed(
90 VPS_GOSSIP_RECEIVER_CHANNEL_CAPACITY,
91 ),
92 socket_consume_channel_capacity: QueueCapacity::fixed(
93 VPS_SOCKET_CONSUME_CHANNEL_CAPACITY,
94 ),
95 gossip_response_channel_capacity: QueueCapacity::fixed(
96 VPS_SOCKET_CONSUME_CHANNEL_CAPACITY,
97 ),
98 },
99 fanout: ReceiverFanoutProfile::Balanced,
100 },
101 HostProfilePreset::Dedicated => GossipTuningProfile {
102 preset,
103 runtime: SofRuntimeTuning {
104 ingest_queue_mode: IngestQueueMode::Lockfree,
105 ingest_queue_capacity: QueueCapacity::fixed(DEFAULT_INGEST_QUEUE_CAPACITY),
106 udp_batch_size: DEFAULT_UDP_BATCH_SIZE,
107 receiver_coalesce_window: ReceiverCoalesceWindow::fixed(
108 DEFAULT_RECEIVER_COALESCE_WAIT_MS,
109 ),
110 udp_receiver_core: None,
111 udp_receiver_pin_by_port: true,
112 tvu_receive_sockets: TvuReceiveSocketCount::fixed(
113 DEDICATED_TVU_RECEIVE_SOCKETS,
114 ),
115 gossip_channel_consume_capacity: QueueCapacity::fixed(
116 DEDICATED_GOSSIP_CHANNEL_CONSUME_CAPACITY,
117 ),
118 gossip_consume_threads: DEDICATED_GOSSIP_THREADS,
119 gossip_listen_threads: DEDICATED_GOSSIP_THREADS,
120 gossip_run_threads: DEDICATED_GOSSIP_THREADS,
121 shred_dedup_capacity: DEDICATED_SHRED_DEDUP_CAPACITY,
122 },
123 channels: GossipChannelTuning {
124 gossip_receiver_channel_capacity: QueueCapacity::fixed(
125 DEDICATED_GOSSIP_CHANNEL_CAPACITY,
126 ),
127 socket_consume_channel_capacity: QueueCapacity::fixed(
128 DEDICATED_SOCKET_CONSUME_CHANNEL_CAPACITY,
129 ),
130 gossip_response_channel_capacity: QueueCapacity::fixed(
131 DEDICATED_SOCKET_CONSUME_CHANNEL_CAPACITY,
132 ),
133 },
134 fanout: ReceiverFanoutProfile::Aggressive,
135 },
136 }
137 }
138
139 #[must_use]
141 pub const fn supported_runtime_tuning(profile: GossipTuningProfile) -> SofRuntimeTuning {
142 profile.runtime
143 }
144
145 #[must_use]
147 pub const fn pending_gossip_queue_plan(profile: GossipTuningProfile) -> PendingGossipQueuePlan {
148 PendingGossipQueuePlan {
149 gossip_receiver_channel_capacity: profile.channels.gossip_receiver_channel_capacity,
150 socket_consume_channel_capacity: profile.channels.socket_consume_channel_capacity,
151 gossip_response_channel_capacity: profile.channels.gossip_response_channel_capacity,
152 fanout: profile.fanout,
153 }
154 }
155
156 pub fn apply_supported_runtime_tuning<P>(profile: GossipTuningProfile, port: &mut P)
158 where
159 P: RuntimeTuningPort,
160 {
161 let runtime = Self::supported_runtime_tuning(profile);
162 Self::apply_runtime_tuning(runtime, port);
163 port.set_gossip_channel_tuning(profile.channels);
164 }
165
166 pub fn apply_runtime_tuning<P>(runtime: SofRuntimeTuning, port: &mut P)
168 where
169 P: RuntimeTuningPort,
170 {
171 port.set_ingest_queue_mode(runtime.ingest_queue_mode);
172 port.set_ingest_queue_capacity(runtime.ingest_queue_capacity);
173 port.set_udp_batch_size(runtime.udp_batch_size);
174 port.set_receiver_coalesce_window(runtime.receiver_coalesce_window);
175 port.set_udp_receiver_core(runtime.udp_receiver_core);
176 port.set_udp_receiver_pin_by_port(runtime.udp_receiver_pin_by_port);
177 port.set_tvu_receive_sockets(runtime.tvu_receive_sockets);
178 port.set_gossip_channel_consume_capacity(runtime.gossip_channel_consume_capacity);
179 port.set_gossip_consume_threads(runtime.gossip_consume_threads);
180 port.set_gossip_listen_threads(runtime.gossip_listen_threads);
181 port.set_gossip_run_threads(runtime.gossip_run_threads);
182 port.set_shred_dedup_capacity(runtime.shred_dedup_capacity);
183 }
184}
185
186impl GossipTuningProfile {
187 #[must_use]
189 pub const fn preset(preset: HostProfilePreset) -> Self {
190 GossipTuningService::preset_profile(preset)
191 }
192
193 #[must_use]
195 pub const fn supported_runtime_tuning(self) -> SofRuntimeTuning {
196 GossipTuningService::supported_runtime_tuning(self)
197 }
198
199 #[must_use]
201 pub const fn pending_gossip_queue_plan(self) -> PendingGossipQueuePlan {
202 GossipTuningService::pending_gossip_queue_plan(self)
203 }
204}