1use super::*;
2use rns_core::transport::{InboundFrame, RxMetadata};
3
4impl Driver {
5 pub(crate) fn handle_frame_event(
6 &mut self,
7 interface_id: InterfaceId,
8 data: Vec<u8>,
9 rssi: Option<i16>,
10 snr: Option<f32>,
11 ) {
12 if data.len() > 2 && (data[0] & 0x03) == 0x01 {
13 log::debug!(
14 "Announce:frame from iface {} (len={}, flags=0x{:02x})",
15 interface_id.0,
16 data.len(),
17 data[0]
18 );
19 }
20 if let Some(entry) = self.interfaces.get(&interface_id) {
21 if !entry.enabled || !entry.online {
22 return;
23 }
24 }
25 if let Some(entry) = self.interfaces.get_mut(&interface_id) {
26 entry.stats.rxb += data.len() as u64;
27 entry.stats.rx_packets += 1;
28 }
29
30 let packet = if let Some(entry) = self.interfaces.get(&interface_id) {
31 if let Some(ref ifac_state) = entry.ifac {
32 match ifac::unmask_inbound(&data, ifac_state) {
33 Some(unmasked) => unmasked,
34 None => {
35 log::debug!("[{}] IFAC rejected packet", interface_id.0);
36 return;
37 }
38 }
39 } else {
40 if data.len() > 2 && data[0] & 0x80 == 0x80 {
41 log::debug!(
42 "[{}] dropping packet with IFAC flag on non-IFAC interface",
43 interface_id.0
44 );
45 return;
46 }
47 data
48 }
49 } else {
50 data
51 };
52
53 #[cfg(feature = "hooks")]
54 {
55 let pkt_ctx = rns_hooks::PacketContext {
56 flags: if packet.is_empty() { 0 } else { packet[0] },
57 hops: if packet.len() > 1 { packet[1] } else { 0 },
58 destination_hash: extract_dest_hash(&packet),
59 context: 0,
60 packet_hash: [0; 32],
61 interface_id: interface_id.0,
62 data_offset: 0,
63 data_len: packet.len() as u32,
64 };
65 let ctx = HookContext::Packet {
66 ctx: &pkt_ctx,
67 raw: &packet,
68 };
69 let now = time::now();
70 let engine_ref = EngineRef {
71 engine: &self.engine,
72 interfaces: &self.interfaces,
73 link_manager: &self.link_manager,
74 now,
75 };
76 let provider_events_enabled = self.provider_events_enabled();
77 if let Some(ref e) = run_hook_inner(
78 &mut self.hook_slots[HookPoint::PreIngress as usize].programs,
79 &self.hook_manager,
80 &engine_ref,
81 &ctx,
82 now,
83 provider_events_enabled,
84 ) {
85 self.forward_hook_side_effects("PreIngress", e);
86 if e.hook_result.as_ref().is_some_and(|r| r.is_drop()) {
87 return;
88 }
89 }
90 }
91
92 if packet.len() > 2 && (packet[0] & 0x03) == 0x01 {
93 let now = time::now();
94 if let Some(entry) = self.interfaces.get_mut(&interface_id) {
95 entry.stats.record_incoming_announce(now);
96 }
97 }
98
99 if let Some(entry) = self.interfaces.get(&interface_id) {
100 self.engine.update_interface_freqs(
101 interface_id,
102 entry.stats.incoming_announce_freq(),
103 entry.stats.incoming_path_request_freq(),
104 entry.stats.outgoing_path_request_freq(),
105 entry.stats.outgoing_path_request_samples(),
106 );
107 }
108
109 let inbound_frame = InboundFrame {
110 raw: &packet,
111 iface: interface_id,
112 now: time::now(),
113 rx: RxMetadata { rssi, snr },
114 };
115
116 let actions = if self.async_announce_verification {
117 let mut announce_queue = self
118 .announce_verify_queue
119 .lock()
120 .unwrap_or_else(|poisoned| poisoned.into_inner());
121 self.engine.handle_inbound_with_announce_queue(
122 inbound_frame,
123 &mut self.rng,
124 Some(&mut announce_queue),
125 )
126 } else {
127 self.engine.handle_inbound(inbound_frame, &mut self.rng)
128 };
129
130 #[cfg(feature = "hooks")]
131 {
132 let pkt_ctx = rns_hooks::PacketContext {
133 flags: if packet.is_empty() { 0 } else { packet[0] },
134 hops: if packet.len() > 1 { packet[1] } else { 0 },
135 destination_hash: extract_dest_hash(&packet),
136 context: 0,
137 packet_hash: [0; 32],
138 interface_id: interface_id.0,
139 data_offset: 0,
140 data_len: packet.len() as u32,
141 };
142 let ctx = HookContext::Packet {
143 ctx: &pkt_ctx,
144 raw: &packet,
145 };
146 let now = time::now();
147 let engine_ref = EngineRef {
148 engine: &self.engine,
149 interfaces: &self.interfaces,
150 link_manager: &self.link_manager,
151 now,
152 };
153 let provider_events_enabled = self.provider_events_enabled();
154 if let Some(ref e) = run_hook_inner(
155 &mut self.hook_slots[HookPoint::PreDispatch as usize].programs,
156 &self.hook_manager,
157 &engine_ref,
158 &ctx,
159 now,
160 provider_events_enabled,
161 ) {
162 self.forward_hook_side_effects("PreDispatch", e);
163 }
164 }
165
166 self.dispatch_all(actions);
167 }
168
169 pub(crate) fn handle_announce_verified_event(
170 &mut self,
171 key: rns_core::transport::announce_verify_queue::AnnounceVerifyKey,
172 validated: rns_core::announce::ValidatedAnnounce,
173 sig_cache_key: [u8; 32],
174 ) {
175 let pending = {
176 let mut announce_queue = self
177 .announce_verify_queue
178 .lock()
179 .unwrap_or_else(|poisoned| poisoned.into_inner());
180 announce_queue.complete_success(&key)
181 };
182 if let Some(pending) = pending {
183 let actions = self.engine.complete_verified_announce(
184 pending,
185 validated,
186 sig_cache_key,
187 time::now(),
188 &mut self.rng,
189 );
190 self.dispatch_all(actions);
191 }
192 }
193
194 pub(crate) fn handle_tick_event(&mut self) {
195 #[cfg(feature = "hooks")]
196 {
197 let ctx = HookContext::Tick;
198 let now = time::now();
199 let engine_ref = EngineRef {
200 engine: &self.engine,
201 interfaces: &self.interfaces,
202 link_manager: &self.link_manager,
203 now,
204 };
205 let provider_events_enabled = self.provider_events_enabled();
206 if let Some(ref e) = run_hook_inner(
207 &mut self.hook_slots[HookPoint::Tick as usize].programs,
208 &self.hook_manager,
209 &engine_ref,
210 &ctx,
211 now,
212 provider_events_enabled,
213 ) {
214 self.forward_hook_side_effects("Tick", e);
215 }
216 }
217
218 let now = time::now();
219 for (id, entry) in &self.interfaces {
220 self.engine.update_interface_freqs(
221 *id,
222 entry.stats.incoming_announce_freq(),
223 entry.stats.incoming_path_request_freq(),
224 entry.stats.outgoing_path_request_freq(),
225 entry.stats.outgoing_path_request_samples(),
226 );
227 }
228 let actions = self.engine.tick(now, &mut self.rng);
229 self.dispatch_all(actions);
230 let link_actions = self.link_manager.tick(&mut self.rng);
231 self.dispatch_link_actions(link_actions);
232 self.enforce_drain_deadline();
233 {
234 let tx = self.get_event_sender();
235 let hp_actions = self.holepunch_manager.tick(&tx);
236 self.dispatch_holepunch_actions(hp_actions);
237 }
238 self.tick_management_announces(now);
239 self.sent_packets
240 .retain(|_, (_, sent_time)| now - *sent_time < 60.0);
241 self.completed_proofs
242 .retain(|_, (_, received)| now - *received < 120.0);
243
244 self.tick_discovery_announcer(now);
245 #[cfg(feature = "iface-backbone")]
246 self.maintain_backbone_peer_pool();
247
248 self.memory_stats_counter += 1;
249 if self.memory_stats_counter >= 300 {
250 self.memory_stats_counter = 0;
251 self.log_memory_stats();
252 }
253
254 if self.discover_interfaces {
255 self.discovery_cleanup_counter += 1;
256 if self.discovery_cleanup_counter >= self.discovery_cleanup_interval_ticks {
257 self.discovery_cleanup_counter = 0;
258 if let Ok(removed) = self.discovered_interfaces.cleanup() {
259 if removed > 0 {
260 log::info!("Discovery cleanup: removed {} stale entries", removed);
261 }
262 #[cfg(feature = "iface-backbone")]
263 self.cull_stale_discovered_backbone_peer_pool_candidates();
264 }
265 }
266 }
267
268 self.cache_cleanup_counter += 1;
269 if self.cache_cleanup_counter >= self.known_destinations_cleanup_interval_ticks {
270 self.cache_cleanup_counter = 0;
271
272 let active_dests = self.engine.active_destination_hashes();
273 let ttl = self.known_destinations_ttl;
274 let kd_before = self.known_destinations.len();
275 self.known_destinations.retain(|k, state| {
276 active_dests.contains(k)
277 || self.local_destinations.contains_key(k)
278 || state.retained
279 || now - Self::known_destination_relevance_time(state) < ttl
280 });
281 let kd_removed = kd_before - self.known_destinations.len();
282 let kd_evicted = self.enforce_known_destination_cap(false);
283 let rl_removed =
284 self.engine
285 .cull_rate_limiter(&active_dests, now, self.rate_limiter_ttl_secs);
286
287 if kd_removed > 0 || kd_evicted > 0 || rl_removed > 0 {
288 log::info!(
289 "Memory cleanup: removed {} known_destinations, evicted {} known_destinations, {} rate_limiter entries",
290 kd_removed, kd_evicted, rl_removed
291 );
292 }
293 }
294
295 self.announce_cache_cleanup_counter += 1;
296 if self.announce_cache_cleanup_counter >= self.announce_cache_cleanup_interval_ticks {
297 self.announce_cache_cleanup_counter = 0;
298 if self.announce_cache.is_some() && self.cache_cleanup_active_hashes.is_none() {
299 self.cache_cleanup_active_hashes = Some(self.engine.active_packet_hashes());
300 self.cache_cleanup_entries = None;
301 self.cache_cleanup_removed = 0;
302 }
303 }
304
305 if self.cache_cleanup_active_hashes.is_some() {
306 if let Some(ref cache) = self.announce_cache {
307 if self.cache_cleanup_entries.is_none() {
308 match cache.entries() {
309 Ok(entries) => self.cache_cleanup_entries = Some(entries),
310 Err(e) => {
311 log::warn!("Announce cache cleanup failed to open directory: {}", e);
312 self.cache_cleanup_active_hashes = None;
313 self.cache_cleanup_entries = None;
314 }
315 }
316 }
317 }
318
319 if let Some(ref cache) = self.announce_cache {
320 let Some(active_hashes) = self.cache_cleanup_active_hashes.as_ref() else {
321 self.cache_cleanup_entries = None;
322 return;
323 };
324 let entries = match self.cache_cleanup_entries.as_mut() {
325 Some(entries) => entries,
326 None => return,
327 };
328 match cache.clean_batch(
329 active_hashes,
330 entries,
331 self.announce_cache_cleanup_batch_size,
332 ) {
333 Ok((removed, finished)) => {
334 self.cache_cleanup_removed += removed;
335 if finished {
336 if self.cache_cleanup_removed > 0 {
337 log::info!(
338 "Announce cache cleanup complete: removed {} stale files",
339 self.cache_cleanup_removed
340 );
341 }
342 self.cache_cleanup_active_hashes = None;
343 self.cache_cleanup_entries = None;
344 }
345 }
346 Err(e) => {
347 log::warn!("Announce cache cleanup failed: {}", e);
348 self.cache_cleanup_active_hashes = None;
349 self.cache_cleanup_entries = None;
350 }
351 }
352 } else {
353 self.cache_cleanup_active_hashes = None;
354 self.cache_cleanup_entries = None;
355 }
356 }
357 }
358
359 pub(crate) fn handle_interface_up_event(
360 &mut self,
361 id: InterfaceId,
362 new_writer: Option<Box<dyn crate::interface::Writer>>,
363 info: Option<rns_core::transport::types::InterfaceInfo>,
364 ) {
365 let wants_tunnel;
366 let mut replay_shared_announces = false;
367 if let Some(mut info) = info {
368 log::info!("[{}] dynamic interface registered", id.0);
369 self.apply_announce_rate_defaults(&mut info);
370 self.apply_ingress_control_defaults(&mut info);
371 wants_tunnel = info.wants_tunnel;
372 let iface_type = infer_interface_type(&info.name);
373 info.started = time::now();
374 self.register_interface_runtime_defaults(&info);
375 self.engine.register_interface(info.clone());
376 if let Some(writer) = new_writer {
377 let (writer, async_writer_metrics) =
378 self.wrap_interface_writer(id, &info.name, writer);
379 self.interfaces.insert(
380 id,
381 InterfaceEntry {
382 id,
383 info,
384 writer,
385 async_writer_metrics: Some(async_writer_metrics),
386 enabled: true,
387 online: true,
388 dynamic: true,
389 ifac: None,
390 stats: InterfaceStats {
391 started: time::now(),
392 ..Default::default()
393 },
394 interface_type: iface_type,
395 send_retry_at: None,
396 send_retry_backoff: Duration::ZERO,
397 },
398 );
399 }
400 self.callbacks.on_interface_up(id);
401 #[cfg(feature = "hooks")]
402 {
403 let ctx = HookContext::Interface { interface_id: id.0 };
404 let now = time::now();
405 let engine_ref = EngineRef {
406 engine: &self.engine,
407 interfaces: &self.interfaces,
408 link_manager: &self.link_manager,
409 now,
410 };
411 let provider_events_enabled = self.provider_events_enabled();
412 if let Some(ref e) = run_hook_inner(
413 &mut self.hook_slots[HookPoint::InterfaceUp as usize].programs,
414 &self.hook_manager,
415 &engine_ref,
416 &ctx,
417 now,
418 provider_events_enabled,
419 ) {
420 self.forward_hook_side_effects("InterfaceUp", e);
421 }
422 }
423 } else {
424 let is_local_client = self
425 .interfaces
426 .get(&id)
427 .map(|entry| entry.info.is_local_client)
428 .unwrap_or(false);
429 replay_shared_announces =
430 is_local_client && self.shared_reconnect_pending.remove(&id).unwrap_or(false);
431 let interface_name = self
432 .interfaces
433 .get(&id)
434 .map(|entry| entry.info.name.clone())
435 .unwrap_or_else(|| format!("iface-{}", id.0));
436 let wrapped_writer =
437 new_writer.map(|writer| self.wrap_interface_writer(id, &interface_name, writer));
438 if let Some(entry) = self.interfaces.get_mut(&id) {
439 log::info!("[{}] interface online", id.0);
440 wants_tunnel = entry.info.wants_tunnel;
441 entry.online = true;
442 if let Some((writer, async_writer_metrics)) = wrapped_writer {
443 log::info!("[{}] writer refreshed after reconnect", id.0);
444 entry.writer = writer;
445 entry.async_writer_metrics = Some(async_writer_metrics);
446 }
447 self.callbacks.on_interface_up(id);
448 #[cfg(feature = "hooks")]
449 {
450 let ctx = HookContext::Interface { interface_id: id.0 };
451 let now = time::now();
452 let engine_ref = EngineRef {
453 engine: &self.engine,
454 interfaces: &self.interfaces,
455 link_manager: &self.link_manager,
456 now,
457 };
458 let provider_events_enabled = self.provider_events_enabled();
459 if let Some(ref e) = run_hook_inner(
460 &mut self.hook_slots[HookPoint::InterfaceUp as usize].programs,
461 &self.hook_manager,
462 &engine_ref,
463 &ctx,
464 now,
465 provider_events_enabled,
466 ) {
467 self.forward_hook_side_effects("InterfaceUp", e);
468 }
469 }
470 } else {
471 wants_tunnel = false;
472 }
473 }
474
475 if wants_tunnel {
476 self.synthesize_tunnel_for_interface(id);
477 }
478 if replay_shared_announces {
479 self.replay_shared_announces();
480 }
481 }
482
483 pub(crate) fn handle_interface_down_event(&mut self, id: InterfaceId) {
484 if let Some(entry) = self.interfaces.get(&id) {
485 if let Some(tunnel_id) = entry.info.tunnel_id {
486 self.engine.void_tunnel_interface(&tunnel_id);
487 }
488 }
489
490 if let Some(entry) = self.interfaces.get(&id) {
491 let is_dynamic = entry.dynamic;
492 let is_local_client = entry.info.is_local_client;
493 let interface_name = entry.info.name.clone();
494 if is_dynamic {
495 log::info!("[{}] dynamic interface removed", id.0);
496 self.interface_runtime_defaults.remove(&interface_name);
497 self.engine.deregister_interface(id);
498 self.interfaces.remove(&id);
499 } else {
500 log::info!("[{}] interface offline", id.0);
501 if let Some(entry) = self.interfaces.get_mut(&id) {
502 entry.online = false;
503 } else {
504 log::warn!(
505 "interface {} disappeared while handling interface-down",
506 id.0
507 );
508 return;
509 }
510 if is_local_client {
511 self.handle_shared_interface_down(id);
512 }
513 }
514 self.callbacks.on_interface_down(id);
515 #[cfg(feature = "hooks")]
516 {
517 let ctx = HookContext::Interface { interface_id: id.0 };
518 let now = time::now();
519 let engine_ref = EngineRef {
520 engine: &self.engine,
521 interfaces: &self.interfaces,
522 link_manager: &self.link_manager,
523 now,
524 };
525 let provider_events_enabled = self.provider_events_enabled();
526 if let Some(ref e) = run_hook_inner(
527 &mut self.hook_slots[HookPoint::InterfaceDown as usize].programs,
528 &self.hook_manager,
529 &engine_ref,
530 &ctx,
531 now,
532 provider_events_enabled,
533 ) {
534 self.forward_hook_side_effects("InterfaceDown", e);
535 }
536 }
537 }
538 #[cfg(feature = "iface-backbone")]
539 self.handle_backbone_peer_pool_down(id);
540 }
541
542 pub(crate) fn known_destination_route_hint(
543 &self,
544 dest_hash: &[u8; 16],
545 ) -> Option<(InterfaceId, u8)> {
546 let announced = &self.known_destinations.get(dest_hash)?.announced;
547 let iface = announced.receiving_interface;
548 if iface.0 == 0 {
549 return None;
550 }
551
552 self.interfaces
553 .get(&iface)
554 .filter(|entry| entry.online)
555 .map(|_| (iface, announced.hops))
556 }
557
558 pub(crate) fn handle_send_outbound_event(
559 &mut self,
560 raw: Vec<u8>,
561 dest_type: u8,
562 attached_interface: Option<InterfaceId>,
563 ) {
564 if self.is_draining() {
565 self.reject_new_work("send outbound packet");
566 return;
567 }
568 match RawPacket::unpack(&raw) {
569 Ok(packet) => {
570 let is_announce =
571 packet.flags.packet_type == rns_core::constants::PACKET_TYPE_ANNOUNCE;
572 if is_announce {
573 log::debug!(
574 "SendOutbound: ANNOUNCE for {:02x?} (len={}, dest_type={}, attached={:?})",
575 &packet.destination_hash[..4],
576 raw.len(),
577 dest_type,
578 attached_interface
579 );
580 }
581 if packet.flags.packet_type == rns_core::constants::PACKET_TYPE_DATA {
582 self.sent_packets
583 .insert(packet.packet_hash, (packet.destination_hash, time::now()));
584 }
585 let actions = self.engine.handle_outbound(
586 &packet,
587 dest_type,
588 attached_interface,
589 time::now(),
590 );
591 if is_announce {
592 log::debug!(
593 "SendOutbound: announce routed to {} actions: {:?}",
594 actions.len(),
595 actions
596 .iter()
597 .map(|a| match a {
598 TransportAction::SendOnInterface { interface, .. } =>
599 format!("SendOn({})", interface.0),
600 TransportAction::BroadcastOnAllInterfaces { .. } =>
601 "BroadcastAll".to_string(),
602 _ => "other".to_string(),
603 })
604 .collect::<Vec<_>>()
605 );
606 }
607 self.dispatch_all(actions);
608 }
609 Err(e) => {
610 log::warn!("SendOutbound: failed to unpack packet: {:?}", e);
611 }
612 }
613 }
614
615 pub fn run(&mut self) {
617 loop {
618 let event = match self.rx.recv() {
619 Ok(e) => e,
620 Err(_) => break, };
622
623 match event {
624 Event::Frame {
625 interface_id,
626 data,
627 rssi,
628 snr,
629 } => {
630 self.handle_frame_event(interface_id, data, rssi, snr);
631 }
632 Event::AnnounceVerified {
633 key,
634 validated,
635 sig_cache_key,
636 } => {
637 self.handle_announce_verified_event(key, validated, sig_cache_key);
638 }
639 Event::AnnounceVerifyFailed { key, .. } => {
640 let mut announce_queue = self
641 .announce_verify_queue
642 .lock()
643 .unwrap_or_else(|poisoned| poisoned.into_inner());
644 let _ = announce_queue.complete_failure(&key);
645 }
646 Event::Tick => self.handle_tick_event(),
647 Event::BeginDrain { timeout } => {
648 self.begin_drain(timeout);
649 }
650 Event::InterfaceUp(id, new_writer, info) => {
651 self.handle_interface_up_event(id, new_writer, info);
652 }
653 Event::InterfaceDown(id) => self.handle_interface_down_event(id),
654 Event::SendOutbound {
655 raw,
656 dest_type,
657 attached_interface,
658 } => self.handle_send_outbound_event(raw, dest_type, attached_interface),
659 Event::RegisterDestination {
660 dest_hash,
661 dest_type,
662 } => {
663 self.engine.register_destination(dest_hash, dest_type);
664 self.local_destinations.insert(dest_hash, dest_type);
665 }
666 Event::StoreSharedAnnounce {
667 dest_hash,
668 name_hash,
669 identity_prv_key,
670 app_data,
671 } => {
672 self.shared_announces.insert(
673 dest_hash,
674 SharedAnnounceRecord {
675 name_hash,
676 identity_prv_key,
677 app_data,
678 },
679 );
680 }
681 Event::DeregisterDestination { dest_hash } => {
682 self.engine.deregister_destination(&dest_hash);
683 self.local_destinations.remove(&dest_hash);
684 self.shared_announces.remove(&dest_hash);
685 }
686 Event::Query(request, response_tx) => {
687 let response = self.handle_query_mut(request);
688 let _ = response_tx.send(response);
689 }
690 Event::DeregisterLinkDestination { dest_hash } => {
691 self.link_manager.deregister_link_destination(&dest_hash);
692 }
693 Event::RegisterLinkDestination {
694 dest_hash,
695 sig_prv_bytes,
696 sig_pub_bytes,
697 resource_strategy,
698 } => {
699 let sig_prv =
700 rns_crypto::ed25519::Ed25519PrivateKey::from_bytes(&sig_prv_bytes);
701 let strat = match resource_strategy {
702 1 => crate::link_manager::ResourceStrategy::AcceptAll,
703 2 => crate::link_manager::ResourceStrategy::AcceptApp,
704 _ => crate::link_manager::ResourceStrategy::AcceptNone,
705 };
706 self.link_manager.register_link_destination(
707 dest_hash,
708 sig_prv,
709 sig_pub_bytes,
710 strat,
711 );
712 self.engine
714 .register_destination(dest_hash, rns_core::constants::DESTINATION_SINGLE);
715 self.local_destinations
716 .insert(dest_hash, rns_core::constants::DESTINATION_SINGLE);
717 }
718 Event::RegisterRequestHandler {
719 path,
720 allowed_list,
721 handler,
722 } => {
723 self.link_manager.register_request_handler(
724 &path,
725 allowed_list,
726 move |link_id, p, data, remote| handler(link_id, p, data, remote),
727 );
728 }
729 Event::RegisterRequestHandlerResponse {
730 path,
731 allowed_list,
732 handler,
733 } => {
734 self.link_manager.register_request_handler_response(
735 &path,
736 allowed_list,
737 move |link_id, p, data, remote| handler(link_id, p, data, remote),
738 );
739 }
740 Event::CreateLink {
741 dest_hash,
742 dest_sig_pub_bytes,
743 response_tx,
744 } => {
745 if self.is_draining() {
746 self.reject_new_work("create link");
747 let _ = (dest_hash, dest_sig_pub_bytes);
748 let _ = response_tx.send([0u8; 16]);
749 continue;
750 }
751 let next_hop_interface = self.engine.next_hop_interface(&dest_hash);
752 let recalled_route_hint = if next_hop_interface.is_none() {
753 self.known_destination_route_hint(&dest_hash)
754 } else {
755 None
756 };
757 if recalled_route_hint.is_some() {
758 let _ = self.mark_known_destination_used(&dest_hash);
759 }
760 let attached_interface =
761 next_hop_interface.or(recalled_route_hint.map(|(iface, _)| iface));
762 let hops = self
763 .engine
764 .hops_to(&dest_hash)
765 .or_else(|| recalled_route_hint.map(|(_, hops)| hops))
766 .unwrap_or(0);
767 let mtu = attached_interface
768 .and_then(|iface_id| self.interfaces.get(&iface_id))
769 .map(|entry| entry.info.mtu)
770 .unwrap_or(rns_core::constants::MTU as u32);
771 let (link_id, mut link_actions) = self.link_manager.create_link(
772 &dest_hash,
773 &dest_sig_pub_bytes,
774 hops,
775 mtu,
776 &mut self.rng,
777 );
778 if let Some(iface) = attached_interface {
779 self.link_manager.set_link_route_hint(&link_id, iface, None);
780 }
781 if next_hop_interface.is_none() {
782 if let Some(iface) = attached_interface {
783 for action in &mut link_actions {
784 if let LinkManagerAction::SendPacket {
785 dest_type,
786 attached_interface,
787 ..
788 } = action
789 {
790 if *dest_type == rns_core::constants::DESTINATION_LINK
791 && attached_interface.is_none()
792 {
793 *attached_interface = Some(iface);
794 }
795 }
796 }
797 }
798 }
799 let _ = response_tx.send(link_id);
800 self.dispatch_link_actions(link_actions);
801 }
802 Event::SendRequest {
803 link_id,
804 path,
805 data,
806 } => {
807 if self.is_draining() {
808 self.reject_new_work("send link request");
809 let _ = (link_id, path, data);
810 continue;
811 }
812 let link_actions =
813 self.link_manager
814 .send_request(&link_id, &path, &data, &mut self.rng);
815 self.dispatch_link_actions(link_actions);
816 }
817 Event::IdentifyOnLink {
818 link_id,
819 identity_prv_key,
820 } => {
821 if self.is_draining() {
822 self.reject_new_work("identify on link");
823 let _ = (link_id, identity_prv_key);
824 continue;
825 }
826 let identity =
827 rns_crypto::identity::Identity::from_private_key(&identity_prv_key);
828 let link_actions =
829 self.link_manager
830 .identify(&link_id, &identity, &mut self.rng);
831 self.dispatch_link_actions(link_actions);
832 }
833 Event::TeardownLink { link_id } => {
834 let link_actions = self.link_manager.teardown_link(&link_id);
835 self.dispatch_link_actions(link_actions);
836 }
837 Event::SendResource {
838 link_id,
839 data,
840 metadata,
841 auto_compress,
842 } => {
843 if self.is_draining() {
844 self.reject_new_work("send resource");
845 let _ = (link_id, data, metadata, auto_compress);
846 continue;
847 }
848 let link_actions = self.link_manager.send_resource_with_auto_compress(
849 &link_id,
850 &data,
851 metadata.as_deref(),
852 auto_compress,
853 &mut self.rng,
854 );
855 self.dispatch_link_actions(link_actions);
856 }
857 Event::SetResourceStrategy { link_id, strategy } => {
858 use crate::link_manager::ResourceStrategy;
859 let strat = match strategy {
860 0 => ResourceStrategy::AcceptNone,
861 1 => ResourceStrategy::AcceptAll,
862 2 => ResourceStrategy::AcceptApp,
863 _ => ResourceStrategy::AcceptNone,
864 };
865 self.link_manager.set_resource_strategy(&link_id, strat);
866 }
867 Event::AcceptResource {
868 link_id,
869 resource_hash,
870 accept,
871 } => {
872 if self.is_draining() && accept {
873 self.reject_new_work("accept resource");
874 let _ = (link_id, resource_hash, accept);
875 continue;
876 }
877 let link_actions = self.link_manager.accept_resource(
878 &link_id,
879 &resource_hash,
880 accept,
881 &mut self.rng,
882 );
883 self.dispatch_link_actions(link_actions);
884 }
885 Event::SendChannelMessage {
886 link_id,
887 msgtype,
888 payload,
889 response_tx,
890 } => {
891 if self.is_draining() {
892 self.reject_new_work("send channel message");
893 let _ = response_tx.send(Err(self.drain_error("send channel message")));
894 continue;
895 }
896 match self.link_manager.send_channel_message(
897 &link_id,
898 msgtype,
899 &payload,
900 &mut self.rng,
901 ) {
902 Ok(link_actions) => {
903 self.dispatch_link_actions(link_actions);
904 let _ = response_tx.send(Ok(()));
905 }
906 Err(err) => {
907 let _ = response_tx.send(Err(err));
908 }
909 }
910 }
911 Event::SendOnLink {
912 link_id,
913 data,
914 context,
915 } => {
916 if self.is_draining() {
917 self.reject_new_work("send link payload");
918 let _ = (link_id, data, context);
919 continue;
920 }
921 let link_actions =
922 self.link_manager
923 .send_on_link(&link_id, &data, context, &mut self.rng);
924 self.dispatch_link_actions(link_actions);
925 }
926 Event::RequestPath { dest_hash } => {
927 if self.is_draining() {
928 self.reject_new_work("request path");
929 let _ = dest_hash;
930 continue;
931 }
932 self.handle_request_path(dest_hash);
933 }
934 Event::RegisterProofStrategy {
935 dest_hash,
936 strategy,
937 signing_key,
938 } => {
939 let identity = signing_key
940 .map(|key| rns_crypto::identity::Identity::from_private_key(&key));
941 self.proof_strategies
942 .insert(dest_hash, (strategy, identity));
943 }
944 Event::ProposeDirectConnect { link_id } => {
945 if self.is_draining() {
946 self.reject_new_work("propose direct connect");
947 let _ = link_id;
948 continue;
949 }
950 let derived_key = self.link_manager.get_derived_key(&link_id);
951 if let Some(dk) = derived_key {
952 let tx = self.get_event_sender();
953 let hp_actions =
954 self.holepunch_manager
955 .propose(link_id, &dk, &mut self.rng, &tx);
956 self.dispatch_holepunch_actions(hp_actions);
957 } else {
958 log::warn!(
959 "Cannot propose direct connect: no derived key for link {:02x?}",
960 &link_id[..4]
961 );
962 }
963 }
964 Event::SetDirectConnectPolicy { policy } => {
965 self.holepunch_manager.set_policy(policy);
966 }
967 Event::HolePunchProbeResult {
968 link_id,
969 session_id,
970 observed_addr,
971 socket,
972 probe_server,
973 } => {
974 let hp_actions = self.holepunch_manager.handle_probe_result(
975 link_id,
976 session_id,
977 observed_addr,
978 socket,
979 probe_server,
980 );
981 self.dispatch_holepunch_actions(hp_actions);
982 }
983 Event::HolePunchProbeFailed {
984 link_id,
985 session_id,
986 } => {
987 let hp_actions = self
988 .holepunch_manager
989 .handle_probe_failed(link_id, session_id);
990 self.dispatch_holepunch_actions(hp_actions);
991 }
992 Event::LoadHook {
993 name,
994 wasm_bytes,
995 attach_point,
996 priority,
997 response_tx,
998 } => {
999 #[cfg(feature = "hooks")]
1000 {
1001 let result = (|| -> Result<(), String> {
1002 let point_idx = crate::config::parse_hook_point(&attach_point)
1003 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1004 let mgr = self
1005 .hook_manager
1006 .as_ref()
1007 .ok_or_else(|| "hook manager not available".to_string())?;
1008 let program = mgr
1009 .compile(name.clone(), &wasm_bytes, priority)
1010 .map_err(|e| format!("compile error: {}", e))?;
1011 self.hook_slots[point_idx].attach(program);
1012 log::info!(
1013 "Loaded hook '{}' at point {} (priority {})",
1014 name,
1015 attach_point,
1016 priority
1017 );
1018 Ok(())
1019 })();
1020 let _ = response_tx.send(result);
1021 }
1022 #[cfg(not(feature = "hooks"))]
1023 {
1024 let _ = (name, wasm_bytes, attach_point, priority);
1025 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1026 }
1027 }
1028 Event::LoadHookFile {
1029 name,
1030 path,
1031 hook_type,
1032 attach_point,
1033 priority,
1034 response_tx,
1035 } => {
1036 #[cfg(feature = "hooks")]
1037 {
1038 let result = (|| -> Result<(), String> {
1039 let point_idx = crate::config::parse_hook_point(&attach_point)
1040 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1041 let backend = crate::config::parse_hook_backend(&hook_type)?;
1042 let mgr = self
1043 .hook_manager
1044 .as_ref()
1045 .ok_or_else(|| "hook manager not available".to_string())?;
1046 let program = mgr
1047 .load_file_backend(
1048 name.clone(),
1049 std::path::Path::new(&path),
1050 priority,
1051 backend,
1052 )
1053 .map_err(|e| format!("load error: {}", e))?;
1054 self.hook_slots[point_idx].attach(program);
1055 log::info!(
1056 "Loaded {} hook '{}' at point {} (priority {})",
1057 backend.as_str(),
1058 name,
1059 attach_point,
1060 priority
1061 );
1062 Ok(())
1063 })();
1064 let _ = response_tx.send(result);
1065 }
1066 #[cfg(not(feature = "hooks"))]
1067 {
1068 let _ = (name, path, hook_type, attach_point, priority);
1069 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1070 }
1071 }
1072 Event::LoadBuiltinHook {
1073 name,
1074 builtin_id,
1075 attach_point,
1076 priority,
1077 response_tx,
1078 } => {
1079 #[cfg(feature = "hooks")]
1080 {
1081 let result = (|| -> Result<(), String> {
1082 let point_idx = crate::config::parse_hook_point(&attach_point)
1083 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1084 let mgr = self
1085 .hook_manager
1086 .as_ref()
1087 .ok_or_else(|| "hook manager not available".to_string())?;
1088 let program = mgr
1089 .load_builtin(name.clone(), builtin_id.as_str(), priority)
1090 .map_err(|e| format!("load error: {}", e))?;
1091 self.hook_slots[point_idx].attach(program);
1092 log::info!(
1093 "Loaded built-in hook '{}' ({}) at point {} (priority {})",
1094 name,
1095 builtin_id,
1096 attach_point,
1097 priority
1098 );
1099 Ok(())
1100 })();
1101 let _ = response_tx.send(result);
1102 }
1103 #[cfg(not(feature = "hooks"))]
1104 {
1105 let _ = (name, builtin_id, attach_point, priority);
1106 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1107 }
1108 }
1109 Event::UnloadHook {
1110 name,
1111 attach_point,
1112 response_tx,
1113 } => {
1114 #[cfg(feature = "hooks")]
1115 {
1116 let result = (|| -> Result<(), String> {
1117 let point_idx = crate::config::parse_hook_point(&attach_point)
1118 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1119 match self.hook_slots[point_idx].detach(&name) {
1120 Some(_) => {
1121 log::info!(
1122 "Unloaded hook '{}' from point {}",
1123 name,
1124 attach_point
1125 );
1126 Ok(())
1127 }
1128 None => Err(format!(
1129 "hook '{}' not found at point '{}'",
1130 name, attach_point
1131 )),
1132 }
1133 })();
1134 let _ = response_tx.send(result);
1135 }
1136 #[cfg(not(feature = "hooks"))]
1137 {
1138 let _ = (name, attach_point);
1139 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1140 }
1141 }
1142 Event::ReloadHook {
1143 name,
1144 attach_point,
1145 wasm_bytes,
1146 response_tx,
1147 } => {
1148 #[cfg(feature = "hooks")]
1149 {
1150 let result = (|| -> Result<(), String> {
1151 let point_idx = crate::config::parse_hook_point(&attach_point)
1152 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1153 let old =
1154 self.hook_slots[point_idx].detach(&name).ok_or_else(|| {
1155 format!("hook '{}' not found at point '{}'", name, attach_point)
1156 })?;
1157 let priority = old.priority;
1158 let mgr = match self.hook_manager.as_ref() {
1159 Some(m) => m,
1160 None => {
1161 self.hook_slots[point_idx].attach(old);
1162 return Err("hook manager not available".to_string());
1163 }
1164 };
1165 match mgr.compile(name.clone(), &wasm_bytes, priority) {
1166 Ok(program) => {
1167 self.hook_slots[point_idx].attach(program);
1168 log::info!(
1169 "Reloaded hook '{}' at point {} (priority {})",
1170 name,
1171 attach_point,
1172 priority
1173 );
1174 Ok(())
1175 }
1176 Err(e) => {
1177 self.hook_slots[point_idx].attach(old);
1178 Err(format!("compile error: {}", e))
1179 }
1180 }
1181 })();
1182 let _ = response_tx.send(result);
1183 }
1184 #[cfg(not(feature = "hooks"))]
1185 {
1186 let _ = (name, attach_point, wasm_bytes);
1187 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1188 }
1189 }
1190 Event::ReloadHookFile {
1191 name,
1192 attach_point,
1193 path,
1194 hook_type,
1195 response_tx,
1196 } => {
1197 #[cfg(feature = "hooks")]
1198 {
1199 let result = (|| -> Result<(), String> {
1200 let point_idx = crate::config::parse_hook_point(&attach_point)
1201 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1202 let old =
1203 self.hook_slots[point_idx].detach(&name).ok_or_else(|| {
1204 format!("hook '{}' not found at point '{}'", name, attach_point)
1205 })?;
1206 let priority = old.priority;
1207 let backend = match crate::config::parse_hook_backend(&hook_type) {
1208 Ok(backend) => backend,
1209 Err(e) => {
1210 self.hook_slots[point_idx].attach(old);
1211 return Err(e);
1212 }
1213 };
1214 let mgr = match self.hook_manager.as_ref() {
1215 Some(m) => m,
1216 None => {
1217 self.hook_slots[point_idx].attach(old);
1218 return Err("hook manager not available".to_string());
1219 }
1220 };
1221 match mgr.load_file_backend(
1222 name.clone(),
1223 std::path::Path::new(&path),
1224 priority,
1225 backend,
1226 ) {
1227 Ok(program) => {
1228 self.hook_slots[point_idx].attach(program);
1229 log::info!(
1230 "Reloaded {} hook '{}' at point {} (priority {})",
1231 backend.as_str(),
1232 name,
1233 attach_point,
1234 priority
1235 );
1236 Ok(())
1237 }
1238 Err(e) => {
1239 self.hook_slots[point_idx].attach(old);
1240 Err(format!("load error: {}", e))
1241 }
1242 }
1243 })();
1244 let _ = response_tx.send(result);
1245 }
1246 #[cfg(not(feature = "hooks"))]
1247 {
1248 let _ = (name, attach_point, path, hook_type);
1249 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1250 }
1251 }
1252 Event::ReloadBuiltinHook {
1253 name,
1254 attach_point,
1255 builtin_id,
1256 response_tx,
1257 } => {
1258 #[cfg(feature = "hooks")]
1259 {
1260 let result = (|| -> Result<(), String> {
1261 let point_idx = crate::config::parse_hook_point(&attach_point)
1262 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1263 let old =
1264 self.hook_slots[point_idx].detach(&name).ok_or_else(|| {
1265 format!("hook '{}' not found at point '{}'", name, attach_point)
1266 })?;
1267 let priority = old.priority;
1268 let mgr = match self.hook_manager.as_ref() {
1269 Some(m) => m,
1270 None => {
1271 self.hook_slots[point_idx].attach(old);
1272 return Err("hook manager not available".to_string());
1273 }
1274 };
1275 match mgr.load_builtin(name.clone(), builtin_id.as_str(), priority) {
1276 Ok(program) => {
1277 self.hook_slots[point_idx].attach(program);
1278 log::info!(
1279 "Reloaded built-in hook '{}' ({}) at point {} (priority {})",
1280 name,
1281 builtin_id,
1282 attach_point,
1283 priority
1284 );
1285 Ok(())
1286 }
1287 Err(e) => {
1288 self.hook_slots[point_idx].attach(old);
1289 Err(format!("load error: {}", e))
1290 }
1291 }
1292 })();
1293 let _ = response_tx.send(result);
1294 }
1295 #[cfg(not(feature = "hooks"))]
1296 {
1297 let _ = (name, attach_point, builtin_id);
1298 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1299 }
1300 }
1301 Event::SetHookEnabled {
1302 name,
1303 attach_point,
1304 enabled,
1305 response_tx,
1306 } => {
1307 #[cfg(feature = "hooks")]
1308 {
1309 let result = self.update_hook_program(&name, &attach_point, |program| {
1310 program.enabled = enabled;
1311 });
1312 if result.is_ok() {
1313 log::info!(
1314 "{} hook '{}' at point {}",
1315 if enabled { "Enabled" } else { "Disabled" },
1316 name,
1317 attach_point,
1318 );
1319 }
1320 let _ = response_tx.send(result);
1321 }
1322 #[cfg(not(feature = "hooks"))]
1323 {
1324 let _ = (name, attach_point, enabled);
1325 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1326 }
1327 }
1328 Event::SetHookPriority {
1329 name,
1330 attach_point,
1331 priority,
1332 response_tx,
1333 } => {
1334 #[cfg(feature = "hooks")]
1335 {
1336 let result = self.update_hook_program(&name, &attach_point, |program| {
1337 program.priority = priority;
1338 });
1339 if result.is_ok() {
1340 if let Some(point_idx) = crate::config::parse_hook_point(&attach_point)
1341 {
1342 self.hook_slots[point_idx]
1343 .programs
1344 .sort_by(|a, b| b.priority.cmp(&a.priority));
1345 log::info!(
1346 "Updated hook '{}' at point {} to priority {}",
1347 name,
1348 attach_point,
1349 priority,
1350 );
1351 } else {
1352 log::error!(
1353 "hook point '{}' became invalid during priority update",
1354 attach_point
1355 );
1356 }
1357 }
1358 let _ = response_tx.send(result);
1359 }
1360 #[cfg(not(feature = "hooks"))]
1361 {
1362 let _ = (name, attach_point, priority);
1363 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1364 }
1365 }
1366 Event::ListHooks { response_tx } => {
1367 #[cfg(feature = "hooks")]
1368 {
1369 let hook_point_names = [
1370 "PreIngress",
1371 "PreDispatch",
1372 "AnnounceReceived",
1373 "PathUpdated",
1374 "AnnounceRetransmit",
1375 "LinkRequestReceived",
1376 "LinkEstablished",
1377 "LinkClosed",
1378 "InterfaceUp",
1379 "InterfaceDown",
1380 "InterfaceConfigChanged",
1381 "BackbonePeerConnected",
1382 "BackbonePeerDisconnected",
1383 "BackbonePeerIdleTimeout",
1384 "BackbonePeerWriteStall",
1385 "BackbonePeerPenalty",
1386 "SendOnInterface",
1387 "BroadcastOnAllInterfaces",
1388 "DeliverLocal",
1389 "TunnelSynthesize",
1390 "Tick",
1391 ];
1392 let mut infos = Vec::new();
1393 for (idx, slot) in self.hook_slots.iter().enumerate() {
1394 let point_name = hook_point_names.get(idx).unwrap_or(&"Unknown");
1395 for prog in &slot.programs {
1396 infos.push(crate::event::HookInfo {
1397 name: prog.name.clone(),
1398 hook_type: prog.backend_name().to_string(),
1399 attach_point: point_name.to_string(),
1400 priority: prog.priority,
1401 enabled: prog.enabled,
1402 consecutive_traps: prog.consecutive_traps,
1403 });
1404 }
1405 }
1406 let _ = response_tx.send(infos);
1407 }
1408 #[cfg(not(feature = "hooks"))]
1409 {
1410 let _ = response_tx.send(Vec::new());
1411 }
1412 }
1413 Event::InterfaceConfigChanged(id) => {
1414 #[cfg(feature = "hooks")]
1415 {
1416 let ctx = HookContext::Interface { interface_id: id.0 };
1417 let now = time::now();
1418 let engine_ref = EngineRef {
1419 engine: &self.engine,
1420 interfaces: &self.interfaces,
1421 link_manager: &self.link_manager,
1422 now,
1423 };
1424 let provider_events_enabled = self.provider_events_enabled();
1425 if let Some(ref e) = run_hook_inner(
1426 &mut self.hook_slots[HookPoint::InterfaceConfigChanged as usize]
1427 .programs,
1428 &self.hook_manager,
1429 &engine_ref,
1430 &ctx,
1431 now,
1432 provider_events_enabled,
1433 ) {
1434 self.forward_hook_side_effects("InterfaceConfigChanged", e);
1435 }
1436 }
1437 #[cfg(not(feature = "hooks"))]
1438 let _ = id;
1439 }
1440 Event::BackbonePeerConnected {
1441 server_interface_id,
1442 peer_interface_id,
1443 peer_ip,
1444 peer_port,
1445 } => {
1446 #[cfg(feature = "hooks")]
1447 {
1448 self.run_backbone_peer_hook(
1449 "BackbonePeerConnected",
1450 HookPoint::BackbonePeerConnected,
1451 &BackbonePeerHookEvent {
1452 server_interface_id,
1453 peer_interface_id: Some(peer_interface_id),
1454 peer_ip,
1455 peer_port,
1456 connected_for: Duration::ZERO,
1457 had_received_data: false,
1458 penalty_level: 0,
1459 blacklist_for: Duration::ZERO,
1460 },
1461 );
1462 }
1463 #[cfg(not(feature = "hooks"))]
1464 let _ = (server_interface_id, peer_interface_id, peer_ip, peer_port);
1465 }
1466 Event::BackbonePeerDisconnected {
1467 server_interface_id,
1468 peer_interface_id,
1469 peer_ip,
1470 peer_port,
1471 connected_for,
1472 had_received_data,
1473 } => {
1474 #[cfg(feature = "hooks")]
1475 {
1476 self.run_backbone_peer_hook(
1477 "BackbonePeerDisconnected",
1478 HookPoint::BackbonePeerDisconnected,
1479 &BackbonePeerHookEvent {
1480 server_interface_id,
1481 peer_interface_id: Some(peer_interface_id),
1482 peer_ip,
1483 peer_port,
1484 connected_for,
1485 had_received_data,
1486 penalty_level: 0,
1487 blacklist_for: Duration::ZERO,
1488 },
1489 );
1490 }
1491 #[cfg(not(feature = "hooks"))]
1492 let _ = (
1493 server_interface_id,
1494 peer_interface_id,
1495 peer_ip,
1496 peer_port,
1497 connected_for,
1498 had_received_data,
1499 );
1500 }
1501 Event::BackbonePeerIdleTimeout {
1502 server_interface_id,
1503 peer_interface_id,
1504 peer_ip,
1505 peer_port,
1506 connected_for,
1507 } => {
1508 #[cfg(feature = "hooks")]
1509 {
1510 self.run_backbone_peer_hook(
1511 "BackbonePeerIdleTimeout",
1512 HookPoint::BackbonePeerIdleTimeout,
1513 &BackbonePeerHookEvent {
1514 server_interface_id,
1515 peer_interface_id: Some(peer_interface_id),
1516 peer_ip,
1517 peer_port,
1518 connected_for,
1519 had_received_data: false,
1520 penalty_level: 0,
1521 blacklist_for: Duration::ZERO,
1522 },
1523 );
1524 }
1525 #[cfg(not(feature = "hooks"))]
1526 let _ = (
1527 server_interface_id,
1528 peer_interface_id,
1529 peer_ip,
1530 peer_port,
1531 connected_for,
1532 );
1533 }
1534 Event::BackbonePeerWriteStall {
1535 server_interface_id,
1536 peer_interface_id,
1537 peer_ip,
1538 peer_port,
1539 connected_for,
1540 } => {
1541 #[cfg(feature = "hooks")]
1542 {
1543 self.run_backbone_peer_hook(
1544 "BackbonePeerWriteStall",
1545 HookPoint::BackbonePeerWriteStall,
1546 &BackbonePeerHookEvent {
1547 server_interface_id,
1548 peer_interface_id: Some(peer_interface_id),
1549 peer_ip,
1550 peer_port,
1551 connected_for,
1552 had_received_data: false,
1553 penalty_level: 0,
1554 blacklist_for: Duration::ZERO,
1555 },
1556 );
1557 }
1558 #[cfg(not(feature = "hooks"))]
1559 let _ = (
1560 server_interface_id,
1561 peer_interface_id,
1562 peer_ip,
1563 peer_port,
1564 connected_for,
1565 );
1566 }
1567 Event::BackbonePeerPenalty {
1568 server_interface_id,
1569 peer_ip,
1570 penalty_level,
1571 blacklist_for,
1572 } => {
1573 #[cfg(feature = "hooks")]
1574 {
1575 self.run_backbone_peer_hook(
1576 "BackbonePeerPenalty",
1577 HookPoint::BackbonePeerPenalty,
1578 &BackbonePeerHookEvent {
1579 server_interface_id,
1580 peer_interface_id: None,
1581 peer_ip,
1582 peer_port: 0,
1583 connected_for: Duration::ZERO,
1584 had_received_data: false,
1585 penalty_level,
1586 blacklist_for,
1587 },
1588 );
1589 }
1590 #[cfg(not(feature = "hooks"))]
1591 let _ = (server_interface_id, peer_ip, penalty_level, blacklist_for);
1592 }
1593 Event::Shutdown => {
1594 self.graceful_shutdown();
1595 break;
1596 }
1597 }
1598 }
1599 }
1600 pub(crate) fn handle_tunnel_synth_delivery(&mut self, raw: &[u8]) {
1601 let packet = match RawPacket::unpack(raw) {
1603 Ok(p) => p,
1604 Err(_) => return,
1605 };
1606
1607 match rns_core::transport::tunnel::validate_tunnel_synthesize_data(&packet.data) {
1608 Ok(validated) => {
1609 let iface_id = self
1612 .interfaces
1613 .iter()
1614 .find(|(_, entry)| entry.info.wants_tunnel && entry.online && entry.enabled)
1615 .map(|(id, _)| *id);
1616
1617 if let Some(iface) = iface_id {
1618 let now = time::now();
1619 let tunnel_actions = self.engine.handle_tunnel(validated.tunnel_id, iface, now);
1620 self.dispatch_all(tunnel_actions);
1621 }
1622 }
1623 Err(e) => {
1624 log::debug!("Tunnel synthesis validation failed: {}", e);
1625 }
1626 }
1627 }
1628
1629 pub(crate) fn synthesize_tunnel_for_interface(&mut self, interface: InterfaceId) {
1633 if let Some(ref identity) = self.transport_identity {
1634 let actions = self
1635 .engine
1636 .synthesize_tunnel(identity, interface, &mut self.rng);
1637 self.dispatch_all(actions);
1638 }
1639 }
1640
1641 pub(crate) fn handle_request_path(&mut self, dest_hash: [u8; 16]) {
1643 let mut data = Vec::with_capacity(48);
1645 data.extend_from_slice(&dest_hash);
1646
1647 if self.engine.transport_enabled() {
1648 if let Some(id_hash) = self.engine.identity_hash() {
1649 data.extend_from_slice(id_hash);
1650 }
1651 }
1652
1653 let mut tag = [0u8; 16];
1655 self.rng.fill_bytes(&mut tag);
1656 data.extend_from_slice(&tag);
1657
1658 let flags = rns_core::packet::PacketFlags {
1660 header_type: rns_core::constants::HEADER_1,
1661 context_flag: rns_core::constants::FLAG_UNSET,
1662 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
1663 destination_type: rns_core::constants::DESTINATION_PLAIN,
1664 packet_type: rns_core::constants::PACKET_TYPE_DATA,
1665 };
1666
1667 if let Ok(packet) = RawPacket::pack(
1668 flags,
1669 0,
1670 &self.path_request_dest,
1671 None,
1672 rns_core::constants::CONTEXT_NONE,
1673 &data,
1674 ) {
1675 let actions = self.engine.handle_outbound(
1676 &packet,
1677 rns_core::constants::DESTINATION_PLAIN,
1678 None,
1679 time::now(),
1680 );
1681 self.dispatch_all(actions);
1682 }
1683 }
1684
1685 pub(crate) fn get_event_sender(&self) -> crate::event::EventSender {
1688 self.event_tx.clone()
1692 }
1693
/// Minimum uptime (`now - self.started`) that must elapse before
/// `tick_management_announces` emits the first round of management
/// announces. Presumably seconds, like `management_announce_interval_secs`
/// — confirm against the clock used for `now`.
const MANAGEMENT_ANNOUNCE_DELAY: f64 = 5.0;
1696
1697 pub(crate) fn tick_discovery_announcer(&mut self, now: f64) {
1699 let announcer = match self.interface_announcer.as_mut() {
1700 Some(a) => a,
1701 None => return,
1702 };
1703
1704 announcer.maybe_start(now);
1705
1706 let stamp_result = match announcer.poll_ready() {
1707 Some(r) => r,
1708 None => return,
1709 };
1710
1711 if !announcer.contains_interface(&stamp_result.interface_name) {
1712 log::debug!(
1713 "Discovery: dropping completed stamp for removed interface '{}'",
1714 stamp_result.interface_name
1715 );
1716 return;
1717 }
1718
1719 let identity = match self.transport_identity.as_ref() {
1720 Some(id) => id,
1721 None => {
1722 log::warn!("Discovery: stamp ready but no transport identity");
1723 return;
1724 }
1725 };
1726
1727 let identity_hash = identity.hash();
1729 let disc_dest = rns_core::destination::destination_hash(
1730 crate::discovery::APP_NAME,
1731 &["discovery", "interface"],
1732 Some(&identity_hash),
1733 );
1734 let name_hash = self.discovery_name_hash;
1735 let mut random_hash = [0u8; 10];
1736 self.rng.fill_bytes(&mut random_hash);
1737
1738 let (announce_data, _) = match rns_core::announce::AnnounceData::pack(
1739 identity,
1740 &disc_dest,
1741 &name_hash,
1742 &random_hash,
1743 None,
1744 Some(&stamp_result.app_data),
1745 ) {
1746 Ok(v) => v,
1747 Err(e) => {
1748 log::warn!("Discovery: failed to pack announce: {}", e);
1749 return;
1750 }
1751 };
1752
1753 let flags = rns_core::packet::PacketFlags {
1754 header_type: rns_core::constants::HEADER_1,
1755 context_flag: rns_core::constants::FLAG_UNSET,
1756 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
1757 destination_type: rns_core::constants::DESTINATION_SINGLE,
1758 packet_type: rns_core::constants::PACKET_TYPE_ANNOUNCE,
1759 };
1760
1761 let packet = match RawPacket::pack(
1762 flags,
1763 0,
1764 &disc_dest,
1765 None,
1766 rns_core::constants::CONTEXT_NONE,
1767 &announce_data,
1768 ) {
1769 Ok(p) => p,
1770 Err(e) => {
1771 log::warn!("Discovery: failed to pack packet: {}", e);
1772 return;
1773 }
1774 };
1775
1776 let outbound_actions = self.engine.handle_outbound(
1777 &packet,
1778 rns_core::constants::DESTINATION_SINGLE,
1779 None,
1780 now,
1781 );
1782 log::debug!(
1783 "Discovery announce sent for interface '{}' ({} actions, dest={:02x?})",
1784 stamp_result.interface_name,
1785 outbound_actions.len(),
1786 &disc_dest[..4],
1787 );
1788 self.dispatch_all(outbound_actions);
1789 }
1790
/// Returns this process's resident set size in MiB, derived from the
/// resident page count in `/proc/self/statm`.
///
/// The page count is scaled by the kernel's actual page size rather than an
/// assumed 4 KiB, so the result stays correct on systems configured with
/// larger pages. Returns `None` when `/proc/self/statm` cannot be read or
/// its second field does not parse as an integer.
pub(crate) fn rss_mb() -> Option<f64> {
    /// Looks up the page size (`AT_PAGESZ`) in `/proc/self/auxv`; falls back
    /// to 4096 bytes when the vector is unreadable or has no such entry.
    fn page_size_bytes() -> u64 {
        const AT_PAGESZ: usize = 6;
        const FALLBACK_PAGE_BYTES: u64 = 4096;
        const WORD: usize = std::mem::size_of::<usize>();

        let auxv = match std::fs::read("/proc/self/auxv") {
            Ok(bytes) => bytes,
            Err(_) => return FALLBACK_PAGE_BYTES,
        };

        // The auxiliary vector is a sequence of native-endian (key, value)
        // pairs, each one machine word wide.
        auxv.chunks_exact(2 * WORD)
            .find_map(|entry| {
                let mut key = [0u8; WORD];
                let mut value = [0u8; WORD];
                key.copy_from_slice(&entry[..WORD]);
                value.copy_from_slice(&entry[WORD..]);
                let page = usize::from_ne_bytes(value);
                if usize::from_ne_bytes(key) == AT_PAGESZ && page != 0 {
                    Some(page as u64)
                } else {
                    None
                }
            })
            .unwrap_or(FALLBACK_PAGE_BYTES)
    }

    let statm = std::fs::read_to_string("/proc/self/statm").ok()?;
    // Second whitespace-separated field of statm is the resident page count.
    let rss_pages: u64 = statm.split_whitespace().nth(1)?.parse().ok()?;
    Some(rss_pages as f64 * page_size_bytes() as f64 / (1024.0 * 1024.0))
}
1797
/// Finds the first line of `contents` that starts with `key` and whose first
/// whitespace-separated token after the key parses as a `u64`, and returns
/// that number.
///
/// Lines that match the key but carry no parseable value are skipped and the
/// search continues. Returns `None` if no line qualifies.
pub(crate) fn parse_proc_kib(contents: &str, key: &str) -> Option<u64> {
    for line in contents.lines() {
        let rest = match line.strip_prefix(key) {
            Some(rest) => rest,
            None => continue,
        };
        let token = match rest.split_whitespace().next() {
            Some(token) => token,
            None => continue,
        };
        if let Ok(kib) = token.parse::<u64>() {
            return Some(kib);
        }
    }
    None
}
1804
1805 pub(crate) fn proc_status_mb() -> Option<(f64, f64, f64, f64)> {
1806 let status = std::fs::read_to_string("/proc/self/status").ok()?;
1807 let vm_rss = Self::parse_proc_kib(&status, "VmRSS:")? as f64 / 1024.0;
1808 let vm_hwm = Self::parse_proc_kib(&status, "VmHWM:")? as f64 / 1024.0;
1809 let vm_data = Self::parse_proc_kib(&status, "VmData:")? as f64 / 1024.0;
1810 let vm_swap = Self::parse_proc_kib(&status, "VmSwap:").unwrap_or(0) as f64 / 1024.0;
1811 Some((vm_rss, vm_hwm, vm_data, vm_swap))
1812 }
1813
1814 pub(crate) fn smaps_rollup_mb() -> Option<(f64, f64, f64, f64, f64, f64, f64, f64)> {
1815 let smaps = std::fs::read_to_string("/proc/self/smaps_rollup").ok()?;
1816 let rss_kib = Self::parse_proc_kib(&smaps, "Rss:")?;
1817 let anon_kib = Self::parse_proc_kib(&smaps, "Anonymous:")?;
1818 let shared_clean_kib = Self::parse_proc_kib(&smaps, "Shared_Clean:").unwrap_or(0);
1819 let shared_dirty_kib = Self::parse_proc_kib(&smaps, "Shared_Dirty:").unwrap_or(0);
1820 let private_clean_kib = Self::parse_proc_kib(&smaps, "Private_Clean:").unwrap_or(0);
1821 let private_dirty_kib = Self::parse_proc_kib(&smaps, "Private_Dirty:").unwrap_or(0);
1822 let swap_kib = Self::parse_proc_kib(&smaps, "Swap:").unwrap_or(0);
1823 let file_est_kib = rss_kib.saturating_sub(anon_kib);
1824 Some((
1825 rss_kib as f64 / 1024.0,
1826 anon_kib as f64 / 1024.0,
1827 file_est_kib as f64 / 1024.0,
1828 shared_clean_kib as f64 / 1024.0,
1829 shared_dirty_kib as f64 / 1024.0,
1830 private_clean_kib as f64 / 1024.0,
1831 private_dirty_kib as f64 / 1024.0,
1832 swap_kib as f64 / 1024.0,
1833 ))
1834 }
1835
1836 pub(crate) fn log_memory_stats(&self) {
1838 let rss = Self::rss_mb()
1839 .map(|v| format!("{:.1}", v))
1840 .unwrap_or_else(|| "N/A".into());
1841 let (vm_rss, vm_hwm, vm_data, vm_swap) = Self::proc_status_mb()
1842 .map(|(rss, hwm, data, swap)| {
1843 (
1844 format!("{rss:.1}"),
1845 format!("{hwm:.1}"),
1846 format!("{data:.1}"),
1847 format!("{swap:.1}"),
1848 )
1849 })
1850 .unwrap_or_else(|| ("N/A".into(), "N/A".into(), "N/A".into(), "N/A".into()));
1851 let (
1852 smaps_rss,
1853 smaps_anon,
1854 smaps_file_est,
1855 smaps_shared_clean,
1856 smaps_shared_dirty,
1857 smaps_private_clean,
1858 smaps_private_dirty,
1859 smaps_swap,
1860 ) = Self::smaps_rollup_mb()
1861 .map(
1862 |(
1863 rss,
1864 anon,
1865 file_est,
1866 shared_clean,
1867 shared_dirty,
1868 private_clean,
1869 private_dirty,
1870 swap,
1871 )| {
1872 (
1873 format!("{rss:.1}"),
1874 format!("{anon:.1}"),
1875 format!("{file_est:.1}"),
1876 format!("{shared_clean:.1}"),
1877 format!("{shared_dirty:.1}"),
1878 format!("{private_clean:.1}"),
1879 format!("{private_dirty:.1}"),
1880 format!("{swap:.1}"),
1881 )
1882 },
1883 )
1884 .unwrap_or_else(|| {
1885 (
1886 "N/A".into(),
1887 "N/A".into(),
1888 "N/A".into(),
1889 "N/A".into(),
1890 "N/A".into(),
1891 "N/A".into(),
1892 "N/A".into(),
1893 "N/A".into(),
1894 )
1895 });
1896 log::info!(
1897 "MEMSTATS rss_mb={} vmrss_mb={} vmhwm_mb={} vmdata_mb={} vmswap_mb={} smaps_rss_mb={} smaps_anon_mb={} smaps_file_est_mb={} smaps_shared_clean_mb={} smaps_shared_dirty_mb={} smaps_private_clean_mb={} smaps_private_dirty_mb={} smaps_swap_mb={} known_dest={} known_dest_cap_evict={} path={} path_cap_evict={} announce={} reverse={} link={} held_ann={} hashlist={} sig_cache={} ann_verify_q={} rate_lim={} blackhole={} tunnel={} ann_q_ifaces={} ann_q_nonempty={} ann_q_entries={} ann_q_bytes={} ann_q_iface_drop={} pr_tags={} disc_pr={} sent_pkt={} completed={} local_dest={} shared_ann={} lm_links={} hp_sessions={} proof_strat={}",
1898 rss,
1899 vm_rss,
1900 vm_hwm,
1901 vm_data,
1902 vm_swap,
1903 smaps_rss,
1904 smaps_anon,
1905 smaps_file_est,
1906 smaps_shared_clean,
1907 smaps_shared_dirty,
1908 smaps_private_clean,
1909 smaps_private_dirty,
1910 smaps_swap,
1911 self.known_destinations.len(),
1912 self.known_destinations_cap_evict_count,
1913 self.engine.path_table_count(),
1914 self.engine.path_destination_cap_evict_count(),
1915 self.engine.announce_table_count(),
1916 self.engine.reverse_table_count(),
1917 self.engine.link_table_count(),
1918 self.engine.held_announces_count(),
1919 self.engine.packet_hashlist_len(),
1920 self.engine.announce_sig_cache_len(),
1921 self.announce_verify_queue
1922 .lock()
1923 .map(|queue| queue.len())
1924 .unwrap_or(0),
1925 self.engine.rate_limiter_count(),
1926 self.engine.blackholed_count(),
1927 self.engine.tunnel_count(),
1928 self.engine.announce_queue_count(),
1929 self.engine.nonempty_announce_queue_count(),
1930 self.engine.queued_announce_count(),
1931 self.engine.queued_announce_bytes(),
1932 self.engine.announce_queue_interface_cap_drop_count(),
1933 self.engine.discovery_pr_tags_count(),
1934 self.engine.discovery_path_requests_count(),
1935 self.sent_packets.len(),
1936 self.completed_proofs.len(),
1937 self.local_destinations.len(),
1938 self.shared_announces.len(),
1939 self.link_manager.link_count(),
1940 self.holepunch_manager.session_count(),
1941 self.proof_strategies.len(),
1942 );
1943 }
1944
1945 pub(crate) fn tick_management_announces(&mut self, now: f64) {
1947 if self.transport_identity.is_none() {
1948 return;
1949 }
1950
1951 let uptime = now - self.started;
1952
1953 if !self.initial_announce_sent {
1955 if uptime < Self::MANAGEMENT_ANNOUNCE_DELAY {
1956 return;
1957 }
1958 self.initial_announce_sent = true;
1959 self.emit_management_announces(now);
1960 return;
1961 }
1962
1963 if now - self.last_management_announce >= self.management_announce_interval_secs {
1965 self.emit_management_announces(now);
1966 }
1967 }
1968
1969 pub(crate) fn emit_management_announces(&mut self, now: f64) {
1971 use crate::management;
1972
1973 self.last_management_announce = now;
1974
1975 let identity = match self.transport_identity {
1976 Some(ref id) => id,
1977 None => return,
1978 };
1979
1980 let mgmt_raw = if self.management_config.enable_remote_management {
1982 management::build_management_announce(identity, &mut self.rng)
1983 } else {
1984 None
1985 };
1986
1987 let bh_raw = if self.management_config.publish_blackhole {
1988 management::build_blackhole_announce(identity, &mut self.rng)
1989 } else {
1990 None
1991 };
1992
1993 let probe_raw = if self.probe_responder_hash.is_some() {
1994 management::build_probe_announce(identity, &mut self.rng)
1995 } else {
1996 None
1997 };
1998
1999 if let Some(raw) = mgmt_raw {
2000 if let Ok(packet) = RawPacket::unpack(&raw) {
2001 let actions = self.engine.handle_outbound(
2002 &packet,
2003 rns_core::constants::DESTINATION_SINGLE,
2004 None,
2005 now,
2006 );
2007 self.dispatch_all(actions);
2008 log::debug!("Emitted management destination announce");
2009 }
2010 }
2011
2012 if let Some(raw) = bh_raw {
2013 if let Ok(packet) = RawPacket::unpack(&raw) {
2014 let actions = self.engine.handle_outbound(
2015 &packet,
2016 rns_core::constants::DESTINATION_SINGLE,
2017 None,
2018 now,
2019 );
2020 self.dispatch_all(actions);
2021 log::debug!("Emitted blackhole info announce");
2022 }
2023 }
2024
2025 if let Some(raw) = probe_raw {
2026 if let Ok(packet) = RawPacket::unpack(&raw) {
2027 let actions = self.engine.handle_outbound(
2028 &packet,
2029 rns_core::constants::DESTINATION_SINGLE,
2030 None,
2031 now,
2032 );
2033 self.dispatch_all(actions);
2034 log::debug!("Emitted probe responder announce");
2035 }
2036 }
2037 }
2038
2039 pub(crate) fn handle_management_request(
2041 &mut self,
2042 link_id: [u8; 16],
2043 path_hash: [u8; 16],
2044 data: Vec<u8>,
2045 request_id: [u8; 16],
2046 remote_identity: Option<([u8; 16], [u8; 64])>,
2047 ) {
2048 use crate::management;
2049
2050 let is_restricted = path_hash == management::status_path_hash()
2052 || path_hash == management::path_path_hash();
2053
2054 if is_restricted && !self.management_config.remote_management_allowed.is_empty() {
2055 match remote_identity {
2056 Some((identity_hash, _)) => {
2057 if !self
2058 .management_config
2059 .remote_management_allowed
2060 .contains(&identity_hash)
2061 {
2062 log::debug!("Management request denied: identity not in allowed list");
2063 return;
2064 }
2065 }
2066 None => {
2067 log::debug!("Management request denied: peer not identified");
2068 return;
2069 }
2070 }
2071 }
2072
2073 let response_data = if path_hash == management::status_path_hash() {
2074 {
2075 let views: Vec<&dyn management::InterfaceStatusView> = self
2076 .interfaces
2077 .values()
2078 .map(|e| e as &dyn management::InterfaceStatusView)
2079 .collect();
2080 management::handle_status_request(
2081 &data,
2082 &self.engine,
2083 &views,
2084 self.started,
2085 self.probe_responder_hash,
2086 )
2087 }
2088 } else if path_hash == management::path_path_hash() {
2089 management::handle_path_request(&data, &self.engine)
2090 } else if path_hash == management::list_path_hash() {
2091 management::handle_blackhole_list_request(&self.engine)
2092 } else {
2093 log::warn!("Unknown management path_hash: {:02x?}", &path_hash[..4]);
2094 None
2095 };
2096
2097 if let Some(response) = response_data {
2098 let actions = self.link_manager.send_management_response(
2099 &link_id,
2100 &request_id,
2101 &response,
2102 &mut self.rng,
2103 );
2104 self.dispatch_link_actions(actions);
2105 }
2106 }
2107}