1use std::time::Duration;
4
5use net::config::{
6 AdapterConfig, BackpressureMode, BatchConfig, EventBusConfig, EventBusConfigBuilder,
7 ScalingPolicy,
8};
9
10#[derive(Debug, Clone, Copy)]
12pub enum Backpressure {
13 DropNewest,
15 DropOldest,
17 FailProducer,
19 Sample(u32),
21}
22
23impl From<Backpressure> for BackpressureMode {
24 fn from(bp: Backpressure) -> Self {
25 match bp {
26 Backpressure::DropNewest => BackpressureMode::DropNewest,
27 Backpressure::DropOldest => BackpressureMode::DropOldest,
28 Backpressure::FailProducer => BackpressureMode::FailProducer,
29 Backpressure::Sample(rate) => BackpressureMode::Sample { rate },
30 }
31 }
32}
33
34pub struct NetBuilder {
36 pub(crate) inner: EventBusConfigBuilder,
37 pub(crate) adapter: Option<AdapterConfig>,
38 #[cfg(feature = "net")]
42 pub(crate) identity: Option<crate::identity::Identity>,
43}
44
45impl NetBuilder {
46 pub(crate) fn new() -> Self {
47 Self {
48 inner: EventBusConfig::builder(),
49 adapter: None,
50 #[cfg(feature = "net")]
51 identity: None,
52 }
53 }
54
55 #[cfg(feature = "net")]
63 pub fn identity(mut self, identity: crate::identity::Identity) -> Self {
64 self.identity = Some(identity);
65 self
66 }
67
68 pub fn shards(mut self, n: u16) -> Self {
70 self.inner = self.inner.num_shards(n);
71 self
72 }
73
74 pub fn buffer_capacity(mut self, capacity: usize) -> Self {
76 self.inner = self.inner.ring_buffer_capacity(capacity);
77 self
78 }
79
80 pub fn backpressure(mut self, bp: Backpressure) -> Self {
82 self.inner = self.inner.backpressure_mode(bp.into());
83 self
84 }
85
86 pub fn high_throughput(mut self) -> Self {
88 self.inner = self.inner.batch(BatchConfig::high_throughput());
89 self
90 }
91
92 pub fn low_latency(mut self) -> Self {
94 self.inner = self.inner.batch(BatchConfig::low_latency());
95 self
96 }
97
98 pub fn batch(mut self, batch: BatchConfig) -> Self {
100 self.inner = self.inner.batch(batch);
101 self
102 }
103
104 pub fn scaling(mut self, policy: ScalingPolicy) -> Self {
106 self.inner = self.inner.scaling(policy);
107 self
108 }
109
110 pub fn adapter_timeout(mut self, timeout: Duration) -> Self {
112 self.inner = self.inner.adapter_timeout(timeout);
113 self
114 }
115
116 pub fn memory(mut self) -> Self {
118 self.adapter = Some(AdapterConfig::Noop);
119 self
120 }
121
122 #[cfg(feature = "redis")]
124 pub fn redis(mut self, config: net::config::RedisAdapterConfig) -> Self {
125 self.adapter = Some(AdapterConfig::Redis(config));
126 self
127 }
128
129 #[cfg(feature = "jetstream")]
131 pub fn jetstream(mut self, config: net::config::JetStreamAdapterConfig) -> Self {
132 self.adapter = Some(AdapterConfig::JetStream(config));
133 self
134 }
135
136 #[cfg(feature = "net")]
138 pub fn mesh(mut self, config: net::adapter::net::NetAdapterConfig) -> Self {
139 self.adapter = Some(AdapterConfig::Net(Box::new(config)));
140 self
141 }
142
143 pub(crate) fn build_config(mut self) -> crate::error::Result<EventBusConfig> {
145 #[cfg_attr(not(feature = "net"), allow(unused_mut))]
151 if let Some(mut adapter) = self.adapter {
152 #[cfg(feature = "net")]
174 if let (Some(id), AdapterConfig::Net(ref mut net_cfg)) =
175 (self.identity.as_ref(), &mut adapter)
176 {
177 let builder_kp = id.keypair();
178 if let Some(adapter_kp) = net_cfg.entity_keypair.as_ref() {
179 if adapter_kp.entity_id() != builder_kp.entity_id() {
180 return Err(crate::error::SdkError::Config(format!(
181 "conflicting identities: NetAdapterConfig::with_entity_keypair \
182 pinned entity_id {} but NetBuilder::identity pinned {}. \
183 Set the keypair on exactly one of them.",
184 adapter_kp.entity_id(),
185 builder_kp.entity_id(),
186 )));
187 }
188 } else {
190 net_cfg.entity_keypair = Some((**builder_kp).clone());
191 }
192 }
193 self.inner = self.inner.adapter(adapter);
194 }
195 self.inner
196 .build()
197 .map_err(|e| crate::error::SdkError::Config(e.to_string()))
198 }
199}
200
201#[cfg(all(test, feature = "net"))]
202mod tests {
203 use super::*;
204 use net::adapter::net::NetAdapterConfig;
205 use std::net::SocketAddr;
206
207 fn sample_net_adapter() -> NetAdapterConfig {
208 let bind: SocketAddr = "127.0.0.1:0".parse().unwrap();
209 let peer: SocketAddr = "127.0.0.1:1".parse().unwrap();
210 NetAdapterConfig::initiator(bind, peer, [0x00; 32], [0x00; 32])
211 }
212
213 #[test]
225 fn net_builder_identity_plumbs_into_adapter() {
226 let seed = [0x42u8; 32];
227 let identity = crate::identity::Identity::from_seed(seed);
228 let expected_entity_id = identity.entity_id().clone();
229
230 let config = NetBuilder::new()
231 .mesh(sample_net_adapter())
232 .identity(identity)
233 .build_config()
234 .expect("build_config");
235
236 let AdapterConfig::Net(net_cfg) = config.adapter else {
237 panic!("adapter lost its Net variant during build_config");
238 };
239 let kp = net_cfg
240 .entity_keypair
241 .as_ref()
242 .expect("entity_keypair should have been plumbed from NetBuilder::identity");
243 assert_eq!(
244 kp.entity_id(),
245 &expected_entity_id,
246 "plumbed entity_id must match the pinned identity's",
247 );
248 }
249
250 #[test]
256 fn net_builder_without_identity_leaves_adapter_keypair_unset() {
257 let config = NetBuilder::new()
258 .mesh(sample_net_adapter())
259 .build_config()
260 .expect("build_config");
261
262 let AdapterConfig::Net(net_cfg) = config.adapter else {
263 panic!("adapter lost its Net variant");
264 };
265 assert!(
266 net_cfg.entity_keypair.is_none(),
267 "entity_keypair must stay unset when the builder has no identity",
268 );
269 }
270
271 #[test]
278 fn build_config_errors_on_conflicting_identities() {
279 let kp_a = net::adapter::net::identity::EntityKeypair::generate();
280 let kp_b_seed = [0xAAu8; 32];
281 let identity_b = crate::identity::Identity::from_seed(kp_b_seed);
282 assert_ne!(kp_a.entity_id(), identity_b.entity_id());
284
285 let mesh_cfg = sample_net_adapter().with_entity_keypair(kp_a);
286
287 let err = NetBuilder::new()
288 .mesh(mesh_cfg)
289 .identity(identity_b)
290 .build_config()
291 .expect_err(
292 "build_config must reject conflicting keypairs; \
293 pre-fix it silently let identity() win",
294 );
295
296 let msg = format!("{}", err);
297 assert!(
298 msg.contains("conflicting identities"),
299 "expected 'conflicting identities' in error, got: {}",
300 msg
301 );
302 }
303
304 #[test]
308 fn build_config_accepts_matching_identities_on_both_sides() {
309 let seed = [0x11u8; 32];
310 let identity = crate::identity::Identity::from_seed(seed);
311 let same_kp = (**identity.keypair()).clone();
312 let expected_id = identity.entity_id().clone();
313
314 let mesh_cfg = sample_net_adapter().with_entity_keypair(same_kp);
315
316 let config = NetBuilder::new()
317 .mesh(mesh_cfg)
318 .identity(identity)
319 .build_config()
320 .expect("matching keypairs on both sides must build");
321
322 let AdapterConfig::Net(net_cfg) = config.adapter else {
323 panic!("adapter lost its Net variant");
324 };
325 assert_eq!(
326 net_cfg.entity_keypair.as_ref().unwrap().entity_id(),
327 &expected_id,
328 );
329 }
330
331 #[test]
335 fn build_config_preserves_adapter_keypair_when_no_builder_identity() {
336 let kp = net::adapter::net::identity::EntityKeypair::generate();
337 let expected_id = kp.entity_id().clone();
338 let mesh_cfg = sample_net_adapter().with_entity_keypair(kp);
339
340 let config = NetBuilder::new()
341 .mesh(mesh_cfg)
342 .build_config()
343 .expect("build_config");
344
345 let AdapterConfig::Net(net_cfg) = config.adapter else {
346 panic!("adapter lost its Net variant");
347 };
348 assert_eq!(
349 net_cfg.entity_keypair.as_ref().unwrap().entity_id(),
350 &expected_id,
351 "adapter-side entity_keypair must survive when builder has no identity",
352 );
353 }
354}