1use super::*;
2
3impl Driver {
4 pub(crate) fn handle_frame_event(&mut self, interface_id: InterfaceId, data: Vec<u8>) {
5 if data.len() > 2 && (data[0] & 0x03) == 0x01 {
6 log::debug!(
7 "Announce:frame from iface {} (len={}, flags=0x{:02x})",
8 interface_id.0,
9 data.len(),
10 data[0]
11 );
12 }
13 if let Some(entry) = self.interfaces.get(&interface_id) {
14 if !entry.enabled || !entry.online {
15 return;
16 }
17 }
18 if let Some(entry) = self.interfaces.get_mut(&interface_id) {
19 entry.stats.rxb += data.len() as u64;
20 entry.stats.rx_packets += 1;
21 }
22
23 let packet = if let Some(entry) = self.interfaces.get(&interface_id) {
24 if let Some(ref ifac_state) = entry.ifac {
25 match ifac::unmask_inbound(&data, ifac_state) {
26 Some(unmasked) => unmasked,
27 None => {
28 log::debug!("[{}] IFAC rejected packet", interface_id.0);
29 return;
30 }
31 }
32 } else {
33 if data.len() > 2 && data[0] & 0x80 == 0x80 {
34 log::debug!(
35 "[{}] dropping packet with IFAC flag on non-IFAC interface",
36 interface_id.0
37 );
38 return;
39 }
40 data
41 }
42 } else {
43 data
44 };
45
46 #[cfg(feature = "hooks")]
47 {
48 let pkt_ctx = rns_hooks::PacketContext {
49 flags: if packet.is_empty() { 0 } else { packet[0] },
50 hops: if packet.len() > 1 { packet[1] } else { 0 },
51 destination_hash: extract_dest_hash(&packet),
52 context: 0,
53 packet_hash: [0; 32],
54 interface_id: interface_id.0,
55 data_offset: 0,
56 data_len: packet.len() as u32,
57 };
58 let ctx = HookContext::Packet {
59 ctx: &pkt_ctx,
60 raw: &packet,
61 };
62 let now = time::now();
63 let engine_ref = EngineRef {
64 engine: &self.engine,
65 interfaces: &self.interfaces,
66 link_manager: &self.link_manager,
67 now,
68 };
69 let provider_events_enabled = self.provider_events_enabled();
70 if let Some(ref e) = run_hook_inner(
71 &mut self.hook_slots[HookPoint::PreIngress as usize].programs,
72 &self.hook_manager,
73 &engine_ref,
74 &ctx,
75 now,
76 provider_events_enabled,
77 ) {
78 self.forward_hook_side_effects("PreIngress", e);
79 if e.hook_result.as_ref().is_some_and(|r| r.is_drop()) {
80 return;
81 }
82 }
83 }
84
85 if packet.len() > 2 && (packet[0] & 0x03) == 0x01 {
86 let now = time::now();
87 if let Some(entry) = self.interfaces.get_mut(&interface_id) {
88 entry.stats.record_incoming_announce(now);
89 }
90 }
91
92 if let Some(entry) = self.interfaces.get(&interface_id) {
93 self.engine
94 .update_interface_freq(interface_id, entry.stats.incoming_announce_freq());
95 }
96
97 let actions = if self.async_announce_verification {
98 let mut announce_queue = self
99 .announce_verify_queue
100 .lock()
101 .unwrap_or_else(|poisoned| poisoned.into_inner());
102 self.engine.handle_inbound_with_announce_queue(
103 &packet,
104 interface_id,
105 time::now(),
106 &mut self.rng,
107 Some(&mut announce_queue),
108 )
109 } else {
110 self.engine
111 .handle_inbound(&packet, interface_id, time::now(), &mut self.rng)
112 };
113
114 #[cfg(feature = "hooks")]
115 {
116 let pkt_ctx = rns_hooks::PacketContext {
117 flags: if packet.is_empty() { 0 } else { packet[0] },
118 hops: if packet.len() > 1 { packet[1] } else { 0 },
119 destination_hash: extract_dest_hash(&packet),
120 context: 0,
121 packet_hash: [0; 32],
122 interface_id: interface_id.0,
123 data_offset: 0,
124 data_len: packet.len() as u32,
125 };
126 let ctx = HookContext::Packet {
127 ctx: &pkt_ctx,
128 raw: &packet,
129 };
130 let now = time::now();
131 let engine_ref = EngineRef {
132 engine: &self.engine,
133 interfaces: &self.interfaces,
134 link_manager: &self.link_manager,
135 now,
136 };
137 let provider_events_enabled = self.provider_events_enabled();
138 if let Some(ref e) = run_hook_inner(
139 &mut self.hook_slots[HookPoint::PreDispatch as usize].programs,
140 &self.hook_manager,
141 &engine_ref,
142 &ctx,
143 now,
144 provider_events_enabled,
145 ) {
146 self.forward_hook_side_effects("PreDispatch", e);
147 }
148 }
149
150 self.dispatch_all(actions);
151 }
152
153 pub(crate) fn handle_announce_verified_event(
154 &mut self,
155 key: rns_core::transport::announce_verify_queue::AnnounceVerifyKey,
156 validated: rns_core::announce::ValidatedAnnounce,
157 sig_cache_key: [u8; 32],
158 ) {
159 let pending = {
160 let mut announce_queue = self
161 .announce_verify_queue
162 .lock()
163 .unwrap_or_else(|poisoned| poisoned.into_inner());
164 announce_queue.complete_success(&key)
165 };
166 if let Some(pending) = pending {
167 let actions = self.engine.complete_verified_announce(
168 pending,
169 validated,
170 sig_cache_key,
171 time::now(),
172 &mut self.rng,
173 );
174 self.dispatch_all(actions);
175 }
176 }
177
178 pub(crate) fn handle_tick_event(&mut self) {
179 #[cfg(feature = "hooks")]
180 {
181 let ctx = HookContext::Tick;
182 let now = time::now();
183 let engine_ref = EngineRef {
184 engine: &self.engine,
185 interfaces: &self.interfaces,
186 link_manager: &self.link_manager,
187 now,
188 };
189 let provider_events_enabled = self.provider_events_enabled();
190 if let Some(ref e) = run_hook_inner(
191 &mut self.hook_slots[HookPoint::Tick as usize].programs,
192 &self.hook_manager,
193 &engine_ref,
194 &ctx,
195 now,
196 provider_events_enabled,
197 ) {
198 self.forward_hook_side_effects("Tick", e);
199 }
200 }
201
202 let now = time::now();
203 for (id, entry) in &self.interfaces {
204 self.engine
205 .update_interface_freq(*id, entry.stats.incoming_announce_freq());
206 }
207 let actions = self.engine.tick(now, &mut self.rng);
208 self.dispatch_all(actions);
209 let link_actions = self.link_manager.tick(&mut self.rng);
210 self.dispatch_link_actions(link_actions);
211 self.enforce_drain_deadline();
212 {
213 let tx = self.get_event_sender();
214 let hp_actions = self.holepunch_manager.tick(&tx);
215 self.dispatch_holepunch_actions(hp_actions);
216 }
217 self.tick_management_announces(now);
218 self.sent_packets
219 .retain(|_, (_, sent_time)| now - *sent_time < 60.0);
220 self.completed_proofs
221 .retain(|_, (_, received)| now - *received < 120.0);
222
223 self.tick_discovery_announcer(now);
224 #[cfg(feature = "iface-backbone")]
225 self.maintain_backbone_peer_pool();
226
227 self.memory_stats_counter += 1;
228 if self.memory_stats_counter >= 300 {
229 self.memory_stats_counter = 0;
230 self.log_memory_stats();
231 }
232
233 if self.discover_interfaces {
234 self.discovery_cleanup_counter += 1;
235 if self.discovery_cleanup_counter >= self.discovery_cleanup_interval_ticks {
236 self.discovery_cleanup_counter = 0;
237 if let Ok(removed) = self.discovered_interfaces.cleanup() {
238 if removed > 0 {
239 log::info!("Discovery cleanup: removed {} stale entries", removed);
240 }
241 }
242 }
243 }
244
245 self.cache_cleanup_counter += 1;
246 if self.cache_cleanup_counter >= self.known_destinations_cleanup_interval_ticks {
247 self.cache_cleanup_counter = 0;
248
249 let active_dests = self.engine.active_destination_hashes();
250 let ttl = self.known_destinations_ttl;
251 let kd_before = self.known_destinations.len();
252 self.known_destinations.retain(|k, state| {
253 active_dests.contains(k)
254 || self.local_destinations.contains_key(k)
255 || state.retained
256 || now - Self::known_destination_relevance_time(state) < ttl
257 });
258 let kd_removed = kd_before - self.known_destinations.len();
259 let kd_evicted = self.enforce_known_destination_cap(false);
260 let rl_removed =
261 self.engine
262 .cull_rate_limiter(&active_dests, now, self.rate_limiter_ttl_secs);
263
264 if kd_removed > 0 || kd_evicted > 0 || rl_removed > 0 {
265 log::info!(
266 "Memory cleanup: removed {} known_destinations, evicted {} known_destinations, {} rate_limiter entries",
267 kd_removed, kd_evicted, rl_removed
268 );
269 }
270 }
271
272 self.announce_cache_cleanup_counter += 1;
273 if self.announce_cache_cleanup_counter >= self.announce_cache_cleanup_interval_ticks {
274 self.announce_cache_cleanup_counter = 0;
275 if self.announce_cache.is_some() && self.cache_cleanup_active_hashes.is_none() {
276 self.cache_cleanup_active_hashes = Some(self.engine.active_packet_hashes());
277 self.cache_cleanup_entries = None;
278 self.cache_cleanup_removed = 0;
279 }
280 }
281
282 if self.cache_cleanup_active_hashes.is_some() {
283 if let Some(ref cache) = self.announce_cache {
284 if self.cache_cleanup_entries.is_none() {
285 match cache.entries() {
286 Ok(entries) => self.cache_cleanup_entries = Some(entries),
287 Err(e) => {
288 log::warn!("Announce cache cleanup failed to open directory: {}", e);
289 self.cache_cleanup_active_hashes = None;
290 self.cache_cleanup_entries = None;
291 }
292 }
293 }
294 }
295
296 if let Some(ref cache) = self.announce_cache {
297 let Some(active_hashes) = self.cache_cleanup_active_hashes.as_ref() else {
298 self.cache_cleanup_entries = None;
299 return;
300 };
301 let entries = match self.cache_cleanup_entries.as_mut() {
302 Some(entries) => entries,
303 None => return,
304 };
305 match cache.clean_batch(
306 active_hashes,
307 entries,
308 self.announce_cache_cleanup_batch_size,
309 ) {
310 Ok((removed, finished)) => {
311 self.cache_cleanup_removed += removed;
312 if finished {
313 if self.cache_cleanup_removed > 0 {
314 log::info!(
315 "Announce cache cleanup complete: removed {} stale files",
316 self.cache_cleanup_removed
317 );
318 }
319 self.cache_cleanup_active_hashes = None;
320 self.cache_cleanup_entries = None;
321 }
322 }
323 Err(e) => {
324 log::warn!("Announce cache cleanup failed: {}", e);
325 self.cache_cleanup_active_hashes = None;
326 self.cache_cleanup_entries = None;
327 }
328 }
329 } else {
330 self.cache_cleanup_active_hashes = None;
331 self.cache_cleanup_entries = None;
332 }
333 }
334 }
335
336 pub(crate) fn handle_interface_up_event(
337 &mut self,
338 id: InterfaceId,
339 new_writer: Option<Box<dyn crate::interface::Writer>>,
340 info: Option<rns_core::transport::types::InterfaceInfo>,
341 ) {
342 let wants_tunnel;
343 let mut replay_shared_announces = false;
344 if let Some(mut info) = info {
345 log::info!("[{}] dynamic interface registered", id.0);
346 wants_tunnel = info.wants_tunnel;
347 let iface_type = infer_interface_type(&info.name);
348 info.started = time::now();
349 self.register_interface_runtime_defaults(&info);
350 self.engine.register_interface(info.clone());
351 if let Some(writer) = new_writer {
352 let (writer, async_writer_metrics) =
353 self.wrap_interface_writer(id, &info.name, writer);
354 self.interfaces.insert(
355 id,
356 InterfaceEntry {
357 id,
358 info,
359 writer,
360 async_writer_metrics: Some(async_writer_metrics),
361 enabled: true,
362 online: true,
363 dynamic: true,
364 ifac: None,
365 stats: InterfaceStats {
366 started: time::now(),
367 ..Default::default()
368 },
369 interface_type: iface_type,
370 send_retry_at: None,
371 send_retry_backoff: Duration::ZERO,
372 },
373 );
374 }
375 self.callbacks.on_interface_up(id);
376 #[cfg(feature = "hooks")]
377 {
378 let ctx = HookContext::Interface { interface_id: id.0 };
379 let now = time::now();
380 let engine_ref = EngineRef {
381 engine: &self.engine,
382 interfaces: &self.interfaces,
383 link_manager: &self.link_manager,
384 now,
385 };
386 let provider_events_enabled = self.provider_events_enabled();
387 if let Some(ref e) = run_hook_inner(
388 &mut self.hook_slots[HookPoint::InterfaceUp as usize].programs,
389 &self.hook_manager,
390 &engine_ref,
391 &ctx,
392 now,
393 provider_events_enabled,
394 ) {
395 self.forward_hook_side_effects("InterfaceUp", e);
396 }
397 }
398 } else {
399 let is_local_client = self
400 .interfaces
401 .get(&id)
402 .map(|entry| entry.info.is_local_client)
403 .unwrap_or(false);
404 replay_shared_announces =
405 is_local_client && self.shared_reconnect_pending.remove(&id).unwrap_or(false);
406 let interface_name = self
407 .interfaces
408 .get(&id)
409 .map(|entry| entry.info.name.clone())
410 .unwrap_or_else(|| format!("iface-{}", id.0));
411 let wrapped_writer =
412 new_writer.map(|writer| self.wrap_interface_writer(id, &interface_name, writer));
413 if let Some(entry) = self.interfaces.get_mut(&id) {
414 log::info!("[{}] interface online", id.0);
415 wants_tunnel = entry.info.wants_tunnel;
416 entry.online = true;
417 if let Some((writer, async_writer_metrics)) = wrapped_writer {
418 log::info!("[{}] writer refreshed after reconnect", id.0);
419 entry.writer = writer;
420 entry.async_writer_metrics = Some(async_writer_metrics);
421 }
422 self.callbacks.on_interface_up(id);
423 #[cfg(feature = "hooks")]
424 {
425 let ctx = HookContext::Interface { interface_id: id.0 };
426 let now = time::now();
427 let engine_ref = EngineRef {
428 engine: &self.engine,
429 interfaces: &self.interfaces,
430 link_manager: &self.link_manager,
431 now,
432 };
433 let provider_events_enabled = self.provider_events_enabled();
434 if let Some(ref e) = run_hook_inner(
435 &mut self.hook_slots[HookPoint::InterfaceUp as usize].programs,
436 &self.hook_manager,
437 &engine_ref,
438 &ctx,
439 now,
440 provider_events_enabled,
441 ) {
442 self.forward_hook_side_effects("InterfaceUp", e);
443 }
444 }
445 } else {
446 wants_tunnel = false;
447 }
448 }
449
450 if wants_tunnel {
451 self.synthesize_tunnel_for_interface(id);
452 }
453 if replay_shared_announces {
454 self.replay_shared_announces();
455 }
456 }
457
458 pub(crate) fn handle_interface_down_event(&mut self, id: InterfaceId) {
459 if let Some(entry) = self.interfaces.get(&id) {
460 if let Some(tunnel_id) = entry.info.tunnel_id {
461 self.engine.void_tunnel_interface(&tunnel_id);
462 }
463 }
464
465 if let Some(entry) = self.interfaces.get(&id) {
466 let is_dynamic = entry.dynamic;
467 let is_local_client = entry.info.is_local_client;
468 let interface_name = entry.info.name.clone();
469 if is_dynamic {
470 log::info!("[{}] dynamic interface removed", id.0);
471 self.interface_runtime_defaults.remove(&interface_name);
472 self.engine.deregister_interface(id);
473 self.interfaces.remove(&id);
474 } else {
475 log::info!("[{}] interface offline", id.0);
476 if let Some(entry) = self.interfaces.get_mut(&id) {
477 entry.online = false;
478 } else {
479 log::warn!(
480 "interface {} disappeared while handling interface-down",
481 id.0
482 );
483 return;
484 }
485 if is_local_client {
486 self.handle_shared_interface_down(id);
487 }
488 }
489 self.callbacks.on_interface_down(id);
490 #[cfg(feature = "hooks")]
491 {
492 let ctx = HookContext::Interface { interface_id: id.0 };
493 let now = time::now();
494 let engine_ref = EngineRef {
495 engine: &self.engine,
496 interfaces: &self.interfaces,
497 link_manager: &self.link_manager,
498 now,
499 };
500 let provider_events_enabled = self.provider_events_enabled();
501 if let Some(ref e) = run_hook_inner(
502 &mut self.hook_slots[HookPoint::InterfaceDown as usize].programs,
503 &self.hook_manager,
504 &engine_ref,
505 &ctx,
506 now,
507 provider_events_enabled,
508 ) {
509 self.forward_hook_side_effects("InterfaceDown", e);
510 }
511 }
512 }
513 #[cfg(feature = "iface-backbone")]
514 self.handle_backbone_peer_pool_down(id);
515 }
516
517 pub(crate) fn known_destination_route_hint(
518 &self,
519 dest_hash: &[u8; 16],
520 ) -> Option<(InterfaceId, u8)> {
521 let announced = &self.known_destinations.get(dest_hash)?.announced;
522 let iface = announced.receiving_interface;
523 if iface.0 == 0 {
524 return None;
525 }
526
527 self.interfaces
528 .get(&iface)
529 .filter(|entry| entry.online)
530 .map(|_| (iface, announced.hops))
531 }
532
533 pub(crate) fn handle_send_outbound_event(
534 &mut self,
535 raw: Vec<u8>,
536 dest_type: u8,
537 attached_interface: Option<InterfaceId>,
538 ) {
539 if self.is_draining() {
540 self.reject_new_work("send outbound packet");
541 return;
542 }
543 match RawPacket::unpack(&raw) {
544 Ok(packet) => {
545 let is_announce =
546 packet.flags.packet_type == rns_core::constants::PACKET_TYPE_ANNOUNCE;
547 if is_announce {
548 log::debug!(
549 "SendOutbound: ANNOUNCE for {:02x?} (len={}, dest_type={}, attached={:?})",
550 &packet.destination_hash[..4],
551 raw.len(),
552 dest_type,
553 attached_interface
554 );
555 }
556 if packet.flags.packet_type == rns_core::constants::PACKET_TYPE_DATA {
557 self.sent_packets
558 .insert(packet.packet_hash, (packet.destination_hash, time::now()));
559 }
560 let actions = self.engine.handle_outbound(
561 &packet,
562 dest_type,
563 attached_interface,
564 time::now(),
565 );
566 if is_announce {
567 log::debug!(
568 "SendOutbound: announce routed to {} actions: {:?}",
569 actions.len(),
570 actions
571 .iter()
572 .map(|a| match a {
573 TransportAction::SendOnInterface { interface, .. } =>
574 format!("SendOn({})", interface.0),
575 TransportAction::BroadcastOnAllInterfaces { .. } =>
576 "BroadcastAll".to_string(),
577 _ => "other".to_string(),
578 })
579 .collect::<Vec<_>>()
580 );
581 }
582 self.dispatch_all(actions);
583 }
584 Err(e) => {
585 log::warn!("SendOutbound: failed to unpack packet: {:?}", e);
586 }
587 }
588 }
589
590 pub fn run(&mut self) {
592 loop {
593 let event = match self.rx.recv() {
594 Ok(e) => e,
595 Err(_) => break, };
597
598 match event {
599 Event::Frame { interface_id, data } => {
600 self.handle_frame_event(interface_id, data);
601 }
602 Event::AnnounceVerified {
603 key,
604 validated,
605 sig_cache_key,
606 } => {
607 self.handle_announce_verified_event(key, validated, sig_cache_key);
608 }
609 Event::AnnounceVerifyFailed { key, .. } => {
610 let mut announce_queue = self
611 .announce_verify_queue
612 .lock()
613 .unwrap_or_else(|poisoned| poisoned.into_inner());
614 let _ = announce_queue.complete_failure(&key);
615 }
616 Event::Tick => self.handle_tick_event(),
617 Event::BeginDrain { timeout } => {
618 self.begin_drain(timeout);
619 }
620 Event::InterfaceUp(id, new_writer, info) => {
621 self.handle_interface_up_event(id, new_writer, info);
622 }
623 Event::InterfaceDown(id) => self.handle_interface_down_event(id),
624 Event::SendOutbound {
625 raw,
626 dest_type,
627 attached_interface,
628 } => self.handle_send_outbound_event(raw, dest_type, attached_interface),
629 Event::RegisterDestination {
630 dest_hash,
631 dest_type,
632 } => {
633 self.engine.register_destination(dest_hash, dest_type);
634 self.local_destinations.insert(dest_hash, dest_type);
635 }
636 Event::StoreSharedAnnounce {
637 dest_hash,
638 name_hash,
639 identity_prv_key,
640 app_data,
641 } => {
642 self.shared_announces.insert(
643 dest_hash,
644 SharedAnnounceRecord {
645 name_hash,
646 identity_prv_key,
647 app_data,
648 },
649 );
650 }
651 Event::DeregisterDestination { dest_hash } => {
652 self.engine.deregister_destination(&dest_hash);
653 self.local_destinations.remove(&dest_hash);
654 self.shared_announces.remove(&dest_hash);
655 }
656 Event::Query(request, response_tx) => {
657 let response = self.handle_query_mut(request);
658 let _ = response_tx.send(response);
659 }
660 Event::DeregisterLinkDestination { dest_hash } => {
661 self.link_manager.deregister_link_destination(&dest_hash);
662 }
663 Event::RegisterLinkDestination {
664 dest_hash,
665 sig_prv_bytes,
666 sig_pub_bytes,
667 resource_strategy,
668 } => {
669 let sig_prv =
670 rns_crypto::ed25519::Ed25519PrivateKey::from_bytes(&sig_prv_bytes);
671 let strat = match resource_strategy {
672 1 => crate::link_manager::ResourceStrategy::AcceptAll,
673 2 => crate::link_manager::ResourceStrategy::AcceptApp,
674 _ => crate::link_manager::ResourceStrategy::AcceptNone,
675 };
676 self.link_manager.register_link_destination(
677 dest_hash,
678 sig_prv,
679 sig_pub_bytes,
680 strat,
681 );
682 self.engine
684 .register_destination(dest_hash, rns_core::constants::DESTINATION_SINGLE);
685 self.local_destinations
686 .insert(dest_hash, rns_core::constants::DESTINATION_SINGLE);
687 }
688 Event::RegisterRequestHandler {
689 path,
690 allowed_list,
691 handler,
692 } => {
693 self.link_manager.register_request_handler(
694 &path,
695 allowed_list,
696 move |link_id, p, data, remote| handler(link_id, p, data, remote),
697 );
698 }
699 Event::RegisterRequestHandlerResponse {
700 path,
701 allowed_list,
702 handler,
703 } => {
704 self.link_manager.register_request_handler_response(
705 &path,
706 allowed_list,
707 move |link_id, p, data, remote| handler(link_id, p, data, remote),
708 );
709 }
710 Event::CreateLink {
711 dest_hash,
712 dest_sig_pub_bytes,
713 response_tx,
714 } => {
715 if self.is_draining() {
716 self.reject_new_work("create link");
717 let _ = (dest_hash, dest_sig_pub_bytes);
718 let _ = response_tx.send([0u8; 16]);
719 continue;
720 }
721 let next_hop_interface = self.engine.next_hop_interface(&dest_hash);
722 let recalled_route_hint = if next_hop_interface.is_none() {
723 self.known_destination_route_hint(&dest_hash)
724 } else {
725 None
726 };
727 if recalled_route_hint.is_some() {
728 let _ = self.mark_known_destination_used(&dest_hash);
729 }
730 let attached_interface =
731 next_hop_interface.or(recalled_route_hint.map(|(iface, _)| iface));
732 let hops = self
733 .engine
734 .hops_to(&dest_hash)
735 .or_else(|| recalled_route_hint.map(|(_, hops)| hops))
736 .unwrap_or(0);
737 let mtu = attached_interface
738 .and_then(|iface_id| self.interfaces.get(&iface_id))
739 .map(|entry| entry.info.mtu)
740 .unwrap_or(rns_core::constants::MTU as u32);
741 let (link_id, mut link_actions) = self.link_manager.create_link(
742 &dest_hash,
743 &dest_sig_pub_bytes,
744 hops,
745 mtu,
746 &mut self.rng,
747 );
748 if let Some(iface) = attached_interface {
749 self.link_manager.set_link_route_hint(&link_id, iface, None);
750 }
751 if next_hop_interface.is_none() {
752 if let Some(iface) = attached_interface {
753 for action in &mut link_actions {
754 if let LinkManagerAction::SendPacket {
755 dest_type,
756 attached_interface,
757 ..
758 } = action
759 {
760 if *dest_type == rns_core::constants::DESTINATION_LINK
761 && attached_interface.is_none()
762 {
763 *attached_interface = Some(iface);
764 }
765 }
766 }
767 }
768 }
769 let _ = response_tx.send(link_id);
770 self.dispatch_link_actions(link_actions);
771 }
772 Event::SendRequest {
773 link_id,
774 path,
775 data,
776 } => {
777 if self.is_draining() {
778 self.reject_new_work("send link request");
779 let _ = (link_id, path, data);
780 continue;
781 }
782 let link_actions =
783 self.link_manager
784 .send_request(&link_id, &path, &data, &mut self.rng);
785 self.dispatch_link_actions(link_actions);
786 }
787 Event::IdentifyOnLink {
788 link_id,
789 identity_prv_key,
790 } => {
791 if self.is_draining() {
792 self.reject_new_work("identify on link");
793 let _ = (link_id, identity_prv_key);
794 continue;
795 }
796 let identity =
797 rns_crypto::identity::Identity::from_private_key(&identity_prv_key);
798 let link_actions =
799 self.link_manager
800 .identify(&link_id, &identity, &mut self.rng);
801 self.dispatch_link_actions(link_actions);
802 }
803 Event::TeardownLink { link_id } => {
804 let link_actions = self.link_manager.teardown_link(&link_id);
805 self.dispatch_link_actions(link_actions);
806 }
807 Event::SendResource {
808 link_id,
809 data,
810 metadata,
811 auto_compress,
812 } => {
813 if self.is_draining() {
814 self.reject_new_work("send resource");
815 let _ = (link_id, data, metadata, auto_compress);
816 continue;
817 }
818 let link_actions = self.link_manager.send_resource_with_auto_compress(
819 &link_id,
820 &data,
821 metadata.as_deref(),
822 auto_compress,
823 &mut self.rng,
824 );
825 self.dispatch_link_actions(link_actions);
826 }
827 Event::SetResourceStrategy { link_id, strategy } => {
828 use crate::link_manager::ResourceStrategy;
829 let strat = match strategy {
830 0 => ResourceStrategy::AcceptNone,
831 1 => ResourceStrategy::AcceptAll,
832 2 => ResourceStrategy::AcceptApp,
833 _ => ResourceStrategy::AcceptNone,
834 };
835 self.link_manager.set_resource_strategy(&link_id, strat);
836 }
837 Event::AcceptResource {
838 link_id,
839 resource_hash,
840 accept,
841 } => {
842 if self.is_draining() && accept {
843 self.reject_new_work("accept resource");
844 let _ = (link_id, resource_hash, accept);
845 continue;
846 }
847 let link_actions = self.link_manager.accept_resource(
848 &link_id,
849 &resource_hash,
850 accept,
851 &mut self.rng,
852 );
853 self.dispatch_link_actions(link_actions);
854 }
855 Event::SendChannelMessage {
856 link_id,
857 msgtype,
858 payload,
859 response_tx,
860 } => {
861 if self.is_draining() {
862 self.reject_new_work("send channel message");
863 let _ = response_tx.send(Err(self.drain_error("send channel message")));
864 continue;
865 }
866 match self.link_manager.send_channel_message(
867 &link_id,
868 msgtype,
869 &payload,
870 &mut self.rng,
871 ) {
872 Ok(link_actions) => {
873 self.dispatch_link_actions(link_actions);
874 let _ = response_tx.send(Ok(()));
875 }
876 Err(err) => {
877 let _ = response_tx.send(Err(err));
878 }
879 }
880 }
881 Event::SendOnLink {
882 link_id,
883 data,
884 context,
885 } => {
886 if self.is_draining() {
887 self.reject_new_work("send link payload");
888 let _ = (link_id, data, context);
889 continue;
890 }
891 let link_actions =
892 self.link_manager
893 .send_on_link(&link_id, &data, context, &mut self.rng);
894 self.dispatch_link_actions(link_actions);
895 }
896 Event::RequestPath { dest_hash } => {
897 if self.is_draining() {
898 self.reject_new_work("request path");
899 let _ = dest_hash;
900 continue;
901 }
902 self.handle_request_path(dest_hash);
903 }
904 Event::RegisterProofStrategy {
905 dest_hash,
906 strategy,
907 signing_key,
908 } => {
909 let identity = signing_key
910 .map(|key| rns_crypto::identity::Identity::from_private_key(&key));
911 self.proof_strategies
912 .insert(dest_hash, (strategy, identity));
913 }
914 Event::ProposeDirectConnect { link_id } => {
915 if self.is_draining() {
916 self.reject_new_work("propose direct connect");
917 let _ = link_id;
918 continue;
919 }
920 let derived_key = self.link_manager.get_derived_key(&link_id);
921 if let Some(dk) = derived_key {
922 let tx = self.get_event_sender();
923 let hp_actions =
924 self.holepunch_manager
925 .propose(link_id, &dk, &mut self.rng, &tx);
926 self.dispatch_holepunch_actions(hp_actions);
927 } else {
928 log::warn!(
929 "Cannot propose direct connect: no derived key for link {:02x?}",
930 &link_id[..4]
931 );
932 }
933 }
934 Event::SetDirectConnectPolicy { policy } => {
935 self.holepunch_manager.set_policy(policy);
936 }
937 Event::HolePunchProbeResult {
938 link_id,
939 session_id,
940 observed_addr,
941 socket,
942 probe_server,
943 } => {
944 let hp_actions = self.holepunch_manager.handle_probe_result(
945 link_id,
946 session_id,
947 observed_addr,
948 socket,
949 probe_server,
950 );
951 self.dispatch_holepunch_actions(hp_actions);
952 }
953 Event::HolePunchProbeFailed {
954 link_id,
955 session_id,
956 } => {
957 let hp_actions = self
958 .holepunch_manager
959 .handle_probe_failed(link_id, session_id);
960 self.dispatch_holepunch_actions(hp_actions);
961 }
962 Event::LoadHook {
963 name,
964 wasm_bytes,
965 attach_point,
966 priority,
967 response_tx,
968 } => {
969 #[cfg(feature = "hooks")]
970 {
971 let result = (|| -> Result<(), String> {
972 let point_idx = crate::config::parse_hook_point(&attach_point)
973 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
974 let mgr = self
975 .hook_manager
976 .as_ref()
977 .ok_or_else(|| "hook manager not available".to_string())?;
978 let program = mgr
979 .compile(name.clone(), &wasm_bytes, priority)
980 .map_err(|e| format!("compile error: {}", e))?;
981 self.hook_slots[point_idx].attach(program);
982 log::info!(
983 "Loaded hook '{}' at point {} (priority {})",
984 name,
985 attach_point,
986 priority
987 );
988 Ok(())
989 })();
990 let _ = response_tx.send(result);
991 }
992 #[cfg(not(feature = "hooks"))]
993 {
994 let _ = (name, wasm_bytes, attach_point, priority);
995 let _ = response_tx.send(Err("hooks not enabled".to_string()));
996 }
997 }
998 Event::LoadHookFile {
999 name,
1000 path,
1001 hook_type,
1002 attach_point,
1003 priority,
1004 response_tx,
1005 } => {
1006 #[cfg(feature = "hooks")]
1007 {
1008 let result = (|| -> Result<(), String> {
1009 let point_idx = crate::config::parse_hook_point(&attach_point)
1010 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1011 let backend = crate::config::parse_hook_backend(&hook_type)?;
1012 let mgr = self
1013 .hook_manager
1014 .as_ref()
1015 .ok_or_else(|| "hook manager not available".to_string())?;
1016 let program = mgr
1017 .load_file_backend(
1018 name.clone(),
1019 std::path::Path::new(&path),
1020 priority,
1021 backend,
1022 )
1023 .map_err(|e| format!("load error: {}", e))?;
1024 self.hook_slots[point_idx].attach(program);
1025 log::info!(
1026 "Loaded {} hook '{}' at point {} (priority {})",
1027 backend.as_str(),
1028 name,
1029 attach_point,
1030 priority
1031 );
1032 Ok(())
1033 })();
1034 let _ = response_tx.send(result);
1035 }
1036 #[cfg(not(feature = "hooks"))]
1037 {
1038 let _ = (name, path, hook_type, attach_point, priority);
1039 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1040 }
1041 }
1042 Event::LoadBuiltinHook {
1043 name,
1044 builtin_id,
1045 attach_point,
1046 priority,
1047 response_tx,
1048 } => {
1049 #[cfg(feature = "hooks")]
1050 {
1051 let result = (|| -> Result<(), String> {
1052 let point_idx = crate::config::parse_hook_point(&attach_point)
1053 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1054 let mgr = self
1055 .hook_manager
1056 .as_ref()
1057 .ok_or_else(|| "hook manager not available".to_string())?;
1058 let program = mgr
1059 .load_builtin(name.clone(), builtin_id.as_str(), priority)
1060 .map_err(|e| format!("load error: {}", e))?;
1061 self.hook_slots[point_idx].attach(program);
1062 log::info!(
1063 "Loaded built-in hook '{}' ({}) at point {} (priority {})",
1064 name,
1065 builtin_id,
1066 attach_point,
1067 priority
1068 );
1069 Ok(())
1070 })();
1071 let _ = response_tx.send(result);
1072 }
1073 #[cfg(not(feature = "hooks"))]
1074 {
1075 let _ = (name, builtin_id, attach_point, priority);
1076 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1077 }
1078 }
1079 Event::UnloadHook {
1080 name,
1081 attach_point,
1082 response_tx,
1083 } => {
1084 #[cfg(feature = "hooks")]
1085 {
1086 let result = (|| -> Result<(), String> {
1087 let point_idx = crate::config::parse_hook_point(&attach_point)
1088 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1089 match self.hook_slots[point_idx].detach(&name) {
1090 Some(_) => {
1091 log::info!(
1092 "Unloaded hook '{}' from point {}",
1093 name,
1094 attach_point
1095 );
1096 Ok(())
1097 }
1098 None => Err(format!(
1099 "hook '{}' not found at point '{}'",
1100 name, attach_point
1101 )),
1102 }
1103 })();
1104 let _ = response_tx.send(result);
1105 }
1106 #[cfg(not(feature = "hooks"))]
1107 {
1108 let _ = (name, attach_point);
1109 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1110 }
1111 }
1112 Event::ReloadHook {
1113 name,
1114 attach_point,
1115 wasm_bytes,
1116 response_tx,
1117 } => {
1118 #[cfg(feature = "hooks")]
1119 {
1120 let result = (|| -> Result<(), String> {
1121 let point_idx = crate::config::parse_hook_point(&attach_point)
1122 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1123 let old =
1124 self.hook_slots[point_idx].detach(&name).ok_or_else(|| {
1125 format!("hook '{}' not found at point '{}'", name, attach_point)
1126 })?;
1127 let priority = old.priority;
1128 let mgr = match self.hook_manager.as_ref() {
1129 Some(m) => m,
1130 None => {
1131 self.hook_slots[point_idx].attach(old);
1132 return Err("hook manager not available".to_string());
1133 }
1134 };
1135 match mgr.compile(name.clone(), &wasm_bytes, priority) {
1136 Ok(program) => {
1137 self.hook_slots[point_idx].attach(program);
1138 log::info!(
1139 "Reloaded hook '{}' at point {} (priority {})",
1140 name,
1141 attach_point,
1142 priority
1143 );
1144 Ok(())
1145 }
1146 Err(e) => {
1147 self.hook_slots[point_idx].attach(old);
1148 Err(format!("compile error: {}", e))
1149 }
1150 }
1151 })();
1152 let _ = response_tx.send(result);
1153 }
1154 #[cfg(not(feature = "hooks"))]
1155 {
1156 let _ = (name, attach_point, wasm_bytes);
1157 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1158 }
1159 }
1160 Event::ReloadHookFile {
1161 name,
1162 attach_point,
1163 path,
1164 hook_type,
1165 response_tx,
1166 } => {
1167 #[cfg(feature = "hooks")]
1168 {
1169 let result = (|| -> Result<(), String> {
1170 let point_idx = crate::config::parse_hook_point(&attach_point)
1171 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1172 let old =
1173 self.hook_slots[point_idx].detach(&name).ok_or_else(|| {
1174 format!("hook '{}' not found at point '{}'", name, attach_point)
1175 })?;
1176 let priority = old.priority;
1177 let backend = match crate::config::parse_hook_backend(&hook_type) {
1178 Ok(backend) => backend,
1179 Err(e) => {
1180 self.hook_slots[point_idx].attach(old);
1181 return Err(e);
1182 }
1183 };
1184 let mgr = match self.hook_manager.as_ref() {
1185 Some(m) => m,
1186 None => {
1187 self.hook_slots[point_idx].attach(old);
1188 return Err("hook manager not available".to_string());
1189 }
1190 };
1191 match mgr.load_file_backend(
1192 name.clone(),
1193 std::path::Path::new(&path),
1194 priority,
1195 backend,
1196 ) {
1197 Ok(program) => {
1198 self.hook_slots[point_idx].attach(program);
1199 log::info!(
1200 "Reloaded {} hook '{}' at point {} (priority {})",
1201 backend.as_str(),
1202 name,
1203 attach_point,
1204 priority
1205 );
1206 Ok(())
1207 }
1208 Err(e) => {
1209 self.hook_slots[point_idx].attach(old);
1210 Err(format!("load error: {}", e))
1211 }
1212 }
1213 })();
1214 let _ = response_tx.send(result);
1215 }
1216 #[cfg(not(feature = "hooks"))]
1217 {
1218 let _ = (name, attach_point, path, hook_type);
1219 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1220 }
1221 }
1222 Event::ReloadBuiltinHook {
1223 name,
1224 attach_point,
1225 builtin_id,
1226 response_tx,
1227 } => {
1228 #[cfg(feature = "hooks")]
1229 {
1230 let result = (|| -> Result<(), String> {
1231 let point_idx = crate::config::parse_hook_point(&attach_point)
1232 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1233 let old =
1234 self.hook_slots[point_idx].detach(&name).ok_or_else(|| {
1235 format!("hook '{}' not found at point '{}'", name, attach_point)
1236 })?;
1237 let priority = old.priority;
1238 let mgr = match self.hook_manager.as_ref() {
1239 Some(m) => m,
1240 None => {
1241 self.hook_slots[point_idx].attach(old);
1242 return Err("hook manager not available".to_string());
1243 }
1244 };
1245 match mgr.load_builtin(name.clone(), builtin_id.as_str(), priority) {
1246 Ok(program) => {
1247 self.hook_slots[point_idx].attach(program);
1248 log::info!(
1249 "Reloaded built-in hook '{}' ({}) at point {} (priority {})",
1250 name,
1251 builtin_id,
1252 attach_point,
1253 priority
1254 );
1255 Ok(())
1256 }
1257 Err(e) => {
1258 self.hook_slots[point_idx].attach(old);
1259 Err(format!("load error: {}", e))
1260 }
1261 }
1262 })();
1263 let _ = response_tx.send(result);
1264 }
1265 #[cfg(not(feature = "hooks"))]
1266 {
1267 let _ = (name, attach_point, builtin_id);
1268 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1269 }
1270 }
1271 Event::SetHookEnabled {
1272 name,
1273 attach_point,
1274 enabled,
1275 response_tx,
1276 } => {
1277 #[cfg(feature = "hooks")]
1278 {
1279 let result = self.update_hook_program(&name, &attach_point, |program| {
1280 program.enabled = enabled;
1281 });
1282 if result.is_ok() {
1283 log::info!(
1284 "{} hook '{}' at point {}",
1285 if enabled { "Enabled" } else { "Disabled" },
1286 name,
1287 attach_point,
1288 );
1289 }
1290 let _ = response_tx.send(result);
1291 }
1292 #[cfg(not(feature = "hooks"))]
1293 {
1294 let _ = (name, attach_point, enabled);
1295 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1296 }
1297 }
1298 Event::SetHookPriority {
1299 name,
1300 attach_point,
1301 priority,
1302 response_tx,
1303 } => {
1304 #[cfg(feature = "hooks")]
1305 {
1306 let result = self.update_hook_program(&name, &attach_point, |program| {
1307 program.priority = priority;
1308 });
1309 if result.is_ok() {
1310 if let Some(point_idx) = crate::config::parse_hook_point(&attach_point)
1311 {
1312 self.hook_slots[point_idx]
1313 .programs
1314 .sort_by(|a, b| b.priority.cmp(&a.priority));
1315 log::info!(
1316 "Updated hook '{}' at point {} to priority {}",
1317 name,
1318 attach_point,
1319 priority,
1320 );
1321 } else {
1322 log::error!(
1323 "hook point '{}' became invalid during priority update",
1324 attach_point
1325 );
1326 }
1327 }
1328 let _ = response_tx.send(result);
1329 }
1330 #[cfg(not(feature = "hooks"))]
1331 {
1332 let _ = (name, attach_point, priority);
1333 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1334 }
1335 }
1336 Event::ListHooks { response_tx } => {
1337 #[cfg(feature = "hooks")]
1338 {
1339 let hook_point_names = [
1340 "PreIngress",
1341 "PreDispatch",
1342 "AnnounceReceived",
1343 "PathUpdated",
1344 "AnnounceRetransmit",
1345 "LinkRequestReceived",
1346 "LinkEstablished",
1347 "LinkClosed",
1348 "InterfaceUp",
1349 "InterfaceDown",
1350 "InterfaceConfigChanged",
1351 "BackbonePeerConnected",
1352 "BackbonePeerDisconnected",
1353 "BackbonePeerIdleTimeout",
1354 "BackbonePeerWriteStall",
1355 "BackbonePeerPenalty",
1356 "SendOnInterface",
1357 "BroadcastOnAllInterfaces",
1358 "DeliverLocal",
1359 "TunnelSynthesize",
1360 "Tick",
1361 ];
1362 let mut infos = Vec::new();
1363 for (idx, slot) in self.hook_slots.iter().enumerate() {
1364 let point_name = hook_point_names.get(idx).unwrap_or(&"Unknown");
1365 for prog in &slot.programs {
1366 infos.push(crate::event::HookInfo {
1367 name: prog.name.clone(),
1368 hook_type: prog.backend_name().to_string(),
1369 attach_point: point_name.to_string(),
1370 priority: prog.priority,
1371 enabled: prog.enabled,
1372 consecutive_traps: prog.consecutive_traps,
1373 });
1374 }
1375 }
1376 let _ = response_tx.send(infos);
1377 }
1378 #[cfg(not(feature = "hooks"))]
1379 {
1380 let _ = response_tx.send(Vec::new());
1381 }
1382 }
1383 Event::InterfaceConfigChanged(id) => {
1384 #[cfg(feature = "hooks")]
1385 {
1386 let ctx = HookContext::Interface { interface_id: id.0 };
1387 let now = time::now();
1388 let engine_ref = EngineRef {
1389 engine: &self.engine,
1390 interfaces: &self.interfaces,
1391 link_manager: &self.link_manager,
1392 now,
1393 };
1394 let provider_events_enabled = self.provider_events_enabled();
1395 if let Some(ref e) = run_hook_inner(
1396 &mut self.hook_slots[HookPoint::InterfaceConfigChanged as usize]
1397 .programs,
1398 &self.hook_manager,
1399 &engine_ref,
1400 &ctx,
1401 now,
1402 provider_events_enabled,
1403 ) {
1404 self.forward_hook_side_effects("InterfaceConfigChanged", e);
1405 }
1406 }
1407 #[cfg(not(feature = "hooks"))]
1408 let _ = id;
1409 }
1410 Event::BackbonePeerConnected {
1411 server_interface_id,
1412 peer_interface_id,
1413 peer_ip,
1414 peer_port,
1415 } => {
1416 #[cfg(feature = "hooks")]
1417 {
1418 self.run_backbone_peer_hook(
1419 "BackbonePeerConnected",
1420 HookPoint::BackbonePeerConnected,
1421 &BackbonePeerHookEvent {
1422 server_interface_id,
1423 peer_interface_id: Some(peer_interface_id),
1424 peer_ip,
1425 peer_port,
1426 connected_for: Duration::ZERO,
1427 had_received_data: false,
1428 penalty_level: 0,
1429 blacklist_for: Duration::ZERO,
1430 },
1431 );
1432 }
1433 #[cfg(not(feature = "hooks"))]
1434 let _ = (server_interface_id, peer_interface_id, peer_ip, peer_port);
1435 }
1436 Event::BackbonePeerDisconnected {
1437 server_interface_id,
1438 peer_interface_id,
1439 peer_ip,
1440 peer_port,
1441 connected_for,
1442 had_received_data,
1443 } => {
1444 #[cfg(feature = "hooks")]
1445 {
1446 self.run_backbone_peer_hook(
1447 "BackbonePeerDisconnected",
1448 HookPoint::BackbonePeerDisconnected,
1449 &BackbonePeerHookEvent {
1450 server_interface_id,
1451 peer_interface_id: Some(peer_interface_id),
1452 peer_ip,
1453 peer_port,
1454 connected_for,
1455 had_received_data,
1456 penalty_level: 0,
1457 blacklist_for: Duration::ZERO,
1458 },
1459 );
1460 }
1461 #[cfg(not(feature = "hooks"))]
1462 let _ = (
1463 server_interface_id,
1464 peer_interface_id,
1465 peer_ip,
1466 peer_port,
1467 connected_for,
1468 had_received_data,
1469 );
1470 }
1471 Event::BackbonePeerIdleTimeout {
1472 server_interface_id,
1473 peer_interface_id,
1474 peer_ip,
1475 peer_port,
1476 connected_for,
1477 } => {
1478 #[cfg(feature = "hooks")]
1479 {
1480 self.run_backbone_peer_hook(
1481 "BackbonePeerIdleTimeout",
1482 HookPoint::BackbonePeerIdleTimeout,
1483 &BackbonePeerHookEvent {
1484 server_interface_id,
1485 peer_interface_id: Some(peer_interface_id),
1486 peer_ip,
1487 peer_port,
1488 connected_for,
1489 had_received_data: false,
1490 penalty_level: 0,
1491 blacklist_for: Duration::ZERO,
1492 },
1493 );
1494 }
1495 #[cfg(not(feature = "hooks"))]
1496 let _ = (
1497 server_interface_id,
1498 peer_interface_id,
1499 peer_ip,
1500 peer_port,
1501 connected_for,
1502 );
1503 }
1504 Event::BackbonePeerWriteStall {
1505 server_interface_id,
1506 peer_interface_id,
1507 peer_ip,
1508 peer_port,
1509 connected_for,
1510 } => {
1511 #[cfg(feature = "hooks")]
1512 {
1513 self.run_backbone_peer_hook(
1514 "BackbonePeerWriteStall",
1515 HookPoint::BackbonePeerWriteStall,
1516 &BackbonePeerHookEvent {
1517 server_interface_id,
1518 peer_interface_id: Some(peer_interface_id),
1519 peer_ip,
1520 peer_port,
1521 connected_for,
1522 had_received_data: false,
1523 penalty_level: 0,
1524 blacklist_for: Duration::ZERO,
1525 },
1526 );
1527 }
1528 #[cfg(not(feature = "hooks"))]
1529 let _ = (
1530 server_interface_id,
1531 peer_interface_id,
1532 peer_ip,
1533 peer_port,
1534 connected_for,
1535 );
1536 }
1537 Event::BackbonePeerPenalty {
1538 server_interface_id,
1539 peer_ip,
1540 penalty_level,
1541 blacklist_for,
1542 } => {
1543 #[cfg(feature = "hooks")]
1544 {
1545 self.run_backbone_peer_hook(
1546 "BackbonePeerPenalty",
1547 HookPoint::BackbonePeerPenalty,
1548 &BackbonePeerHookEvent {
1549 server_interface_id,
1550 peer_interface_id: None,
1551 peer_ip,
1552 peer_port: 0,
1553 connected_for: Duration::ZERO,
1554 had_received_data: false,
1555 penalty_level,
1556 blacklist_for,
1557 },
1558 );
1559 }
1560 #[cfg(not(feature = "hooks"))]
1561 let _ = (server_interface_id, peer_ip, penalty_level, blacklist_for);
1562 }
1563 Event::Shutdown => {
1564 self.graceful_shutdown();
1565 break;
1566 }
1567 }
1568 }
1569 }
1570 pub(crate) fn handle_tunnel_synth_delivery(&mut self, raw: &[u8]) {
1571 let packet = match RawPacket::unpack(raw) {
1573 Ok(p) => p,
1574 Err(_) => return,
1575 };
1576
1577 match rns_core::transport::tunnel::validate_tunnel_synthesize_data(&packet.data) {
1578 Ok(validated) => {
1579 let iface_id = self
1582 .interfaces
1583 .iter()
1584 .find(|(_, entry)| entry.info.wants_tunnel && entry.online && entry.enabled)
1585 .map(|(id, _)| *id);
1586
1587 if let Some(iface) = iface_id {
1588 let now = time::now();
1589 let tunnel_actions = self.engine.handle_tunnel(validated.tunnel_id, iface, now);
1590 self.dispatch_all(tunnel_actions);
1591 }
1592 }
1593 Err(e) => {
1594 log::debug!("Tunnel synthesis validation failed: {}", e);
1595 }
1596 }
1597 }
1598
1599 pub(crate) fn synthesize_tunnel_for_interface(&mut self, interface: InterfaceId) {
1603 if let Some(ref identity) = self.transport_identity {
1604 let actions = self
1605 .engine
1606 .synthesize_tunnel(identity, interface, &mut self.rng);
1607 self.dispatch_all(actions);
1608 }
1609 }
1610
1611 pub(crate) fn handle_request_path(&mut self, dest_hash: [u8; 16]) {
1613 let mut data = Vec::with_capacity(48);
1615 data.extend_from_slice(&dest_hash);
1616
1617 if self.engine.transport_enabled() {
1618 if let Some(id_hash) = self.engine.identity_hash() {
1619 data.extend_from_slice(id_hash);
1620 }
1621 }
1622
1623 let mut tag = [0u8; 16];
1625 self.rng.fill_bytes(&mut tag);
1626 data.extend_from_slice(&tag);
1627
1628 let flags = rns_core::packet::PacketFlags {
1630 header_type: rns_core::constants::HEADER_1,
1631 context_flag: rns_core::constants::FLAG_UNSET,
1632 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
1633 destination_type: rns_core::constants::DESTINATION_PLAIN,
1634 packet_type: rns_core::constants::PACKET_TYPE_DATA,
1635 };
1636
1637 if let Ok(packet) = RawPacket::pack(
1638 flags,
1639 0,
1640 &self.path_request_dest,
1641 None,
1642 rns_core::constants::CONTEXT_NONE,
1643 &data,
1644 ) {
1645 let actions = self.engine.handle_outbound(
1646 &packet,
1647 rns_core::constants::DESTINATION_PLAIN,
1648 None,
1649 time::now(),
1650 );
1651 self.dispatch_all(actions);
1652 }
1653 }
1654
1655 pub(crate) fn get_event_sender(&self) -> crate::event::EventSender {
1658 self.event_tx.clone()
1662 }
1663
1664 const MANAGEMENT_ANNOUNCE_DELAY: f64 = 5.0;
1666
1667 pub(crate) fn tick_discovery_announcer(&mut self, now: f64) {
1669 let announcer = match self.interface_announcer.as_mut() {
1670 Some(a) => a,
1671 None => return,
1672 };
1673
1674 announcer.maybe_start(now);
1675
1676 let stamp_result = match announcer.poll_ready() {
1677 Some(r) => r,
1678 None => return,
1679 };
1680
1681 if !announcer.contains_interface(&stamp_result.interface_name) {
1682 log::debug!(
1683 "Discovery: dropping completed stamp for removed interface '{}'",
1684 stamp_result.interface_name
1685 );
1686 return;
1687 }
1688
1689 let identity = match self.transport_identity.as_ref() {
1690 Some(id) => id,
1691 None => {
1692 log::warn!("Discovery: stamp ready but no transport identity");
1693 return;
1694 }
1695 };
1696
1697 let identity_hash = identity.hash();
1699 let disc_dest = rns_core::destination::destination_hash(
1700 crate::discovery::APP_NAME,
1701 &["discovery", "interface"],
1702 Some(&identity_hash),
1703 );
1704 let name_hash = self.discovery_name_hash;
1705 let mut random_hash = [0u8; 10];
1706 self.rng.fill_bytes(&mut random_hash);
1707
1708 let (announce_data, _) = match rns_core::announce::AnnounceData::pack(
1709 identity,
1710 &disc_dest,
1711 &name_hash,
1712 &random_hash,
1713 None,
1714 Some(&stamp_result.app_data),
1715 ) {
1716 Ok(v) => v,
1717 Err(e) => {
1718 log::warn!("Discovery: failed to pack announce: {}", e);
1719 return;
1720 }
1721 };
1722
1723 let flags = rns_core::packet::PacketFlags {
1724 header_type: rns_core::constants::HEADER_1,
1725 context_flag: rns_core::constants::FLAG_UNSET,
1726 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
1727 destination_type: rns_core::constants::DESTINATION_SINGLE,
1728 packet_type: rns_core::constants::PACKET_TYPE_ANNOUNCE,
1729 };
1730
1731 let packet = match RawPacket::pack(
1732 flags,
1733 0,
1734 &disc_dest,
1735 None,
1736 rns_core::constants::CONTEXT_NONE,
1737 &announce_data,
1738 ) {
1739 Ok(p) => p,
1740 Err(e) => {
1741 log::warn!("Discovery: failed to pack packet: {}", e);
1742 return;
1743 }
1744 };
1745
1746 let outbound_actions = self.engine.handle_outbound(
1747 &packet,
1748 rns_core::constants::DESTINATION_SINGLE,
1749 None,
1750 now,
1751 );
1752 log::debug!(
1753 "Discovery announce sent for interface '{}' ({} actions, dest={:02x?})",
1754 stamp_result.interface_name,
1755 outbound_actions.len(),
1756 &disc_dest[..4],
1757 );
1758 self.dispatch_all(outbound_actions);
1759 }
1760
1761 pub(crate) fn rss_mb() -> Option<f64> {
1763 let statm = std::fs::read_to_string("/proc/self/statm").ok()?;
1764 let rss_pages: u64 = statm.split_whitespace().nth(1)?.parse().ok()?;
1765 Some(rss_pages as f64 * 4096.0 / (1024.0 * 1024.0))
1766 }
1767
1768 pub(crate) fn parse_proc_kib(contents: &str, key: &str) -> Option<u64> {
1769 contents.lines().find_map(|line| {
1770 let value = line.strip_prefix(key)?;
1771 value.split_whitespace().next()?.parse().ok()
1772 })
1773 }
1774
1775 pub(crate) fn proc_status_mb() -> Option<(f64, f64, f64, f64)> {
1776 let status = std::fs::read_to_string("/proc/self/status").ok()?;
1777 let vm_rss = Self::parse_proc_kib(&status, "VmRSS:")? as f64 / 1024.0;
1778 let vm_hwm = Self::parse_proc_kib(&status, "VmHWM:")? as f64 / 1024.0;
1779 let vm_data = Self::parse_proc_kib(&status, "VmData:")? as f64 / 1024.0;
1780 let vm_swap = Self::parse_proc_kib(&status, "VmSwap:").unwrap_or(0) as f64 / 1024.0;
1781 Some((vm_rss, vm_hwm, vm_data, vm_swap))
1782 }
1783
1784 pub(crate) fn smaps_rollup_mb() -> Option<(f64, f64, f64, f64, f64, f64, f64, f64)> {
1785 let smaps = std::fs::read_to_string("/proc/self/smaps_rollup").ok()?;
1786 let rss_kib = Self::parse_proc_kib(&smaps, "Rss:")?;
1787 let anon_kib = Self::parse_proc_kib(&smaps, "Anonymous:")?;
1788 let shared_clean_kib = Self::parse_proc_kib(&smaps, "Shared_Clean:").unwrap_or(0);
1789 let shared_dirty_kib = Self::parse_proc_kib(&smaps, "Shared_Dirty:").unwrap_or(0);
1790 let private_clean_kib = Self::parse_proc_kib(&smaps, "Private_Clean:").unwrap_or(0);
1791 let private_dirty_kib = Self::parse_proc_kib(&smaps, "Private_Dirty:").unwrap_or(0);
1792 let swap_kib = Self::parse_proc_kib(&smaps, "Swap:").unwrap_or(0);
1793 let file_est_kib = rss_kib.saturating_sub(anon_kib);
1794 Some((
1795 rss_kib as f64 / 1024.0,
1796 anon_kib as f64 / 1024.0,
1797 file_est_kib as f64 / 1024.0,
1798 shared_clean_kib as f64 / 1024.0,
1799 shared_dirty_kib as f64 / 1024.0,
1800 private_clean_kib as f64 / 1024.0,
1801 private_dirty_kib as f64 / 1024.0,
1802 swap_kib as f64 / 1024.0,
1803 ))
1804 }
1805
1806 pub(crate) fn log_memory_stats(&self) {
1808 let rss = Self::rss_mb()
1809 .map(|v| format!("{:.1}", v))
1810 .unwrap_or_else(|| "N/A".into());
1811 let (vm_rss, vm_hwm, vm_data, vm_swap) = Self::proc_status_mb()
1812 .map(|(rss, hwm, data, swap)| {
1813 (
1814 format!("{rss:.1}"),
1815 format!("{hwm:.1}"),
1816 format!("{data:.1}"),
1817 format!("{swap:.1}"),
1818 )
1819 })
1820 .unwrap_or_else(|| ("N/A".into(), "N/A".into(), "N/A".into(), "N/A".into()));
1821 let (
1822 smaps_rss,
1823 smaps_anon,
1824 smaps_file_est,
1825 smaps_shared_clean,
1826 smaps_shared_dirty,
1827 smaps_private_clean,
1828 smaps_private_dirty,
1829 smaps_swap,
1830 ) = Self::smaps_rollup_mb()
1831 .map(
1832 |(
1833 rss,
1834 anon,
1835 file_est,
1836 shared_clean,
1837 shared_dirty,
1838 private_clean,
1839 private_dirty,
1840 swap,
1841 )| {
1842 (
1843 format!("{rss:.1}"),
1844 format!("{anon:.1}"),
1845 format!("{file_est:.1}"),
1846 format!("{shared_clean:.1}"),
1847 format!("{shared_dirty:.1}"),
1848 format!("{private_clean:.1}"),
1849 format!("{private_dirty:.1}"),
1850 format!("{swap:.1}"),
1851 )
1852 },
1853 )
1854 .unwrap_or_else(|| {
1855 (
1856 "N/A".into(),
1857 "N/A".into(),
1858 "N/A".into(),
1859 "N/A".into(),
1860 "N/A".into(),
1861 "N/A".into(),
1862 "N/A".into(),
1863 "N/A".into(),
1864 )
1865 });
1866 log::info!(
1867 "MEMSTATS rss_mb={} vmrss_mb={} vmhwm_mb={} vmdata_mb={} vmswap_mb={} smaps_rss_mb={} smaps_anon_mb={} smaps_file_est_mb={} smaps_shared_clean_mb={} smaps_shared_dirty_mb={} smaps_private_clean_mb={} smaps_private_dirty_mb={} smaps_swap_mb={} known_dest={} known_dest_cap_evict={} path={} path_cap_evict={} announce={} reverse={} link={} held_ann={} hashlist={} sig_cache={} ann_verify_q={} rate_lim={} blackhole={} tunnel={} ann_q_ifaces={} ann_q_nonempty={} ann_q_entries={} ann_q_bytes={} ann_q_iface_drop={} pr_tags={} disc_pr={} sent_pkt={} completed={} local_dest={} shared_ann={} lm_links={} hp_sessions={} proof_strat={}",
1868 rss,
1869 vm_rss,
1870 vm_hwm,
1871 vm_data,
1872 vm_swap,
1873 smaps_rss,
1874 smaps_anon,
1875 smaps_file_est,
1876 smaps_shared_clean,
1877 smaps_shared_dirty,
1878 smaps_private_clean,
1879 smaps_private_dirty,
1880 smaps_swap,
1881 self.known_destinations.len(),
1882 self.known_destinations_cap_evict_count,
1883 self.engine.path_table_count(),
1884 self.engine.path_destination_cap_evict_count(),
1885 self.engine.announce_table_count(),
1886 self.engine.reverse_table_count(),
1887 self.engine.link_table_count(),
1888 self.engine.held_announces_count(),
1889 self.engine.packet_hashlist_len(),
1890 self.engine.announce_sig_cache_len(),
1891 self.announce_verify_queue
1892 .lock()
1893 .map(|queue| queue.len())
1894 .unwrap_or(0),
1895 self.engine.rate_limiter_count(),
1896 self.engine.blackholed_count(),
1897 self.engine.tunnel_count(),
1898 self.engine.announce_queue_count(),
1899 self.engine.nonempty_announce_queue_count(),
1900 self.engine.queued_announce_count(),
1901 self.engine.queued_announce_bytes(),
1902 self.engine.announce_queue_interface_cap_drop_count(),
1903 self.engine.discovery_pr_tags_count(),
1904 self.engine.discovery_path_requests_count(),
1905 self.sent_packets.len(),
1906 self.completed_proofs.len(),
1907 self.local_destinations.len(),
1908 self.shared_announces.len(),
1909 self.link_manager.link_count(),
1910 self.holepunch_manager.session_count(),
1911 self.proof_strategies.len(),
1912 );
1913 }
1914
1915 pub(crate) fn tick_management_announces(&mut self, now: f64) {
1917 if self.transport_identity.is_none() {
1918 return;
1919 }
1920
1921 let uptime = now - self.started;
1922
1923 if !self.initial_announce_sent {
1925 if uptime < Self::MANAGEMENT_ANNOUNCE_DELAY {
1926 return;
1927 }
1928 self.initial_announce_sent = true;
1929 self.emit_management_announces(now);
1930 return;
1931 }
1932
1933 if now - self.last_management_announce >= self.management_announce_interval_secs {
1935 self.emit_management_announces(now);
1936 }
1937 }
1938
1939 pub(crate) fn emit_management_announces(&mut self, now: f64) {
1941 use crate::management;
1942
1943 self.last_management_announce = now;
1944
1945 let identity = match self.transport_identity {
1946 Some(ref id) => id,
1947 None => return,
1948 };
1949
1950 let mgmt_raw = if self.management_config.enable_remote_management {
1952 management::build_management_announce(identity, &mut self.rng)
1953 } else {
1954 None
1955 };
1956
1957 let bh_raw = if self.management_config.publish_blackhole {
1958 management::build_blackhole_announce(identity, &mut self.rng)
1959 } else {
1960 None
1961 };
1962
1963 let probe_raw = if self.probe_responder_hash.is_some() {
1964 management::build_probe_announce(identity, &mut self.rng)
1965 } else {
1966 None
1967 };
1968
1969 if let Some(raw) = mgmt_raw {
1970 if let Ok(packet) = RawPacket::unpack(&raw) {
1971 let actions = self.engine.handle_outbound(
1972 &packet,
1973 rns_core::constants::DESTINATION_SINGLE,
1974 None,
1975 now,
1976 );
1977 self.dispatch_all(actions);
1978 log::debug!("Emitted management destination announce");
1979 }
1980 }
1981
1982 if let Some(raw) = bh_raw {
1983 if let Ok(packet) = RawPacket::unpack(&raw) {
1984 let actions = self.engine.handle_outbound(
1985 &packet,
1986 rns_core::constants::DESTINATION_SINGLE,
1987 None,
1988 now,
1989 );
1990 self.dispatch_all(actions);
1991 log::debug!("Emitted blackhole info announce");
1992 }
1993 }
1994
1995 if let Some(raw) = probe_raw {
1996 if let Ok(packet) = RawPacket::unpack(&raw) {
1997 let actions = self.engine.handle_outbound(
1998 &packet,
1999 rns_core::constants::DESTINATION_SINGLE,
2000 None,
2001 now,
2002 );
2003 self.dispatch_all(actions);
2004 log::debug!("Emitted probe responder announce");
2005 }
2006 }
2007 }
2008
2009 pub(crate) fn handle_management_request(
2011 &mut self,
2012 link_id: [u8; 16],
2013 path_hash: [u8; 16],
2014 data: Vec<u8>,
2015 request_id: [u8; 16],
2016 remote_identity: Option<([u8; 16], [u8; 64])>,
2017 ) {
2018 use crate::management;
2019
2020 let is_restricted = path_hash == management::status_path_hash()
2022 || path_hash == management::path_path_hash();
2023
2024 if is_restricted && !self.management_config.remote_management_allowed.is_empty() {
2025 match remote_identity {
2026 Some((identity_hash, _)) => {
2027 if !self
2028 .management_config
2029 .remote_management_allowed
2030 .contains(&identity_hash)
2031 {
2032 log::debug!("Management request denied: identity not in allowed list");
2033 return;
2034 }
2035 }
2036 None => {
2037 log::debug!("Management request denied: peer not identified");
2038 return;
2039 }
2040 }
2041 }
2042
2043 let response_data = if path_hash == management::status_path_hash() {
2044 {
2045 let views: Vec<&dyn management::InterfaceStatusView> = self
2046 .interfaces
2047 .values()
2048 .map(|e| e as &dyn management::InterfaceStatusView)
2049 .collect();
2050 management::handle_status_request(
2051 &data,
2052 &self.engine,
2053 &views,
2054 self.started,
2055 self.probe_responder_hash,
2056 )
2057 }
2058 } else if path_hash == management::path_path_hash() {
2059 management::handle_path_request(&data, &self.engine)
2060 } else if path_hash == management::list_path_hash() {
2061 management::handle_blackhole_list_request(&self.engine)
2062 } else {
2063 log::warn!("Unknown management path_hash: {:02x?}", &path_hash[..4]);
2064 None
2065 };
2066
2067 if let Some(response) = response_data {
2068 let actions = self.link_manager.send_management_response(
2069 &link_id,
2070 &request_id,
2071 &response,
2072 &mut self.rng,
2073 );
2074 self.dispatch_link_actions(actions);
2075 }
2076 }
2077}