1use super::*;
2use rns_core::transport::{InboundFrame, RxMetadata};
3
4impl Driver {
5 pub(crate) fn handle_frame_event(
6 &mut self,
7 interface_id: InterfaceId,
8 data: Vec<u8>,
9 rssi: Option<i16>,
10 snr: Option<f32>,
11 ) {
12 if data.len() > 2 && (data[0] & 0x03) == 0x01 {
13 log::debug!(
14 "Announce:frame from iface {} (len={}, flags=0x{:02x})",
15 interface_id.0,
16 data.len(),
17 data[0]
18 );
19 }
20 if let Some(entry) = self.interfaces.get(&interface_id) {
21 if !entry.enabled || !entry.online {
22 return;
23 }
24 }
25 if let Some(entry) = self.interfaces.get_mut(&interface_id) {
26 entry.stats.rxb += data.len() as u64;
27 entry.stats.rx_packets += 1;
28 }
29
30 let packet = if let Some(entry) = self.interfaces.get(&interface_id) {
31 if let Some(ref ifac_state) = entry.ifac {
32 match ifac::unmask_inbound(&data, ifac_state) {
33 Some(unmasked) => unmasked,
34 None => {
35 log::debug!("[{}] IFAC rejected packet", interface_id.0);
36 return;
37 }
38 }
39 } else {
40 if data.len() > 2 && data[0] & 0x80 == 0x80 {
41 log::debug!(
42 "[{}] dropping packet with IFAC flag on non-IFAC interface",
43 interface_id.0
44 );
45 return;
46 }
47 data
48 }
49 } else {
50 data
51 };
52
53 #[cfg(feature = "hooks")]
54 {
55 let pkt_ctx = rns_hooks::PacketContext {
56 flags: if packet.is_empty() { 0 } else { packet[0] },
57 hops: if packet.len() > 1 { packet[1] } else { 0 },
58 destination_hash: extract_dest_hash(&packet),
59 context: 0,
60 packet_hash: [0; 32],
61 interface_id: interface_id.0,
62 data_offset: 0,
63 data_len: packet.len() as u32,
64 };
65 let ctx = HookContext::Packet {
66 ctx: &pkt_ctx,
67 raw: &packet,
68 };
69 let now = time::now();
70 let engine_ref = EngineRef {
71 engine: &self.engine,
72 interfaces: &self.interfaces,
73 link_manager: &self.link_manager,
74 now,
75 };
76 let provider_events_enabled = self.provider_events_enabled();
77 if let Some(ref e) = run_hook_inner(
78 &mut self.hook_slots[HookPoint::PreIngress as usize].programs,
79 &self.hook_manager,
80 &engine_ref,
81 &ctx,
82 now,
83 provider_events_enabled,
84 ) {
85 self.forward_hook_side_effects("PreIngress", e);
86 if e.hook_result.as_ref().is_some_and(|r| r.is_drop()) {
87 return;
88 }
89 }
90 }
91
92 if packet.len() > 2 && (packet[0] & 0x03) == 0x01 {
93 let now = time::now();
94 if let Some(entry) = self.interfaces.get_mut(&interface_id) {
95 entry.stats.record_incoming_announce(now);
96 }
97 }
98
99 if let Some(entry) = self.interfaces.get(&interface_id) {
100 self.engine.update_interface_freqs(
101 interface_id,
102 entry.stats.incoming_announce_freq(),
103 entry.stats.incoming_path_request_freq(),
104 entry.stats.outgoing_path_request_freq(),
105 entry.stats.outgoing_path_request_samples(),
106 );
107 }
108
109 let inbound_frame = InboundFrame {
110 raw: &packet,
111 iface: interface_id,
112 now: time::now(),
113 rx: RxMetadata { rssi, snr },
114 };
115
116 let actions = if self.async_announce_verification {
117 let mut announce_queue = self
118 .announce_verify_queue
119 .lock()
120 .unwrap_or_else(|poisoned| poisoned.into_inner());
121 self.engine.handle_inbound_with_announce_queue(
122 inbound_frame,
123 &mut self.rng,
124 Some(&mut announce_queue),
125 )
126 } else {
127 self.engine.handle_inbound(inbound_frame, &mut self.rng)
128 };
129
130 #[cfg(feature = "hooks")]
131 {
132 let pkt_ctx = rns_hooks::PacketContext {
133 flags: if packet.is_empty() { 0 } else { packet[0] },
134 hops: if packet.len() > 1 { packet[1] } else { 0 },
135 destination_hash: extract_dest_hash(&packet),
136 context: 0,
137 packet_hash: [0; 32],
138 interface_id: interface_id.0,
139 data_offset: 0,
140 data_len: packet.len() as u32,
141 };
142 let ctx = HookContext::Packet {
143 ctx: &pkt_ctx,
144 raw: &packet,
145 };
146 let now = time::now();
147 let engine_ref = EngineRef {
148 engine: &self.engine,
149 interfaces: &self.interfaces,
150 link_manager: &self.link_manager,
151 now,
152 };
153 let provider_events_enabled = self.provider_events_enabled();
154 if let Some(ref e) = run_hook_inner(
155 &mut self.hook_slots[HookPoint::PreDispatch as usize].programs,
156 &self.hook_manager,
157 &engine_ref,
158 &ctx,
159 now,
160 provider_events_enabled,
161 ) {
162 self.forward_hook_side_effects("PreDispatch", e);
163 }
164 }
165
166 self.dispatch_all(actions);
167 }
168
169 pub(crate) fn handle_announce_verified_event(
170 &mut self,
171 key: rns_core::transport::announce_verify_queue::AnnounceVerifyKey,
172 validated: rns_core::announce::ValidatedAnnounce,
173 sig_cache_key: [u8; 32],
174 ) {
175 let pending = {
176 let mut announce_queue = self
177 .announce_verify_queue
178 .lock()
179 .unwrap_or_else(|poisoned| poisoned.into_inner());
180 announce_queue.complete_success(&key)
181 };
182 if let Some(pending) = pending {
183 let actions = self.engine.complete_verified_announce(
184 pending,
185 validated,
186 sig_cache_key,
187 time::now(),
188 &mut self.rng,
189 );
190 self.dispatch_all(actions);
191 }
192 }
193
194 pub(crate) fn handle_tick_event(&mut self) {
195 #[cfg(feature = "hooks")]
196 {
197 let ctx = HookContext::Tick;
198 let now = time::now();
199 let engine_ref = EngineRef {
200 engine: &self.engine,
201 interfaces: &self.interfaces,
202 link_manager: &self.link_manager,
203 now,
204 };
205 let provider_events_enabled = self.provider_events_enabled();
206 if let Some(ref e) = run_hook_inner(
207 &mut self.hook_slots[HookPoint::Tick as usize].programs,
208 &self.hook_manager,
209 &engine_ref,
210 &ctx,
211 now,
212 provider_events_enabled,
213 ) {
214 self.forward_hook_side_effects("Tick", e);
215 }
216 }
217
218 let now = time::now();
219 for (id, entry) in &self.interfaces {
220 self.engine.update_interface_freqs(
221 *id,
222 entry.stats.incoming_announce_freq(),
223 entry.stats.incoming_path_request_freq(),
224 entry.stats.outgoing_path_request_freq(),
225 entry.stats.outgoing_path_request_samples(),
226 );
227 }
228 let actions = self.engine.tick(now, &mut self.rng);
229 self.dispatch_all(actions);
230 let link_actions = self.link_manager.tick(&mut self.rng);
231 self.dispatch_link_actions(link_actions);
232 self.enforce_drain_deadline();
233 {
234 let tx = self.get_event_sender();
235 let hp_actions = self.holepunch_manager.tick(&tx);
236 self.dispatch_holepunch_actions(hp_actions);
237 }
238 self.tick_management_announces(now);
239 self.sent_packets
240 .retain(|_, (_, sent_time)| now - *sent_time < 60.0);
241 self.completed_proofs
242 .retain(|_, (_, received)| now - *received < 120.0);
243
244 self.tick_discovery_announcer(now);
245 #[cfg(feature = "iface-backbone")]
246 self.maintain_backbone_peer_pool();
247
248 self.memory_stats_counter += 1;
249 if self.memory_stats_counter >= 300 {
250 self.memory_stats_counter = 0;
251 self.log_memory_stats();
252 }
253
254 if self.discover_interfaces {
255 self.discovery_cleanup_counter += 1;
256 if self.discovery_cleanup_counter >= self.discovery_cleanup_interval_ticks {
257 self.discovery_cleanup_counter = 0;
258 if let Ok(removed) = self.discovered_interfaces.cleanup() {
259 if removed > 0 {
260 log::info!("Discovery cleanup: removed {} stale entries", removed);
261 }
262 #[cfg(feature = "iface-backbone")]
263 self.cull_stale_discovered_backbone_peer_pool_candidates();
264 }
265 }
266 }
267
268 self.cache_cleanup_counter += 1;
269 if self.cache_cleanup_counter >= self.known_destinations_cleanup_interval_ticks {
270 self.cache_cleanup_counter = 0;
271
272 let active_dests = self.engine.active_destination_hashes();
273 let ttl = self.known_destinations_ttl;
274 let kd_before = self.known_destinations.len();
275 self.known_destinations.retain(|k, state| {
276 active_dests.contains(k)
277 || self.local_destinations.contains_key(k)
278 || state.retained
279 || now - Self::known_destination_relevance_time(state) < ttl
280 });
281 let kd_removed = kd_before - self.known_destinations.len();
282 let kd_evicted = self.enforce_known_destination_cap(false);
283 let rl_removed =
284 self.engine
285 .cull_rate_limiter(&active_dests, now, self.rate_limiter_ttl_secs);
286
287 if kd_removed > 0 || kd_evicted > 0 || rl_removed > 0 {
288 log::info!(
289 "Memory cleanup: removed {} known_destinations, evicted {} known_destinations, {} rate_limiter entries",
290 kd_removed, kd_evicted, rl_removed
291 );
292 }
293 }
294
295 self.announce_cache_cleanup_counter += 1;
296 if self.announce_cache_cleanup_counter >= self.announce_cache_cleanup_interval_ticks {
297 self.announce_cache_cleanup_counter = 0;
298 if self.announce_cache.is_some() && self.cache_cleanup_active_hashes.is_none() {
299 self.cache_cleanup_active_hashes = Some(self.engine.active_packet_hashes());
300 self.cache_cleanup_entries = None;
301 self.cache_cleanup_removed = 0;
302 }
303 }
304
305 if self.cache_cleanup_active_hashes.is_some() {
306 if let Some(ref cache) = self.announce_cache {
307 if self.cache_cleanup_entries.is_none() {
308 match cache.entries() {
309 Ok(entries) => self.cache_cleanup_entries = Some(entries),
310 Err(e) => {
311 log::warn!("Announce cache cleanup failed to open directory: {}", e);
312 self.cache_cleanup_active_hashes = None;
313 self.cache_cleanup_entries = None;
314 }
315 }
316 }
317 }
318
319 if let Some(ref cache) = self.announce_cache {
320 let Some(active_hashes) = self.cache_cleanup_active_hashes.as_ref() else {
321 self.cache_cleanup_entries = None;
322 return;
323 };
324 let entries = match self.cache_cleanup_entries.as_mut() {
325 Some(entries) => entries,
326 None => return,
327 };
328 match cache.clean_batch(
329 active_hashes,
330 entries,
331 self.announce_cache_cleanup_batch_size,
332 ) {
333 Ok((removed, finished)) => {
334 self.cache_cleanup_removed += removed;
335 if finished {
336 if self.cache_cleanup_removed > 0 {
337 log::info!(
338 "Announce cache cleanup complete: removed {} stale files",
339 self.cache_cleanup_removed
340 );
341 }
342 self.cache_cleanup_active_hashes = None;
343 self.cache_cleanup_entries = None;
344 }
345 }
346 Err(e) => {
347 log::warn!("Announce cache cleanup failed: {}", e);
348 self.cache_cleanup_active_hashes = None;
349 self.cache_cleanup_entries = None;
350 }
351 }
352 } else {
353 self.cache_cleanup_active_hashes = None;
354 self.cache_cleanup_entries = None;
355 }
356 }
357 }
358
359 pub(crate) fn handle_interface_up_event(
360 &mut self,
361 id: InterfaceId,
362 new_writer: Option<Box<dyn crate::interface::Writer>>,
363 info: Option<rns_core::transport::types::InterfaceInfo>,
364 ) {
365 let wants_tunnel;
366 let mut replay_shared_announces = false;
367 if let Some(mut info) = info {
368 log::info!("[{}] dynamic interface registered", id.0);
369 self.apply_announce_rate_defaults(&mut info);
370 self.apply_ingress_control_defaults(&mut info);
371 wants_tunnel = info.wants_tunnel;
372 let iface_type = infer_interface_type(&info.name);
373 info.started = time::now();
374 self.register_interface_runtime_defaults(&info);
375 self.engine.register_interface(info.clone());
376 if let Some(writer) = new_writer {
377 let (writer, async_writer_metrics) =
378 self.wrap_interface_writer(id, &info.name, writer);
379 self.interfaces.insert(
380 id,
381 InterfaceEntry {
382 id,
383 info,
384 writer,
385 async_writer_metrics: Some(async_writer_metrics),
386 enabled: true,
387 online: true,
388 dynamic: true,
389 ifac: None,
390 stats: InterfaceStats {
391 started: time::now(),
392 ..Default::default()
393 },
394 interface_type: iface_type,
395 send_retry_at: None,
396 send_retry_backoff: Duration::ZERO,
397 },
398 );
399 }
400 self.callbacks.on_interface_up(id);
401 #[cfg(feature = "hooks")]
402 {
403 let ctx = HookContext::Interface { interface_id: id.0 };
404 let now = time::now();
405 let engine_ref = EngineRef {
406 engine: &self.engine,
407 interfaces: &self.interfaces,
408 link_manager: &self.link_manager,
409 now,
410 };
411 let provider_events_enabled = self.provider_events_enabled();
412 if let Some(ref e) = run_hook_inner(
413 &mut self.hook_slots[HookPoint::InterfaceUp as usize].programs,
414 &self.hook_manager,
415 &engine_ref,
416 &ctx,
417 now,
418 provider_events_enabled,
419 ) {
420 self.forward_hook_side_effects("InterfaceUp", e);
421 }
422 }
423 } else {
424 let is_local_client = self
425 .interfaces
426 .get(&id)
427 .map(|entry| entry.info.is_local_client)
428 .unwrap_or(false);
429 replay_shared_announces =
430 is_local_client && self.shared_reconnect_pending.remove(&id).unwrap_or(false);
431 let interface_name = self
432 .interfaces
433 .get(&id)
434 .map(|entry| entry.info.name.clone())
435 .unwrap_or_else(|| format!("iface-{}", id.0));
436 let wrapped_writer =
437 new_writer.map(|writer| self.wrap_interface_writer(id, &interface_name, writer));
438 if let Some(entry) = self.interfaces.get_mut(&id) {
439 log::info!("[{}] interface online", id.0);
440 wants_tunnel = entry.info.wants_tunnel;
441 entry.online = true;
442 if let Some((writer, async_writer_metrics)) = wrapped_writer {
443 log::info!("[{}] writer refreshed after reconnect", id.0);
444 entry.writer = writer;
445 entry.async_writer_metrics = Some(async_writer_metrics);
446 }
447 self.callbacks.on_interface_up(id);
448 #[cfg(feature = "hooks")]
449 {
450 let ctx = HookContext::Interface { interface_id: id.0 };
451 let now = time::now();
452 let engine_ref = EngineRef {
453 engine: &self.engine,
454 interfaces: &self.interfaces,
455 link_manager: &self.link_manager,
456 now,
457 };
458 let provider_events_enabled = self.provider_events_enabled();
459 if let Some(ref e) = run_hook_inner(
460 &mut self.hook_slots[HookPoint::InterfaceUp as usize].programs,
461 &self.hook_manager,
462 &engine_ref,
463 &ctx,
464 now,
465 provider_events_enabled,
466 ) {
467 self.forward_hook_side_effects("InterfaceUp", e);
468 }
469 }
470 } else {
471 wants_tunnel = false;
472 }
473 }
474
475 if wants_tunnel {
476 self.synthesize_tunnel_for_interface(id);
477 }
478 if replay_shared_announces {
479 self.replay_shared_announces();
480 }
481 }
482
483 pub(crate) fn handle_interface_down_event(&mut self, id: InterfaceId) {
484 if let Some(entry) = self.interfaces.get(&id) {
485 if let Some(tunnel_id) = entry.info.tunnel_id {
486 self.engine.void_tunnel_interface(&tunnel_id);
487 }
488 }
489
490 if let Some(entry) = self.interfaces.get(&id) {
491 let is_dynamic = entry.dynamic;
492 let is_local_client = entry.info.is_local_client;
493 let interface_name = entry.info.name.clone();
494 if is_dynamic {
495 log::info!("[{}] dynamic interface removed", id.0);
496 self.interface_runtime_defaults.remove(&interface_name);
497 self.engine.deregister_interface(id);
498 self.interfaces.remove(&id);
499 } else {
500 log::info!("[{}] interface offline", id.0);
501 if let Some(entry) = self.interfaces.get_mut(&id) {
502 entry.online = false;
503 } else {
504 log::warn!(
505 "interface {} disappeared while handling interface-down",
506 id.0
507 );
508 return;
509 }
510 if is_local_client {
511 self.handle_shared_interface_down(id);
512 }
513 }
514 self.callbacks.on_interface_down(id);
515 #[cfg(feature = "hooks")]
516 {
517 let ctx = HookContext::Interface { interface_id: id.0 };
518 let now = time::now();
519 let engine_ref = EngineRef {
520 engine: &self.engine,
521 interfaces: &self.interfaces,
522 link_manager: &self.link_manager,
523 now,
524 };
525 let provider_events_enabled = self.provider_events_enabled();
526 if let Some(ref e) = run_hook_inner(
527 &mut self.hook_slots[HookPoint::InterfaceDown as usize].programs,
528 &self.hook_manager,
529 &engine_ref,
530 &ctx,
531 now,
532 provider_events_enabled,
533 ) {
534 self.forward_hook_side_effects("InterfaceDown", e);
535 }
536 }
537 }
538 #[cfg(feature = "iface-backbone")]
539 self.handle_backbone_peer_pool_down(id);
540 }
541
542 pub(crate) fn known_destination_route_hint(
543 &self,
544 dest_hash: &[u8; 16],
545 ) -> Option<(InterfaceId, u8)> {
546 let announced = &self.known_destinations.get(dest_hash)?.announced;
547 let iface = announced.receiving_interface;
548 if iface.0 == 0 {
549 return None;
550 }
551
552 self.interfaces
553 .get(&iface)
554 .filter(|entry| entry.online)
555 .map(|_| (iface, announced.hops))
556 }
557
558 pub(crate) fn handle_send_outbound_event(
559 &mut self,
560 raw: Vec<u8>,
561 dest_type: u8,
562 attached_interface: Option<InterfaceId>,
563 ) {
564 if self.is_draining() {
565 self.reject_new_work("send outbound packet");
566 return;
567 }
568 match RawPacket::unpack(&raw) {
569 Ok(packet) => {
570 let is_announce =
571 packet.flags.packet_type == rns_core::constants::PACKET_TYPE_ANNOUNCE;
572 if is_announce {
573 log::debug!(
574 "SendOutbound: ANNOUNCE for {:02x?} (len={}, dest_type={}, attached={:?})",
575 &packet.destination_hash[..4],
576 raw.len(),
577 dest_type,
578 attached_interface
579 );
580 }
581 if packet.flags.packet_type == rns_core::constants::PACKET_TYPE_DATA {
582 log::debug!(
583 "SendOutbound: DATA dest={:02x}{:02x}{:02x}{:02x}.. header={} transport={:02x?} attached={:?}",
584 packet.destination_hash[0],
585 packet.destination_hash[1],
586 packet.destination_hash[2],
587 packet.destination_hash[3],
588 packet.flags.header_type,
589 packet.transport_id.as_ref().map(|id| &id[..4]),
590 attached_interface
591 );
592 self.sent_packets
593 .insert(packet.packet_hash, (packet.destination_hash, time::now()));
594 }
595 let actions = self.engine.handle_outbound(
596 &packet,
597 dest_type,
598 attached_interface,
599 time::now(),
600 );
601 if packet.flags.packet_type == rns_core::constants::PACKET_TYPE_DATA {
602 log::debug!(
603 "SendOutbound: DATA routed to {} actions: {:?}",
604 actions.len(),
605 actions
606 .iter()
607 .map(|a| match a {
608 TransportAction::SendOnInterface { interface, raw } =>
609 format!("SendOn({};{}b)", interface.0, raw.len()),
610 TransportAction::BroadcastOnAllInterfaces { raw, .. } =>
611 format!("BroadcastAll({}b)", raw.len()),
612 _ => "other".to_string(),
613 })
614 .collect::<Vec<_>>()
615 );
616 }
617 if is_announce {
618 log::debug!(
619 "SendOutbound: announce routed to {} actions: {:?}",
620 actions.len(),
621 actions
622 .iter()
623 .map(|a| match a {
624 TransportAction::SendOnInterface { interface, .. } =>
625 format!("SendOn({})", interface.0),
626 TransportAction::BroadcastOnAllInterfaces { .. } =>
627 "BroadcastAll".to_string(),
628 _ => "other".to_string(),
629 })
630 .collect::<Vec<_>>()
631 );
632 }
633 self.dispatch_all(actions);
634 }
635 Err(e) => {
636 log::warn!("SendOutbound: failed to unpack packet: {:?}", e);
637 }
638 }
639 }
640
641 pub fn run(&mut self) {
643 loop {
644 let event = match self.rx.recv() {
645 Ok(e) => e,
646 Err(_) => break, };
648
649 match event {
650 Event::Frame {
651 interface_id,
652 data,
653 rssi,
654 snr,
655 } => {
656 self.handle_frame_event(interface_id, data, rssi, snr);
657 }
658 Event::AnnounceVerified {
659 key,
660 validated,
661 sig_cache_key,
662 } => {
663 self.handle_announce_verified_event(key, validated, sig_cache_key);
664 }
665 Event::AnnounceVerifyFailed { key, .. } => {
666 let mut announce_queue = self
667 .announce_verify_queue
668 .lock()
669 .unwrap_or_else(|poisoned| poisoned.into_inner());
670 let _ = announce_queue.complete_failure(&key);
671 }
672 Event::Tick => self.handle_tick_event(),
673 Event::BeginDrain { timeout } => {
674 self.begin_drain(timeout);
675 }
676 Event::InterfaceUp(id, new_writer, info) => {
677 self.handle_interface_up_event(id, new_writer, info);
678 }
679 Event::InterfaceDown(id) => self.handle_interface_down_event(id),
680 Event::SendOutbound {
681 raw,
682 dest_type,
683 attached_interface,
684 } => self.handle_send_outbound_event(raw, dest_type, attached_interface),
685 Event::RegisterDestination {
686 dest_hash,
687 dest_type,
688 } => {
689 self.engine.register_destination(dest_hash, dest_type);
690 self.local_destinations.insert(dest_hash, dest_type);
691 }
692 Event::StoreSharedAnnounce {
693 dest_hash,
694 name_hash,
695 identity_prv_key,
696 app_data,
697 } => {
698 self.shared_announces.insert(
699 dest_hash,
700 SharedAnnounceRecord {
701 name_hash,
702 identity_prv_key,
703 app_data,
704 },
705 );
706 }
707 Event::DeregisterDestination { dest_hash } => {
708 self.engine.deregister_destination(&dest_hash);
709 self.local_destinations.remove(&dest_hash);
710 self.shared_announces.remove(&dest_hash);
711 }
712 Event::Query(request, response_tx) => {
713 let response = self.handle_query_mut(request);
714 let _ = response_tx.send(response);
715 }
716 Event::DeregisterLinkDestination { dest_hash } => {
717 self.link_manager.deregister_link_destination(&dest_hash);
718 }
719 Event::RegisterLinkDestination {
720 dest_hash,
721 sig_prv_bytes,
722 sig_pub_bytes,
723 resource_strategy,
724 } => {
725 let sig_prv =
726 rns_crypto::ed25519::Ed25519PrivateKey::from_bytes(&sig_prv_bytes);
727 let strat = match resource_strategy {
728 1 => crate::link_manager::ResourceStrategy::AcceptAll,
729 2 => crate::link_manager::ResourceStrategy::AcceptApp,
730 _ => crate::link_manager::ResourceStrategy::AcceptNone,
731 };
732 self.link_manager.register_link_destination(
733 dest_hash,
734 sig_prv,
735 sig_pub_bytes,
736 strat,
737 );
738 self.engine
740 .register_destination(dest_hash, rns_core::constants::DESTINATION_SINGLE);
741 self.local_destinations
742 .insert(dest_hash, rns_core::constants::DESTINATION_SINGLE);
743 }
744 Event::RegisterRequestHandler {
745 path,
746 allowed_list,
747 handler,
748 } => {
749 self.link_manager.register_request_handler(
750 &path,
751 allowed_list,
752 move |link_id, p, data, remote| handler(link_id, p, data, remote),
753 );
754 }
755 Event::RegisterRequestHandlerResponse {
756 path,
757 allowed_list,
758 handler,
759 } => {
760 self.link_manager.register_request_handler_response(
761 &path,
762 allowed_list,
763 move |link_id, p, data, remote| handler(link_id, p, data, remote),
764 );
765 }
766 Event::CreateLink {
767 dest_hash,
768 dest_sig_pub_bytes,
769 response_tx,
770 } => {
771 if self.is_draining() {
772 self.reject_new_work("create link");
773 let _ = (dest_hash, dest_sig_pub_bytes);
774 let _ = response_tx.send([0u8; 16]);
775 continue;
776 }
777 let next_hop_interface = self.engine.next_hop_interface(&dest_hash);
778 let recalled_route_hint = if next_hop_interface.is_none() {
779 self.known_destination_route_hint(&dest_hash)
780 } else {
781 None
782 };
783 if recalled_route_hint.is_some() {
784 let _ = self.mark_known_destination_used(&dest_hash);
785 }
786 let attached_interface =
787 next_hop_interface.or(recalled_route_hint.map(|(iface, _)| iface));
788 let hops = self
789 .engine
790 .hops_to(&dest_hash)
791 .or_else(|| recalled_route_hint.map(|(_, hops)| hops))
792 .unwrap_or(0);
793 let mtu = attached_interface
794 .and_then(|iface_id| self.interfaces.get(&iface_id))
795 .map(|entry| entry.info.mtu)
796 .unwrap_or(rns_core::constants::MTU as u32);
797 let (link_id, mut link_actions) = self.link_manager.create_link(
798 &dest_hash,
799 &dest_sig_pub_bytes,
800 hops,
801 mtu,
802 &mut self.rng,
803 );
804 if let Some(iface) = attached_interface {
805 self.link_manager.set_link_route_hint(&link_id, iface, None);
806 }
807 if next_hop_interface.is_none() {
808 if let Some(iface) = attached_interface {
809 for action in &mut link_actions {
810 if let LinkManagerAction::SendPacket {
811 dest_type,
812 attached_interface,
813 ..
814 } = action
815 {
816 if *dest_type == rns_core::constants::DESTINATION_LINK
817 && attached_interface.is_none()
818 {
819 *attached_interface = Some(iface);
820 }
821 }
822 }
823 }
824 }
825 let _ = response_tx.send(link_id);
826 self.dispatch_link_actions(link_actions);
827 }
828 Event::SendRequest {
829 link_id,
830 path,
831 data,
832 } => {
833 if self.is_draining() {
834 self.reject_new_work("send link request");
835 let _ = (link_id, path, data);
836 continue;
837 }
838 let link_actions =
839 self.link_manager
840 .send_request(&link_id, &path, &data, &mut self.rng);
841 self.dispatch_link_actions(link_actions);
842 }
843 Event::IdentifyOnLink {
844 link_id,
845 identity_prv_key,
846 } => {
847 if self.is_draining() {
848 self.reject_new_work("identify on link");
849 let _ = (link_id, identity_prv_key);
850 continue;
851 }
852 let identity =
853 rns_crypto::identity::Identity::from_private_key(&identity_prv_key);
854 let link_actions =
855 self.link_manager
856 .identify(&link_id, &identity, &mut self.rng);
857 self.dispatch_link_actions(link_actions);
858 }
859 Event::TeardownLink { link_id } => {
860 let link_actions = self.link_manager.teardown_link(&link_id);
861 self.dispatch_link_actions(link_actions);
862 }
863 Event::SendResource {
864 link_id,
865 data,
866 metadata,
867 auto_compress,
868 } => {
869 if self.is_draining() {
870 self.reject_new_work("send resource");
871 let _ = (link_id, data, metadata, auto_compress);
872 continue;
873 }
874 let link_actions = self.link_manager.send_resource_with_auto_compress(
875 &link_id,
876 &data,
877 metadata.as_deref(),
878 auto_compress,
879 &mut self.rng,
880 );
881 self.dispatch_link_actions(link_actions);
882 }
883 Event::SetResourceStrategy { link_id, strategy } => {
884 use crate::link_manager::ResourceStrategy;
885 let strat = match strategy {
886 0 => ResourceStrategy::AcceptNone,
887 1 => ResourceStrategy::AcceptAll,
888 2 => ResourceStrategy::AcceptApp,
889 _ => ResourceStrategy::AcceptNone,
890 };
891 self.link_manager.set_resource_strategy(&link_id, strat);
892 }
893 Event::AcceptResource {
894 link_id,
895 resource_hash,
896 accept,
897 } => {
898 if self.is_draining() && accept {
899 self.reject_new_work("accept resource");
900 let _ = (link_id, resource_hash, accept);
901 continue;
902 }
903 let link_actions = self.link_manager.accept_resource(
904 &link_id,
905 &resource_hash,
906 accept,
907 &mut self.rng,
908 );
909 self.dispatch_link_actions(link_actions);
910 }
911 Event::SendChannelMessage {
912 link_id,
913 msgtype,
914 payload,
915 response_tx,
916 } => {
917 if self.is_draining() {
918 self.reject_new_work("send channel message");
919 let _ = response_tx.send(Err(self.drain_error("send channel message")));
920 continue;
921 }
922 match self.link_manager.send_channel_message(
923 &link_id,
924 msgtype,
925 &payload,
926 &mut self.rng,
927 ) {
928 Ok(link_actions) => {
929 self.dispatch_link_actions(link_actions);
930 let _ = response_tx.send(Ok(()));
931 }
932 Err(err) => {
933 let _ = response_tx.send(Err(err));
934 }
935 }
936 }
937 Event::SendOnLink {
938 link_id,
939 data,
940 context,
941 } => {
942 if self.is_draining() {
943 self.reject_new_work("send link payload");
944 let _ = (link_id, data, context);
945 continue;
946 }
947 let link_actions =
948 self.link_manager
949 .send_on_link(&link_id, &data, context, &mut self.rng);
950 self.dispatch_link_actions(link_actions);
951 }
952 Event::RequestPath { dest_hash } => {
953 if self.is_draining() {
954 self.reject_new_work("request path");
955 let _ = dest_hash;
956 continue;
957 }
958 self.handle_request_path(dest_hash);
959 }
960 Event::RegisterProofStrategy {
961 dest_hash,
962 strategy,
963 signing_key,
964 } => {
965 let identity = signing_key
966 .map(|key| rns_crypto::identity::Identity::from_private_key(&key));
967 self.proof_strategies
968 .insert(dest_hash, (strategy, identity));
969 }
970 Event::ProposeDirectConnect { link_id } => {
971 if self.is_draining() {
972 self.reject_new_work("propose direct connect");
973 let _ = link_id;
974 continue;
975 }
976 let derived_key = self.link_manager.get_derived_key(&link_id);
977 if let Some(dk) = derived_key {
978 let tx = self.get_event_sender();
979 let hp_actions =
980 self.holepunch_manager
981 .propose(link_id, &dk, &mut self.rng, &tx);
982 self.dispatch_holepunch_actions(hp_actions);
983 } else {
984 log::warn!(
985 "Cannot propose direct connect: no derived key for link {:02x?}",
986 &link_id[..4]
987 );
988 }
989 }
990 Event::SetDirectConnectPolicy { policy } => {
991 self.holepunch_manager.set_policy(policy);
992 }
993 Event::HolePunchProbeResult {
994 link_id,
995 session_id,
996 observed_addr,
997 socket,
998 probe_server,
999 } => {
1000 let hp_actions = self.holepunch_manager.handle_probe_result(
1001 link_id,
1002 session_id,
1003 observed_addr,
1004 socket,
1005 probe_server,
1006 );
1007 self.dispatch_holepunch_actions(hp_actions);
1008 }
1009 Event::HolePunchProbeFailed {
1010 link_id,
1011 session_id,
1012 } => {
1013 let hp_actions = self
1014 .holepunch_manager
1015 .handle_probe_failed(link_id, session_id);
1016 self.dispatch_holepunch_actions(hp_actions);
1017 }
1018 Event::LoadHook {
1019 name,
1020 wasm_bytes,
1021 attach_point,
1022 priority,
1023 response_tx,
1024 } => {
1025 #[cfg(feature = "hooks")]
1026 {
1027 let result = (|| -> Result<(), String> {
1028 let point_idx = crate::config::parse_hook_point(&attach_point)
1029 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1030 let mgr = self
1031 .hook_manager
1032 .as_ref()
1033 .ok_or_else(|| "hook manager not available".to_string())?;
1034 let program = mgr
1035 .compile(name.clone(), &wasm_bytes, priority)
1036 .map_err(|e| format!("compile error: {}", e))?;
1037 self.hook_slots[point_idx].attach(program);
1038 log::info!(
1039 "Loaded hook '{}' at point {} (priority {})",
1040 name,
1041 attach_point,
1042 priority
1043 );
1044 Ok(())
1045 })();
1046 let _ = response_tx.send(result);
1047 }
1048 #[cfg(not(feature = "hooks"))]
1049 {
1050 let _ = (name, wasm_bytes, attach_point, priority);
1051 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1052 }
1053 }
1054 Event::LoadHookFile {
1055 name,
1056 path,
1057 hook_type,
1058 attach_point,
1059 priority,
1060 response_tx,
1061 } => {
1062 #[cfg(feature = "hooks")]
1063 {
1064 let result = (|| -> Result<(), String> {
1065 let point_idx = crate::config::parse_hook_point(&attach_point)
1066 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1067 let backend = crate::config::parse_hook_backend(&hook_type)?;
1068 let mgr = self
1069 .hook_manager
1070 .as_ref()
1071 .ok_or_else(|| "hook manager not available".to_string())?;
1072 let program = mgr
1073 .load_file_backend(
1074 name.clone(),
1075 std::path::Path::new(&path),
1076 priority,
1077 backend,
1078 )
1079 .map_err(|e| format!("load error: {}", e))?;
1080 self.hook_slots[point_idx].attach(program);
1081 log::info!(
1082 "Loaded {} hook '{}' at point {} (priority {})",
1083 backend.as_str(),
1084 name,
1085 attach_point,
1086 priority
1087 );
1088 Ok(())
1089 })();
1090 let _ = response_tx.send(result);
1091 }
1092 #[cfg(not(feature = "hooks"))]
1093 {
1094 let _ = (name, path, hook_type, attach_point, priority);
1095 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1096 }
1097 }
1098 Event::LoadBuiltinHook {
1099 name,
1100 builtin_id,
1101 attach_point,
1102 priority,
1103 response_tx,
1104 } => {
1105 #[cfg(feature = "hooks")]
1106 {
1107 let result = (|| -> Result<(), String> {
1108 let point_idx = crate::config::parse_hook_point(&attach_point)
1109 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1110 let mgr = self
1111 .hook_manager
1112 .as_ref()
1113 .ok_or_else(|| "hook manager not available".to_string())?;
1114 let program = mgr
1115 .load_builtin(name.clone(), builtin_id.as_str(), priority)
1116 .map_err(|e| format!("load error: {}", e))?;
1117 self.hook_slots[point_idx].attach(program);
1118 log::info!(
1119 "Loaded built-in hook '{}' ({}) at point {} (priority {})",
1120 name,
1121 builtin_id,
1122 attach_point,
1123 priority
1124 );
1125 Ok(())
1126 })();
1127 let _ = response_tx.send(result);
1128 }
1129 #[cfg(not(feature = "hooks"))]
1130 {
1131 let _ = (name, builtin_id, attach_point, priority);
1132 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1133 }
1134 }
1135 Event::UnloadHook {
1136 name,
1137 attach_point,
1138 response_tx,
1139 } => {
1140 #[cfg(feature = "hooks")]
1141 {
1142 let result = (|| -> Result<(), String> {
1143 let point_idx = crate::config::parse_hook_point(&attach_point)
1144 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1145 match self.hook_slots[point_idx].detach(&name) {
1146 Some(_) => {
1147 log::info!(
1148 "Unloaded hook '{}' from point {}",
1149 name,
1150 attach_point
1151 );
1152 Ok(())
1153 }
1154 None => Err(format!(
1155 "hook '{}' not found at point '{}'",
1156 name, attach_point
1157 )),
1158 }
1159 })();
1160 let _ = response_tx.send(result);
1161 }
1162 #[cfg(not(feature = "hooks"))]
1163 {
1164 let _ = (name, attach_point);
1165 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1166 }
1167 }
1168 Event::ReloadHook {
1169 name,
1170 attach_point,
1171 wasm_bytes,
1172 response_tx,
1173 } => {
1174 #[cfg(feature = "hooks")]
1175 {
1176 let result = (|| -> Result<(), String> {
1177 let point_idx = crate::config::parse_hook_point(&attach_point)
1178 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1179 let old =
1180 self.hook_slots[point_idx].detach(&name).ok_or_else(|| {
1181 format!("hook '{}' not found at point '{}'", name, attach_point)
1182 })?;
1183 let priority = old.priority;
1184 let mgr = match self.hook_manager.as_ref() {
1185 Some(m) => m,
1186 None => {
1187 self.hook_slots[point_idx].attach(old);
1188 return Err("hook manager not available".to_string());
1189 }
1190 };
1191 match mgr.compile(name.clone(), &wasm_bytes, priority) {
1192 Ok(program) => {
1193 self.hook_slots[point_idx].attach(program);
1194 log::info!(
1195 "Reloaded hook '{}' at point {} (priority {})",
1196 name,
1197 attach_point,
1198 priority
1199 );
1200 Ok(())
1201 }
1202 Err(e) => {
1203 self.hook_slots[point_idx].attach(old);
1204 Err(format!("compile error: {}", e))
1205 }
1206 }
1207 })();
1208 let _ = response_tx.send(result);
1209 }
1210 #[cfg(not(feature = "hooks"))]
1211 {
1212 let _ = (name, attach_point, wasm_bytes);
1213 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1214 }
1215 }
1216 Event::ReloadHookFile {
1217 name,
1218 attach_point,
1219 path,
1220 hook_type,
1221 response_tx,
1222 } => {
1223 #[cfg(feature = "hooks")]
1224 {
1225 let result = (|| -> Result<(), String> {
1226 let point_idx = crate::config::parse_hook_point(&attach_point)
1227 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1228 let old =
1229 self.hook_slots[point_idx].detach(&name).ok_or_else(|| {
1230 format!("hook '{}' not found at point '{}'", name, attach_point)
1231 })?;
1232 let priority = old.priority;
1233 let backend = match crate::config::parse_hook_backend(&hook_type) {
1234 Ok(backend) => backend,
1235 Err(e) => {
1236 self.hook_slots[point_idx].attach(old);
1237 return Err(e);
1238 }
1239 };
1240 let mgr = match self.hook_manager.as_ref() {
1241 Some(m) => m,
1242 None => {
1243 self.hook_slots[point_idx].attach(old);
1244 return Err("hook manager not available".to_string());
1245 }
1246 };
1247 match mgr.load_file_backend(
1248 name.clone(),
1249 std::path::Path::new(&path),
1250 priority,
1251 backend,
1252 ) {
1253 Ok(program) => {
1254 self.hook_slots[point_idx].attach(program);
1255 log::info!(
1256 "Reloaded {} hook '{}' at point {} (priority {})",
1257 backend.as_str(),
1258 name,
1259 attach_point,
1260 priority
1261 );
1262 Ok(())
1263 }
1264 Err(e) => {
1265 self.hook_slots[point_idx].attach(old);
1266 Err(format!("load error: {}", e))
1267 }
1268 }
1269 })();
1270 let _ = response_tx.send(result);
1271 }
1272 #[cfg(not(feature = "hooks"))]
1273 {
1274 let _ = (name, attach_point, path, hook_type);
1275 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1276 }
1277 }
1278 Event::ReloadBuiltinHook {
1279 name,
1280 attach_point,
1281 builtin_id,
1282 response_tx,
1283 } => {
1284 #[cfg(feature = "hooks")]
1285 {
1286 let result = (|| -> Result<(), String> {
1287 let point_idx = crate::config::parse_hook_point(&attach_point)
1288 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1289 let old =
1290 self.hook_slots[point_idx].detach(&name).ok_or_else(|| {
1291 format!("hook '{}' not found at point '{}'", name, attach_point)
1292 })?;
1293 let priority = old.priority;
1294 let mgr = match self.hook_manager.as_ref() {
1295 Some(m) => m,
1296 None => {
1297 self.hook_slots[point_idx].attach(old);
1298 return Err("hook manager not available".to_string());
1299 }
1300 };
1301 match mgr.load_builtin(name.clone(), builtin_id.as_str(), priority) {
1302 Ok(program) => {
1303 self.hook_slots[point_idx].attach(program);
1304 log::info!(
1305 "Reloaded built-in hook '{}' ({}) at point {} (priority {})",
1306 name,
1307 builtin_id,
1308 attach_point,
1309 priority
1310 );
1311 Ok(())
1312 }
1313 Err(e) => {
1314 self.hook_slots[point_idx].attach(old);
1315 Err(format!("load error: {}", e))
1316 }
1317 }
1318 })();
1319 let _ = response_tx.send(result);
1320 }
1321 #[cfg(not(feature = "hooks"))]
1322 {
1323 let _ = (name, attach_point, builtin_id);
1324 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1325 }
1326 }
1327 Event::SetHookEnabled {
1328 name,
1329 attach_point,
1330 enabled,
1331 response_tx,
1332 } => {
1333 #[cfg(feature = "hooks")]
1334 {
1335 let result = self.update_hook_program(&name, &attach_point, |program| {
1336 program.enabled = enabled;
1337 });
1338 if result.is_ok() {
1339 log::info!(
1340 "{} hook '{}' at point {}",
1341 if enabled { "Enabled" } else { "Disabled" },
1342 name,
1343 attach_point,
1344 );
1345 }
1346 let _ = response_tx.send(result);
1347 }
1348 #[cfg(not(feature = "hooks"))]
1349 {
1350 let _ = (name, attach_point, enabled);
1351 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1352 }
1353 }
1354 Event::SetHookPriority {
1355 name,
1356 attach_point,
1357 priority,
1358 response_tx,
1359 } => {
1360 #[cfg(feature = "hooks")]
1361 {
1362 let result = self.update_hook_program(&name, &attach_point, |program| {
1363 program.priority = priority;
1364 });
1365 if result.is_ok() {
1366 if let Some(point_idx) = crate::config::parse_hook_point(&attach_point)
1367 {
1368 self.hook_slots[point_idx]
1369 .programs
1370 .sort_by(|a, b| b.priority.cmp(&a.priority));
1371 log::info!(
1372 "Updated hook '{}' at point {} to priority {}",
1373 name,
1374 attach_point,
1375 priority,
1376 );
1377 } else {
1378 log::error!(
1379 "hook point '{}' became invalid during priority update",
1380 attach_point
1381 );
1382 }
1383 }
1384 let _ = response_tx.send(result);
1385 }
1386 #[cfg(not(feature = "hooks"))]
1387 {
1388 let _ = (name, attach_point, priority);
1389 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1390 }
1391 }
1392 Event::ListHooks { response_tx } => {
1393 #[cfg(feature = "hooks")]
1394 {
1395 let hook_point_names = [
1396 "PreIngress",
1397 "PreDispatch",
1398 "AnnounceReceived",
1399 "PathUpdated",
1400 "AnnounceRetransmit",
1401 "LinkRequestReceived",
1402 "LinkEstablished",
1403 "LinkClosed",
1404 "InterfaceUp",
1405 "InterfaceDown",
1406 "InterfaceConfigChanged",
1407 "BackbonePeerConnected",
1408 "BackbonePeerDisconnected",
1409 "BackbonePeerIdleTimeout",
1410 "BackbonePeerWriteStall",
1411 "BackbonePeerPenalty",
1412 "SendOnInterface",
1413 "BroadcastOnAllInterfaces",
1414 "DeliverLocal",
1415 "TunnelSynthesize",
1416 "Tick",
1417 ];
1418 let mut infos = Vec::new();
1419 for (idx, slot) in self.hook_slots.iter().enumerate() {
1420 let point_name = hook_point_names.get(idx).unwrap_or(&"Unknown");
1421 for prog in &slot.programs {
1422 infos.push(crate::event::HookInfo {
1423 name: prog.name.clone(),
1424 hook_type: prog.backend_name().to_string(),
1425 attach_point: point_name.to_string(),
1426 priority: prog.priority,
1427 enabled: prog.enabled,
1428 consecutive_traps: prog.consecutive_traps,
1429 });
1430 }
1431 }
1432 let _ = response_tx.send(infos);
1433 }
1434 #[cfg(not(feature = "hooks"))]
1435 {
1436 let _ = response_tx.send(Vec::new());
1437 }
1438 }
1439 Event::InterfaceConfigChanged(id) => {
1440 #[cfg(feature = "hooks")]
1441 {
1442 let ctx = HookContext::Interface { interface_id: id.0 };
1443 let now = time::now();
1444 let engine_ref = EngineRef {
1445 engine: &self.engine,
1446 interfaces: &self.interfaces,
1447 link_manager: &self.link_manager,
1448 now,
1449 };
1450 let provider_events_enabled = self.provider_events_enabled();
1451 if let Some(ref e) = run_hook_inner(
1452 &mut self.hook_slots[HookPoint::InterfaceConfigChanged as usize]
1453 .programs,
1454 &self.hook_manager,
1455 &engine_ref,
1456 &ctx,
1457 now,
1458 provider_events_enabled,
1459 ) {
1460 self.forward_hook_side_effects("InterfaceConfigChanged", e);
1461 }
1462 }
1463 #[cfg(not(feature = "hooks"))]
1464 let _ = id;
1465 }
1466 Event::BackbonePeerConnected {
1467 server_interface_id,
1468 peer_interface_id,
1469 peer_ip,
1470 peer_port,
1471 } => {
1472 #[cfg(feature = "hooks")]
1473 {
1474 self.run_backbone_peer_hook(
1475 "BackbonePeerConnected",
1476 HookPoint::BackbonePeerConnected,
1477 &BackbonePeerHookEvent {
1478 server_interface_id,
1479 peer_interface_id: Some(peer_interface_id),
1480 peer_ip,
1481 peer_port,
1482 connected_for: Duration::ZERO,
1483 had_received_data: false,
1484 penalty_level: 0,
1485 blacklist_for: Duration::ZERO,
1486 },
1487 );
1488 }
1489 #[cfg(not(feature = "hooks"))]
1490 let _ = (server_interface_id, peer_interface_id, peer_ip, peer_port);
1491 }
1492 Event::BackbonePeerDisconnected {
1493 server_interface_id,
1494 peer_interface_id,
1495 peer_ip,
1496 peer_port,
1497 connected_for,
1498 had_received_data,
1499 } => {
1500 #[cfg(feature = "hooks")]
1501 {
1502 self.run_backbone_peer_hook(
1503 "BackbonePeerDisconnected",
1504 HookPoint::BackbonePeerDisconnected,
1505 &BackbonePeerHookEvent {
1506 server_interface_id,
1507 peer_interface_id: Some(peer_interface_id),
1508 peer_ip,
1509 peer_port,
1510 connected_for,
1511 had_received_data,
1512 penalty_level: 0,
1513 blacklist_for: Duration::ZERO,
1514 },
1515 );
1516 }
1517 #[cfg(not(feature = "hooks"))]
1518 let _ = (
1519 server_interface_id,
1520 peer_interface_id,
1521 peer_ip,
1522 peer_port,
1523 connected_for,
1524 had_received_data,
1525 );
1526 }
1527 Event::BackbonePeerIdleTimeout {
1528 server_interface_id,
1529 peer_interface_id,
1530 peer_ip,
1531 peer_port,
1532 connected_for,
1533 } => {
1534 #[cfg(feature = "hooks")]
1535 {
1536 self.run_backbone_peer_hook(
1537 "BackbonePeerIdleTimeout",
1538 HookPoint::BackbonePeerIdleTimeout,
1539 &BackbonePeerHookEvent {
1540 server_interface_id,
1541 peer_interface_id: Some(peer_interface_id),
1542 peer_ip,
1543 peer_port,
1544 connected_for,
1545 had_received_data: false,
1546 penalty_level: 0,
1547 blacklist_for: Duration::ZERO,
1548 },
1549 );
1550 }
1551 #[cfg(not(feature = "hooks"))]
1552 let _ = (
1553 server_interface_id,
1554 peer_interface_id,
1555 peer_ip,
1556 peer_port,
1557 connected_for,
1558 );
1559 }
1560 Event::BackbonePeerWriteStall {
1561 server_interface_id,
1562 peer_interface_id,
1563 peer_ip,
1564 peer_port,
1565 connected_for,
1566 } => {
1567 #[cfg(feature = "hooks")]
1568 {
1569 self.run_backbone_peer_hook(
1570 "BackbonePeerWriteStall",
1571 HookPoint::BackbonePeerWriteStall,
1572 &BackbonePeerHookEvent {
1573 server_interface_id,
1574 peer_interface_id: Some(peer_interface_id),
1575 peer_ip,
1576 peer_port,
1577 connected_for,
1578 had_received_data: false,
1579 penalty_level: 0,
1580 blacklist_for: Duration::ZERO,
1581 },
1582 );
1583 }
1584 #[cfg(not(feature = "hooks"))]
1585 let _ = (
1586 server_interface_id,
1587 peer_interface_id,
1588 peer_ip,
1589 peer_port,
1590 connected_for,
1591 );
1592 }
1593 Event::BackbonePeerPenalty {
1594 server_interface_id,
1595 peer_ip,
1596 penalty_level,
1597 blacklist_for,
1598 } => {
1599 #[cfg(feature = "hooks")]
1600 {
1601 self.run_backbone_peer_hook(
1602 "BackbonePeerPenalty",
1603 HookPoint::BackbonePeerPenalty,
1604 &BackbonePeerHookEvent {
1605 server_interface_id,
1606 peer_interface_id: None,
1607 peer_ip,
1608 peer_port: 0,
1609 connected_for: Duration::ZERO,
1610 had_received_data: false,
1611 penalty_level,
1612 blacklist_for,
1613 },
1614 );
1615 }
1616 #[cfg(not(feature = "hooks"))]
1617 let _ = (server_interface_id, peer_ip, penalty_level, blacklist_for);
1618 }
1619 Event::Shutdown => {
1620 self.graceful_shutdown();
1621 break;
1622 }
1623 }
1624 }
1625 }
1626 pub(crate) fn handle_tunnel_synth_delivery(&mut self, raw: &[u8]) {
1627 let packet = match RawPacket::unpack(raw) {
1629 Ok(p) => p,
1630 Err(_) => return,
1631 };
1632
1633 match rns_core::transport::tunnel::validate_tunnel_synthesize_data(&packet.data) {
1634 Ok(validated) => {
1635 let iface_id = self
1638 .interfaces
1639 .iter()
1640 .find(|(_, entry)| entry.info.wants_tunnel && entry.online && entry.enabled)
1641 .map(|(id, _)| *id);
1642
1643 if let Some(iface) = iface_id {
1644 let now = time::now();
1645 let tunnel_actions = self.engine.handle_tunnel(validated.tunnel_id, iface, now);
1646 self.dispatch_all(tunnel_actions);
1647 }
1648 }
1649 Err(e) => {
1650 log::debug!("Tunnel synthesis validation failed: {}", e);
1651 }
1652 }
1653 }
1654
1655 pub(crate) fn synthesize_tunnel_for_interface(&mut self, interface: InterfaceId) {
1659 if let Some(ref identity) = self.transport_identity {
1660 let actions = self
1661 .engine
1662 .synthesize_tunnel(identity, interface, &mut self.rng);
1663 self.dispatch_all(actions);
1664 }
1665 }
1666
1667 pub(crate) fn handle_request_path(&mut self, dest_hash: [u8; 16]) {
1669 let mut data = Vec::with_capacity(48);
1671 data.extend_from_slice(&dest_hash);
1672
1673 if self.engine.transport_enabled() {
1674 if let Some(id_hash) = self.engine.identity_hash() {
1675 data.extend_from_slice(id_hash);
1676 }
1677 }
1678
1679 let mut tag = [0u8; 16];
1681 self.rng.fill_bytes(&mut tag);
1682 data.extend_from_slice(&tag);
1683
1684 let flags = rns_core::packet::PacketFlags {
1686 header_type: rns_core::constants::HEADER_1,
1687 context_flag: rns_core::constants::FLAG_UNSET,
1688 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
1689 destination_type: rns_core::constants::DESTINATION_PLAIN,
1690 packet_type: rns_core::constants::PACKET_TYPE_DATA,
1691 };
1692
1693 if let Ok(packet) = RawPacket::pack(
1694 flags,
1695 0,
1696 &self.path_request_dest,
1697 None,
1698 rns_core::constants::CONTEXT_NONE,
1699 &data,
1700 ) {
1701 let actions = self.engine.handle_outbound(
1702 &packet,
1703 rns_core::constants::DESTINATION_PLAIN,
1704 None,
1705 time::now(),
1706 );
1707 self.dispatch_all(actions);
1708 }
1709 }
1710
1711 pub(crate) fn get_event_sender(&self) -> crate::event::EventSender {
1714 self.event_tx.clone()
1718 }
1719
/// Minimum uptime (`now - self.started`) that must elapse before
/// `tick_management_announces` emits the first management announces.
/// Presumably seconds, matching `management_announce_interval_secs` —
/// confirm against the time source.
const MANAGEMENT_ANNOUNCE_DELAY: f64 = 5.0;
1722
1723 pub(crate) fn tick_discovery_announcer(&mut self, now: f64) {
1725 let announcer = match self.interface_announcer.as_mut() {
1726 Some(a) => a,
1727 None => return,
1728 };
1729
1730 announcer.maybe_start(now);
1731
1732 let stamp_result = match announcer.poll_ready() {
1733 Some(r) => r,
1734 None => return,
1735 };
1736
1737 if !announcer.contains_interface(&stamp_result.interface_name) {
1738 log::debug!(
1739 "Discovery: dropping completed stamp for removed interface '{}'",
1740 stamp_result.interface_name
1741 );
1742 return;
1743 }
1744
1745 let identity = match self.transport_identity.as_ref() {
1746 Some(id) => id,
1747 None => {
1748 log::warn!("Discovery: stamp ready but no transport identity");
1749 return;
1750 }
1751 };
1752
1753 let identity_hash = identity.hash();
1755 let disc_dest = rns_core::destination::destination_hash(
1756 crate::discovery::APP_NAME,
1757 &["discovery", "interface"],
1758 Some(&identity_hash),
1759 );
1760 let name_hash = self.discovery_name_hash;
1761 let mut random_hash = [0u8; 10];
1762 self.rng.fill_bytes(&mut random_hash);
1763
1764 let (announce_data, _) = match rns_core::announce::AnnounceData::pack(
1765 identity,
1766 &disc_dest,
1767 &name_hash,
1768 &random_hash,
1769 None,
1770 Some(&stamp_result.app_data),
1771 ) {
1772 Ok(v) => v,
1773 Err(e) => {
1774 log::warn!("Discovery: failed to pack announce: {}", e);
1775 return;
1776 }
1777 };
1778
1779 let flags = rns_core::packet::PacketFlags {
1780 header_type: rns_core::constants::HEADER_1,
1781 context_flag: rns_core::constants::FLAG_UNSET,
1782 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
1783 destination_type: rns_core::constants::DESTINATION_SINGLE,
1784 packet_type: rns_core::constants::PACKET_TYPE_ANNOUNCE,
1785 };
1786
1787 let packet = match RawPacket::pack(
1788 flags,
1789 0,
1790 &disc_dest,
1791 None,
1792 rns_core::constants::CONTEXT_NONE,
1793 &announce_data,
1794 ) {
1795 Ok(p) => p,
1796 Err(e) => {
1797 log::warn!("Discovery: failed to pack packet: {}", e);
1798 return;
1799 }
1800 };
1801
1802 let outbound_actions = self.engine.handle_outbound(
1803 &packet,
1804 rns_core::constants::DESTINATION_SINGLE,
1805 None,
1806 now,
1807 );
1808 log::debug!(
1809 "Discovery announce sent for interface '{}' ({} actions, dest={:02x?})",
1810 stamp_result.interface_name,
1811 outbound_actions.len(),
1812 &disc_dest[..4],
1813 );
1814 self.dispatch_all(outbound_actions);
1815 }
1816
/// Returns the process resident set size in MiB, read from
/// `/proc/self/statm`.
///
/// The second whitespace-separated field of the file is parsed as a page
/// count and multiplied by a fixed 4096-byte page size. Returns `None` when
/// the file cannot be read or the field is missing or not an integer.
///
/// NOTE(review): the 4096-byte page size is hard-coded; on kernels using a
/// different page size the result would be scaled incorrectly.
pub(crate) fn rss_mb() -> Option<f64> {
    const PAGE_BYTES: f64 = 4096.0;

    let statm = std::fs::read_to_string("/proc/self/statm").ok()?;
    let mut fields = statm.split_whitespace();
    let resident_field = fields.nth(1)?;
    let rss_pages = resident_field.parse::<u64>().ok()?;
    Some(rss_pages as f64 * PAGE_BYTES / (1024.0 * 1024.0))
}
1823
/// Extracts the first integer value following `key` in `/proc`-style text.
///
/// Each line of `contents` is checked in order. For a line that starts with
/// `key`, the first whitespace-separated token after the key is parsed as a
/// `u64`. The first line that yields a valid number wins; lines that match
/// the key but have no parsable value are skipped. Returns `None` when no
/// line produces a value.
pub(crate) fn parse_proc_kib(contents: &str, key: &str) -> Option<u64> {
    for line in contents.lines() {
        let rest = match line.strip_prefix(key) {
            Some(rest) => rest,
            None => continue,
        };
        let token = match rest.split_whitespace().next() {
            Some(token) => token,
            None => continue,
        };
        if let Ok(value) = token.parse::<u64>() {
            return Some(value);
        }
    }
    None
}
1830
1831 pub(crate) fn proc_status_mb() -> Option<(f64, f64, f64, f64)> {
1832 let status = std::fs::read_to_string("/proc/self/status").ok()?;
1833 let vm_rss = Self::parse_proc_kib(&status, "VmRSS:")? as f64 / 1024.0;
1834 let vm_hwm = Self::parse_proc_kib(&status, "VmHWM:")? as f64 / 1024.0;
1835 let vm_data = Self::parse_proc_kib(&status, "VmData:")? as f64 / 1024.0;
1836 let vm_swap = Self::parse_proc_kib(&status, "VmSwap:").unwrap_or(0) as f64 / 1024.0;
1837 Some((vm_rss, vm_hwm, vm_data, vm_swap))
1838 }
1839
1840 pub(crate) fn smaps_rollup_mb() -> Option<(f64, f64, f64, f64, f64, f64, f64, f64)> {
1841 let smaps = std::fs::read_to_string("/proc/self/smaps_rollup").ok()?;
1842 let rss_kib = Self::parse_proc_kib(&smaps, "Rss:")?;
1843 let anon_kib = Self::parse_proc_kib(&smaps, "Anonymous:")?;
1844 let shared_clean_kib = Self::parse_proc_kib(&smaps, "Shared_Clean:").unwrap_or(0);
1845 let shared_dirty_kib = Self::parse_proc_kib(&smaps, "Shared_Dirty:").unwrap_or(0);
1846 let private_clean_kib = Self::parse_proc_kib(&smaps, "Private_Clean:").unwrap_or(0);
1847 let private_dirty_kib = Self::parse_proc_kib(&smaps, "Private_Dirty:").unwrap_or(0);
1848 let swap_kib = Self::parse_proc_kib(&smaps, "Swap:").unwrap_or(0);
1849 let file_est_kib = rss_kib.saturating_sub(anon_kib);
1850 Some((
1851 rss_kib as f64 / 1024.0,
1852 anon_kib as f64 / 1024.0,
1853 file_est_kib as f64 / 1024.0,
1854 shared_clean_kib as f64 / 1024.0,
1855 shared_dirty_kib as f64 / 1024.0,
1856 private_clean_kib as f64 / 1024.0,
1857 private_dirty_kib as f64 / 1024.0,
1858 swap_kib as f64 / 1024.0,
1859 ))
1860 }
1861
1862 pub(crate) fn log_memory_stats(&self) {
1864 let rss = Self::rss_mb()
1865 .map(|v| format!("{:.1}", v))
1866 .unwrap_or_else(|| "N/A".into());
1867 let (vm_rss, vm_hwm, vm_data, vm_swap) = Self::proc_status_mb()
1868 .map(|(rss, hwm, data, swap)| {
1869 (
1870 format!("{rss:.1}"),
1871 format!("{hwm:.1}"),
1872 format!("{data:.1}"),
1873 format!("{swap:.1}"),
1874 )
1875 })
1876 .unwrap_or_else(|| ("N/A".into(), "N/A".into(), "N/A".into(), "N/A".into()));
1877 let (
1878 smaps_rss,
1879 smaps_anon,
1880 smaps_file_est,
1881 smaps_shared_clean,
1882 smaps_shared_dirty,
1883 smaps_private_clean,
1884 smaps_private_dirty,
1885 smaps_swap,
1886 ) = Self::smaps_rollup_mb()
1887 .map(
1888 |(
1889 rss,
1890 anon,
1891 file_est,
1892 shared_clean,
1893 shared_dirty,
1894 private_clean,
1895 private_dirty,
1896 swap,
1897 )| {
1898 (
1899 format!("{rss:.1}"),
1900 format!("{anon:.1}"),
1901 format!("{file_est:.1}"),
1902 format!("{shared_clean:.1}"),
1903 format!("{shared_dirty:.1}"),
1904 format!("{private_clean:.1}"),
1905 format!("{private_dirty:.1}"),
1906 format!("{swap:.1}"),
1907 )
1908 },
1909 )
1910 .unwrap_or_else(|| {
1911 (
1912 "N/A".into(),
1913 "N/A".into(),
1914 "N/A".into(),
1915 "N/A".into(),
1916 "N/A".into(),
1917 "N/A".into(),
1918 "N/A".into(),
1919 "N/A".into(),
1920 )
1921 });
1922 log::info!(
1923 "MEMSTATS rss_mb={} vmrss_mb={} vmhwm_mb={} vmdata_mb={} vmswap_mb={} smaps_rss_mb={} smaps_anon_mb={} smaps_file_est_mb={} smaps_shared_clean_mb={} smaps_shared_dirty_mb={} smaps_private_clean_mb={} smaps_private_dirty_mb={} smaps_swap_mb={} known_dest={} known_dest_cap_evict={} path={} path_cap_evict={} announce={} reverse={} link={} held_ann={} hashlist={} sig_cache={} ann_verify_q={} rate_lim={} blackhole={} tunnel={} ann_q_ifaces={} ann_q_nonempty={} ann_q_entries={} ann_q_bytes={} ann_q_iface_drop={} pr_tags={} disc_pr={} sent_pkt={} completed={} local_dest={} shared_ann={} lm_links={} hp_sessions={} proof_strat={}",
1924 rss,
1925 vm_rss,
1926 vm_hwm,
1927 vm_data,
1928 vm_swap,
1929 smaps_rss,
1930 smaps_anon,
1931 smaps_file_est,
1932 smaps_shared_clean,
1933 smaps_shared_dirty,
1934 smaps_private_clean,
1935 smaps_private_dirty,
1936 smaps_swap,
1937 self.known_destinations.len(),
1938 self.known_destinations_cap_evict_count,
1939 self.engine.path_table_count(),
1940 self.engine.path_destination_cap_evict_count(),
1941 self.engine.announce_table_count(),
1942 self.engine.reverse_table_count(),
1943 self.engine.link_table_count(),
1944 self.engine.held_announces_count(),
1945 self.engine.packet_hashlist_len(),
1946 self.engine.announce_sig_cache_len(),
1947 self.announce_verify_queue
1948 .lock()
1949 .map(|queue| queue.len())
1950 .unwrap_or(0),
1951 self.engine.rate_limiter_count(),
1952 self.engine.blackholed_count(),
1953 self.engine.tunnel_count(),
1954 self.engine.announce_queue_count(),
1955 self.engine.nonempty_announce_queue_count(),
1956 self.engine.queued_announce_count(),
1957 self.engine.queued_announce_bytes(),
1958 self.engine.announce_queue_interface_cap_drop_count(),
1959 self.engine.discovery_pr_tags_count(),
1960 self.engine.discovery_path_requests_count(),
1961 self.sent_packets.len(),
1962 self.completed_proofs.len(),
1963 self.local_destinations.len(),
1964 self.shared_announces.len(),
1965 self.link_manager.link_count(),
1966 self.holepunch_manager.session_count(),
1967 self.proof_strategies.len(),
1968 );
1969 }
1970
1971 pub(crate) fn tick_management_announces(&mut self, now: f64) {
1973 if self.transport_identity.is_none() {
1974 return;
1975 }
1976
1977 let uptime = now - self.started;
1978
1979 if !self.initial_announce_sent {
1981 if uptime < Self::MANAGEMENT_ANNOUNCE_DELAY {
1982 return;
1983 }
1984 self.initial_announce_sent = true;
1985 self.emit_management_announces(now);
1986 return;
1987 }
1988
1989 if now - self.last_management_announce >= self.management_announce_interval_secs {
1991 self.emit_management_announces(now);
1992 }
1993 }
1994
1995 pub(crate) fn emit_management_announces(&mut self, now: f64) {
1997 use crate::management;
1998
1999 self.last_management_announce = now;
2000
2001 let identity = match self.transport_identity {
2002 Some(ref id) => id,
2003 None => return,
2004 };
2005
2006 let mgmt_raw = if self.management_config.enable_remote_management {
2008 management::build_management_announce(identity, &mut self.rng)
2009 } else {
2010 None
2011 };
2012
2013 let bh_raw = if self.management_config.publish_blackhole {
2014 management::build_blackhole_announce(identity, &mut self.rng)
2015 } else {
2016 None
2017 };
2018
2019 let probe_raw = if self.probe_responder_hash.is_some() {
2020 management::build_probe_announce(identity, &mut self.rng)
2021 } else {
2022 None
2023 };
2024
2025 if let Some(raw) = mgmt_raw {
2026 if let Ok(packet) = RawPacket::unpack(&raw) {
2027 let actions = self.engine.handle_outbound(
2028 &packet,
2029 rns_core::constants::DESTINATION_SINGLE,
2030 None,
2031 now,
2032 );
2033 self.dispatch_all(actions);
2034 log::debug!("Emitted management destination announce");
2035 }
2036 }
2037
2038 if let Some(raw) = bh_raw {
2039 if let Ok(packet) = RawPacket::unpack(&raw) {
2040 let actions = self.engine.handle_outbound(
2041 &packet,
2042 rns_core::constants::DESTINATION_SINGLE,
2043 None,
2044 now,
2045 );
2046 self.dispatch_all(actions);
2047 log::debug!("Emitted blackhole info announce");
2048 }
2049 }
2050
2051 if let Some(raw) = probe_raw {
2052 if let Ok(packet) = RawPacket::unpack(&raw) {
2053 let actions = self.engine.handle_outbound(
2054 &packet,
2055 rns_core::constants::DESTINATION_SINGLE,
2056 None,
2057 now,
2058 );
2059 self.dispatch_all(actions);
2060 log::debug!("Emitted probe responder announce");
2061 }
2062 }
2063 }
2064
2065 pub(crate) fn handle_management_request(
2067 &mut self,
2068 link_id: [u8; 16],
2069 path_hash: [u8; 16],
2070 data: Vec<u8>,
2071 request_id: [u8; 16],
2072 remote_identity: Option<([u8; 16], [u8; 64])>,
2073 ) {
2074 use crate::management;
2075
2076 let is_restricted = path_hash == management::status_path_hash()
2078 || path_hash == management::path_path_hash();
2079
2080 if is_restricted && !self.management_config.remote_management_allowed.is_empty() {
2081 match remote_identity {
2082 Some((identity_hash, _)) => {
2083 if !self
2084 .management_config
2085 .remote_management_allowed
2086 .contains(&identity_hash)
2087 {
2088 log::debug!("Management request denied: identity not in allowed list");
2089 return;
2090 }
2091 }
2092 None => {
2093 log::debug!("Management request denied: peer not identified");
2094 return;
2095 }
2096 }
2097 }
2098
2099 let response_data = if path_hash == management::status_path_hash() {
2100 {
2101 let views: Vec<&dyn management::InterfaceStatusView> = self
2102 .interfaces
2103 .values()
2104 .map(|e| e as &dyn management::InterfaceStatusView)
2105 .collect();
2106 management::handle_status_request(
2107 &data,
2108 &self.engine,
2109 &views,
2110 self.started,
2111 self.probe_responder_hash,
2112 )
2113 }
2114 } else if path_hash == management::path_path_hash() {
2115 management::handle_path_request(&data, &self.engine)
2116 } else if path_hash == management::list_path_hash() {
2117 management::handle_blackhole_list_request(&self.engine)
2118 } else {
2119 log::warn!("Unknown management path_hash: {:02x?}", &path_hash[..4]);
2120 None
2121 };
2122
2123 if let Some(response) = response_data {
2124 let actions = self.link_manager.send_management_response(
2125 &link_id,
2126 &request_id,
2127 &response,
2128 &mut self.rng,
2129 );
2130 self.dispatch_link_actions(actions);
2131 }
2132 }
2133}