sof_gossip_tuning/application/
service.rs1use crate::{
4 application::ports::RuntimeTuningPort,
5 domain::{
6 constants::{
7 DEDICATED_GOSSIP_CHANNEL_CAPACITY, DEDICATED_SOCKET_CONSUME_CHANNEL_CAPACITY,
8 DEDICATED_TVU_RECEIVE_SOCKETS, DEFAULT_INGEST_QUEUE_CAPACITY,
9 DEFAULT_RECEIVER_COALESCE_WAIT_MS, DEFAULT_UDP_BATCH_SIZE,
10 HOME_GOSSIP_CHANNEL_CAPACITY, HOME_INGEST_QUEUE_CAPACITY, HOME_TVU_RECEIVE_SOCKETS,
11 HOME_UDP_BATCH_SIZE, VPS_GOSSIP_CHANNEL_CAPACITY, VPS_TVU_RECEIVE_SOCKETS,
12 },
13 model::{
14 GossipChannelTuning, GossipTuningProfile, HostProfilePreset, IngestQueueMode,
15 PendingGossipQueuePlan, ReceiverFanoutProfile, SofRuntimeTuning,
16 },
17 value_objects::{QueueCapacity, ReceiverCoalesceWindow, TvuReceiveSocketCount},
18 },
19};
20
/// Stateless entry point for gossip tuning operations.
///
/// The type carries no data; it only groups the associated functions that
/// build preset profiles and apply runtime tuning through a port.
#[derive(Clone, Copy, Debug, Default)]
pub struct GossipTuningService;
24
25impl GossipTuningService {
26 #[must_use]
28 pub const fn preset_profile(preset: HostProfilePreset) -> GossipTuningProfile {
29 match preset {
30 HostProfilePreset::Home => GossipTuningProfile {
31 preset,
32 runtime: SofRuntimeTuning {
33 ingest_queue_mode: IngestQueueMode::Bounded,
34 ingest_queue_capacity: QueueCapacity::fixed(HOME_INGEST_QUEUE_CAPACITY),
35 udp_batch_size: HOME_UDP_BATCH_SIZE,
36 receiver_coalesce_window: ReceiverCoalesceWindow::fixed(
37 DEFAULT_RECEIVER_COALESCE_WAIT_MS,
38 ),
39 udp_receiver_core: None,
40 udp_receiver_pin_by_port: false,
41 tvu_receive_sockets: TvuReceiveSocketCount::fixed(HOME_TVU_RECEIVE_SOCKETS),
42 },
43 channels: GossipChannelTuning {
44 gossip_receiver_channel_capacity: QueueCapacity::fixed(
45 HOME_GOSSIP_CHANNEL_CAPACITY,
46 ),
47 socket_consume_channel_capacity: QueueCapacity::fixed(
48 HOME_GOSSIP_CHANNEL_CAPACITY,
49 ),
50 gossip_response_channel_capacity: QueueCapacity::fixed(
51 HOME_GOSSIP_CHANNEL_CAPACITY,
52 ),
53 },
54 fanout: ReceiverFanoutProfile::Conservative,
55 },
56 HostProfilePreset::Vps => GossipTuningProfile {
57 preset,
58 runtime: SofRuntimeTuning {
59 ingest_queue_mode: IngestQueueMode::Lockfree,
60 ingest_queue_capacity: QueueCapacity::fixed(DEFAULT_INGEST_QUEUE_CAPACITY),
61 udp_batch_size: DEFAULT_UDP_BATCH_SIZE,
62 receiver_coalesce_window: ReceiverCoalesceWindow::fixed(
63 DEFAULT_RECEIVER_COALESCE_WAIT_MS,
64 ),
65 udp_receiver_core: None,
66 udp_receiver_pin_by_port: true,
67 tvu_receive_sockets: TvuReceiveSocketCount::fixed(VPS_TVU_RECEIVE_SOCKETS),
68 },
69 channels: GossipChannelTuning {
70 gossip_receiver_channel_capacity: QueueCapacity::fixed(
71 VPS_GOSSIP_CHANNEL_CAPACITY,
72 ),
73 socket_consume_channel_capacity: QueueCapacity::fixed(
74 HOME_GOSSIP_CHANNEL_CAPACITY,
75 ),
76 gossip_response_channel_capacity: QueueCapacity::fixed(
77 HOME_GOSSIP_CHANNEL_CAPACITY,
78 ),
79 },
80 fanout: ReceiverFanoutProfile::Balanced,
81 },
82 HostProfilePreset::Dedicated => GossipTuningProfile {
83 preset,
84 runtime: SofRuntimeTuning {
85 ingest_queue_mode: IngestQueueMode::Lockfree,
86 ingest_queue_capacity: QueueCapacity::fixed(DEFAULT_INGEST_QUEUE_CAPACITY),
87 udp_batch_size: DEFAULT_UDP_BATCH_SIZE,
88 receiver_coalesce_window: ReceiverCoalesceWindow::fixed(
89 DEFAULT_RECEIVER_COALESCE_WAIT_MS,
90 ),
91 udp_receiver_core: None,
92 udp_receiver_pin_by_port: true,
93 tvu_receive_sockets: TvuReceiveSocketCount::fixed(
94 DEDICATED_TVU_RECEIVE_SOCKETS,
95 ),
96 },
97 channels: GossipChannelTuning {
98 gossip_receiver_channel_capacity: QueueCapacity::fixed(
99 DEDICATED_GOSSIP_CHANNEL_CAPACITY,
100 ),
101 socket_consume_channel_capacity: QueueCapacity::fixed(
102 DEDICATED_SOCKET_CONSUME_CHANNEL_CAPACITY,
103 ),
104 gossip_response_channel_capacity: QueueCapacity::fixed(
105 DEDICATED_SOCKET_CONSUME_CHANNEL_CAPACITY,
106 ),
107 },
108 fanout: ReceiverFanoutProfile::Aggressive,
109 },
110 }
111 }
112
113 #[must_use]
115 pub const fn supported_runtime_tuning(profile: GossipTuningProfile) -> SofRuntimeTuning {
116 profile.runtime
117 }
118
119 #[must_use]
121 pub const fn pending_gossip_queue_plan(profile: GossipTuningProfile) -> PendingGossipQueuePlan {
122 PendingGossipQueuePlan {
123 gossip_receiver_channel_capacity: profile.channels.gossip_receiver_channel_capacity,
124 socket_consume_channel_capacity: profile.channels.socket_consume_channel_capacity,
125 gossip_response_channel_capacity: profile.channels.gossip_response_channel_capacity,
126 fanout: profile.fanout,
127 }
128 }
129
130 pub fn apply_supported_runtime_tuning<P>(profile: GossipTuningProfile, port: &mut P)
132 where
133 P: RuntimeTuningPort,
134 {
135 let runtime = Self::supported_runtime_tuning(profile);
136 Self::apply_runtime_tuning(runtime, port);
137 }
138
139 pub fn apply_runtime_tuning<P>(runtime: SofRuntimeTuning, port: &mut P)
141 where
142 P: RuntimeTuningPort,
143 {
144 port.set_ingest_queue_mode(runtime.ingest_queue_mode);
145 port.set_ingest_queue_capacity(runtime.ingest_queue_capacity);
146 port.set_udp_batch_size(runtime.udp_batch_size);
147 port.set_receiver_coalesce_window(runtime.receiver_coalesce_window);
148 port.set_udp_receiver_core(runtime.udp_receiver_core);
149 port.set_udp_receiver_pin_by_port(runtime.udp_receiver_pin_by_port);
150 port.set_tvu_receive_sockets(runtime.tvu_receive_sockets);
151 }
152}
153
154impl GossipTuningProfile {
155 #[must_use]
157 pub const fn preset(preset: HostProfilePreset) -> Self {
158 GossipTuningService::preset_profile(preset)
159 }
160
161 #[must_use]
163 pub const fn supported_runtime_tuning(self) -> SofRuntimeTuning {
164 GossipTuningService::supported_runtime_tuning(self)
165 }
166
167 #[must_use]
169 pub const fn pending_gossip_queue_plan(self) -> PendingGossipQueuePlan {
170 GossipTuningService::pending_gossip_queue_plan(self)
171 }
172}