Skip to main content

rns_net/driver/
events.rs

1use super::*;
2use rns_core::transport::{InboundFrame, RxMetadata};
3
4impl Driver {
5    pub(crate) fn handle_frame_event(
6        &mut self,
7        interface_id: InterfaceId,
8        data: Vec<u8>,
9        rssi: Option<i16>,
10        snr: Option<f32>,
11    ) {
12        if data.len() > 2 && (data[0] & 0x03) == 0x01 {
13            log::debug!(
14                "Announce:frame from iface {} (len={}, flags=0x{:02x})",
15                interface_id.0,
16                data.len(),
17                data[0]
18            );
19        }
20        if let Some(entry) = self.interfaces.get(&interface_id) {
21            if !entry.enabled || !entry.online {
22                return;
23            }
24        }
25        if let Some(entry) = self.interfaces.get_mut(&interface_id) {
26            entry.stats.rxb += data.len() as u64;
27            entry.stats.rx_packets += 1;
28        }
29
30        let packet = if let Some(entry) = self.interfaces.get(&interface_id) {
31            if let Some(ref ifac_state) = entry.ifac {
32                match ifac::unmask_inbound(&data, ifac_state) {
33                    Some(unmasked) => unmasked,
34                    None => {
35                        log::debug!("[{}] IFAC rejected packet", interface_id.0);
36                        return;
37                    }
38                }
39            } else {
40                if data.len() > 2 && data[0] & 0x80 == 0x80 {
41                    log::debug!(
42                        "[{}] dropping packet with IFAC flag on non-IFAC interface",
43                        interface_id.0
44                    );
45                    return;
46                }
47                data
48            }
49        } else {
50            data
51        };
52
53        #[cfg(feature = "hooks")]
54        {
55            let pkt_ctx = rns_hooks::PacketContext {
56                flags: if packet.is_empty() { 0 } else { packet[0] },
57                hops: if packet.len() > 1 { packet[1] } else { 0 },
58                destination_hash: extract_dest_hash(&packet),
59                context: 0,
60                packet_hash: [0; 32],
61                interface_id: interface_id.0,
62                data_offset: 0,
63                data_len: packet.len() as u32,
64            };
65            let ctx = HookContext::Packet {
66                ctx: &pkt_ctx,
67                raw: &packet,
68            };
69            let now = time::now();
70            let engine_ref = EngineRef {
71                engine: &self.engine,
72                interfaces: &self.interfaces,
73                link_manager: &self.link_manager,
74                now,
75            };
76            let provider_events_enabled = self.provider_events_enabled();
77            if let Some(ref e) = run_hook_inner(
78                &mut self.hook_slots[HookPoint::PreIngress as usize].programs,
79                &self.hook_manager,
80                &engine_ref,
81                &ctx,
82                now,
83                provider_events_enabled,
84            ) {
85                self.forward_hook_side_effects("PreIngress", e);
86                if e.hook_result.as_ref().is_some_and(|r| r.is_drop()) {
87                    return;
88                }
89            }
90        }
91
92        if packet.len() > 2 && (packet[0] & 0x03) == 0x01 {
93            let now = time::now();
94            if let Some(entry) = self.interfaces.get_mut(&interface_id) {
95                entry.stats.record_incoming_announce(now);
96            }
97        }
98
99        if let Some(entry) = self.interfaces.get(&interface_id) {
100            self.engine.update_interface_freqs(
101                interface_id,
102                entry.stats.incoming_announce_freq(),
103                entry.stats.incoming_path_request_freq(),
104                entry.stats.outgoing_path_request_freq(),
105                entry.stats.outgoing_path_request_samples(),
106            );
107        }
108
109        let inbound_frame = InboundFrame {
110            raw: &packet,
111            iface: interface_id,
112            now: time::now(),
113            rx: RxMetadata { rssi, snr },
114        };
115
116        let actions = if self.async_announce_verification {
117            let mut announce_queue = self
118                .announce_verify_queue
119                .lock()
120                .unwrap_or_else(|poisoned| poisoned.into_inner());
121            self.engine.handle_inbound_with_announce_queue(
122                inbound_frame,
123                &mut self.rng,
124                Some(&mut announce_queue),
125            )
126        } else {
127            self.engine.handle_inbound(inbound_frame, &mut self.rng)
128        };
129
130        #[cfg(feature = "hooks")]
131        {
132            let pkt_ctx = rns_hooks::PacketContext {
133                flags: if packet.is_empty() { 0 } else { packet[0] },
134                hops: if packet.len() > 1 { packet[1] } else { 0 },
135                destination_hash: extract_dest_hash(&packet),
136                context: 0,
137                packet_hash: [0; 32],
138                interface_id: interface_id.0,
139                data_offset: 0,
140                data_len: packet.len() as u32,
141            };
142            let ctx = HookContext::Packet {
143                ctx: &pkt_ctx,
144                raw: &packet,
145            };
146            let now = time::now();
147            let engine_ref = EngineRef {
148                engine: &self.engine,
149                interfaces: &self.interfaces,
150                link_manager: &self.link_manager,
151                now,
152            };
153            let provider_events_enabled = self.provider_events_enabled();
154            if let Some(ref e) = run_hook_inner(
155                &mut self.hook_slots[HookPoint::PreDispatch as usize].programs,
156                &self.hook_manager,
157                &engine_ref,
158                &ctx,
159                now,
160                provider_events_enabled,
161            ) {
162                self.forward_hook_side_effects("PreDispatch", e);
163            }
164        }
165
166        self.dispatch_all(actions);
167    }
168
169    pub(crate) fn handle_announce_verified_event(
170        &mut self,
171        key: rns_core::transport::announce_verify_queue::AnnounceVerifyKey,
172        validated: rns_core::announce::ValidatedAnnounce,
173        sig_cache_key: [u8; 32],
174    ) {
175        let pending = {
176            let mut announce_queue = self
177                .announce_verify_queue
178                .lock()
179                .unwrap_or_else(|poisoned| poisoned.into_inner());
180            announce_queue.complete_success(&key)
181        };
182        if let Some(pending) = pending {
183            let actions = self.engine.complete_verified_announce(
184                pending,
185                validated,
186                sig_cache_key,
187                time::now(),
188                &mut self.rng,
189            );
190            self.dispatch_all(actions);
191        }
192    }
193
194    pub(crate) fn handle_tick_event(&mut self) {
195        #[cfg(feature = "hooks")]
196        {
197            let ctx = HookContext::Tick;
198            let now = time::now();
199            let engine_ref = EngineRef {
200                engine: &self.engine,
201                interfaces: &self.interfaces,
202                link_manager: &self.link_manager,
203                now,
204            };
205            let provider_events_enabled = self.provider_events_enabled();
206            if let Some(ref e) = run_hook_inner(
207                &mut self.hook_slots[HookPoint::Tick as usize].programs,
208                &self.hook_manager,
209                &engine_ref,
210                &ctx,
211                now,
212                provider_events_enabled,
213            ) {
214                self.forward_hook_side_effects("Tick", e);
215            }
216        }
217
218        let now = time::now();
219        for (id, entry) in &self.interfaces {
220            self.engine.update_interface_freqs(
221                *id,
222                entry.stats.incoming_announce_freq(),
223                entry.stats.incoming_path_request_freq(),
224                entry.stats.outgoing_path_request_freq(),
225                entry.stats.outgoing_path_request_samples(),
226            );
227        }
228        let actions = self.engine.tick(now, &mut self.rng);
229        self.dispatch_all(actions);
230        let link_actions = self.link_manager.tick(&mut self.rng);
231        self.dispatch_link_actions(link_actions);
232        self.enforce_drain_deadline();
233        {
234            let tx = self.get_event_sender();
235            let hp_actions = self.holepunch_manager.tick(&tx);
236            self.dispatch_holepunch_actions(hp_actions);
237        }
238        self.tick_management_announces(now);
239        self.sent_packets
240            .retain(|_, (_, sent_time)| now - *sent_time < 60.0);
241        self.completed_proofs
242            .retain(|_, (_, received)| now - *received < 120.0);
243
244        self.tick_discovery_announcer(now);
245        #[cfg(feature = "iface-backbone")]
246        self.maintain_backbone_peer_pool();
247
248        self.memory_stats_counter += 1;
249        if self.memory_stats_counter >= 300 {
250            self.memory_stats_counter = 0;
251            self.log_memory_stats();
252        }
253
254        if self.discover_interfaces {
255            self.discovery_cleanup_counter += 1;
256            if self.discovery_cleanup_counter >= self.discovery_cleanup_interval_ticks {
257                self.discovery_cleanup_counter = 0;
258                if let Ok(removed) = self.discovered_interfaces.cleanup() {
259                    if removed > 0 {
260                        log::info!("Discovery cleanup: removed {} stale entries", removed);
261                    }
262                    #[cfg(feature = "iface-backbone")]
263                    self.cull_stale_discovered_backbone_peer_pool_candidates();
264                }
265            }
266        }
267
268        self.cache_cleanup_counter += 1;
269        if self.cache_cleanup_counter >= self.known_destinations_cleanup_interval_ticks {
270            self.cache_cleanup_counter = 0;
271
272            let active_dests = self.engine.active_destination_hashes();
273            let ttl = self.known_destinations_ttl;
274            let kd_before = self.known_destinations.len();
275            self.known_destinations.retain(|k, state| {
276                active_dests.contains(k)
277                    || self.local_destinations.contains_key(k)
278                    || state.retained
279                    || now - Self::known_destination_relevance_time(state) < ttl
280            });
281            let kd_removed = kd_before - self.known_destinations.len();
282            let kd_evicted = self.enforce_known_destination_cap(false);
283            let rl_removed =
284                self.engine
285                    .cull_rate_limiter(&active_dests, now, self.rate_limiter_ttl_secs);
286
287            if kd_removed > 0 || kd_evicted > 0 || rl_removed > 0 {
288                log::info!(
289                    "Memory cleanup: removed {} known_destinations, evicted {} known_destinations, {} rate_limiter entries",
290                    kd_removed, kd_evicted, rl_removed
291                );
292            }
293        }
294
295        self.announce_cache_cleanup_counter += 1;
296        if self.announce_cache_cleanup_counter >= self.announce_cache_cleanup_interval_ticks {
297            self.announce_cache_cleanup_counter = 0;
298            if self.announce_cache.is_some() && self.cache_cleanup_active_hashes.is_none() {
299                self.cache_cleanup_active_hashes = Some(self.engine.active_packet_hashes());
300                self.cache_cleanup_entries = None;
301                self.cache_cleanup_removed = 0;
302            }
303        }
304
305        if self.cache_cleanup_active_hashes.is_some() {
306            if let Some(ref cache) = self.announce_cache {
307                if self.cache_cleanup_entries.is_none() {
308                    match cache.entries() {
309                        Ok(entries) => self.cache_cleanup_entries = Some(entries),
310                        Err(e) => {
311                            log::warn!("Announce cache cleanup failed to open directory: {}", e);
312                            self.cache_cleanup_active_hashes = None;
313                            self.cache_cleanup_entries = None;
314                        }
315                    }
316                }
317            }
318
319            if let Some(ref cache) = self.announce_cache {
320                let Some(active_hashes) = self.cache_cleanup_active_hashes.as_ref() else {
321                    self.cache_cleanup_entries = None;
322                    return;
323                };
324                let entries = match self.cache_cleanup_entries.as_mut() {
325                    Some(entries) => entries,
326                    None => return,
327                };
328                match cache.clean_batch(
329                    active_hashes,
330                    entries,
331                    self.announce_cache_cleanup_batch_size,
332                ) {
333                    Ok((removed, finished)) => {
334                        self.cache_cleanup_removed += removed;
335                        if finished {
336                            if self.cache_cleanup_removed > 0 {
337                                log::info!(
338                                    "Announce cache cleanup complete: removed {} stale files",
339                                    self.cache_cleanup_removed
340                                );
341                            }
342                            self.cache_cleanup_active_hashes = None;
343                            self.cache_cleanup_entries = None;
344                        }
345                    }
346                    Err(e) => {
347                        log::warn!("Announce cache cleanup failed: {}", e);
348                        self.cache_cleanup_active_hashes = None;
349                        self.cache_cleanup_entries = None;
350                    }
351                }
352            } else {
353                self.cache_cleanup_active_hashes = None;
354                self.cache_cleanup_entries = None;
355            }
356        }
357    }
358
359    pub(crate) fn handle_interface_up_event(
360        &mut self,
361        id: InterfaceId,
362        new_writer: Option<Box<dyn crate::interface::Writer>>,
363        info: Option<rns_core::transport::types::InterfaceInfo>,
364    ) {
365        let wants_tunnel;
366        let mut replay_shared_announces = false;
367        if let Some(mut info) = info {
368            log::info!("[{}] dynamic interface registered", id.0);
369            self.apply_announce_rate_defaults(&mut info);
370            self.apply_ingress_control_defaults(&mut info);
371            wants_tunnel = info.wants_tunnel;
372            let iface_type = infer_interface_type(&info.name);
373            info.started = time::now();
374            self.register_interface_runtime_defaults(&info);
375            self.engine.register_interface(info.clone());
376            if let Some(writer) = new_writer {
377                let (writer, async_writer_metrics) =
378                    self.wrap_interface_writer(id, &info.name, writer);
379                self.interfaces.insert(
380                    id,
381                    InterfaceEntry {
382                        id,
383                        info,
384                        writer,
385                        async_writer_metrics: Some(async_writer_metrics),
386                        enabled: true,
387                        online: true,
388                        dynamic: true,
389                        ifac: None,
390                        stats: InterfaceStats {
391                            started: time::now(),
392                            ..Default::default()
393                        },
394                        interface_type: iface_type,
395                        send_retry_at: None,
396                        send_retry_backoff: Duration::ZERO,
397                    },
398                );
399            }
400            self.callbacks.on_interface_up(id);
401            #[cfg(feature = "hooks")]
402            {
403                let ctx = HookContext::Interface { interface_id: id.0 };
404                let now = time::now();
405                let engine_ref = EngineRef {
406                    engine: &self.engine,
407                    interfaces: &self.interfaces,
408                    link_manager: &self.link_manager,
409                    now,
410                };
411                let provider_events_enabled = self.provider_events_enabled();
412                if let Some(ref e) = run_hook_inner(
413                    &mut self.hook_slots[HookPoint::InterfaceUp as usize].programs,
414                    &self.hook_manager,
415                    &engine_ref,
416                    &ctx,
417                    now,
418                    provider_events_enabled,
419                ) {
420                    self.forward_hook_side_effects("InterfaceUp", e);
421                }
422            }
423        } else {
424            let is_local_client = self
425                .interfaces
426                .get(&id)
427                .map(|entry| entry.info.is_local_client)
428                .unwrap_or(false);
429            replay_shared_announces =
430                is_local_client && self.shared_reconnect_pending.remove(&id).unwrap_or(false);
431            let interface_name = self
432                .interfaces
433                .get(&id)
434                .map(|entry| entry.info.name.clone())
435                .unwrap_or_else(|| format!("iface-{}", id.0));
436            let wrapped_writer =
437                new_writer.map(|writer| self.wrap_interface_writer(id, &interface_name, writer));
438            if let Some(entry) = self.interfaces.get_mut(&id) {
439                log::info!("[{}] interface online", id.0);
440                wants_tunnel = entry.info.wants_tunnel;
441                entry.online = true;
442                if let Some((writer, async_writer_metrics)) = wrapped_writer {
443                    log::info!("[{}] writer refreshed after reconnect", id.0);
444                    entry.writer = writer;
445                    entry.async_writer_metrics = Some(async_writer_metrics);
446                }
447                self.callbacks.on_interface_up(id);
448                #[cfg(feature = "hooks")]
449                {
450                    let ctx = HookContext::Interface { interface_id: id.0 };
451                    let now = time::now();
452                    let engine_ref = EngineRef {
453                        engine: &self.engine,
454                        interfaces: &self.interfaces,
455                        link_manager: &self.link_manager,
456                        now,
457                    };
458                    let provider_events_enabled = self.provider_events_enabled();
459                    if let Some(ref e) = run_hook_inner(
460                        &mut self.hook_slots[HookPoint::InterfaceUp as usize].programs,
461                        &self.hook_manager,
462                        &engine_ref,
463                        &ctx,
464                        now,
465                        provider_events_enabled,
466                    ) {
467                        self.forward_hook_side_effects("InterfaceUp", e);
468                    }
469                }
470            } else {
471                wants_tunnel = false;
472            }
473        }
474
475        if wants_tunnel {
476            self.synthesize_tunnel_for_interface(id);
477        }
478        if replay_shared_announces {
479            self.replay_shared_announces();
480        }
481    }
482
483    pub(crate) fn handle_interface_down_event(&mut self, id: InterfaceId) {
484        if let Some(entry) = self.interfaces.get(&id) {
485            if let Some(tunnel_id) = entry.info.tunnel_id {
486                self.engine.void_tunnel_interface(&tunnel_id);
487            }
488        }
489
490        if let Some(entry) = self.interfaces.get(&id) {
491            let is_dynamic = entry.dynamic;
492            let is_local_client = entry.info.is_local_client;
493            let interface_name = entry.info.name.clone();
494            if is_dynamic {
495                log::info!("[{}] dynamic interface removed", id.0);
496                self.interface_runtime_defaults.remove(&interface_name);
497                self.engine.deregister_interface(id);
498                self.interfaces.remove(&id);
499            } else {
500                log::info!("[{}] interface offline", id.0);
501                if let Some(entry) = self.interfaces.get_mut(&id) {
502                    entry.online = false;
503                } else {
504                    log::warn!(
505                        "interface {} disappeared while handling interface-down",
506                        id.0
507                    );
508                    return;
509                }
510                if is_local_client {
511                    self.handle_shared_interface_down(id);
512                }
513            }
514            self.callbacks.on_interface_down(id);
515            #[cfg(feature = "hooks")]
516            {
517                let ctx = HookContext::Interface { interface_id: id.0 };
518                let now = time::now();
519                let engine_ref = EngineRef {
520                    engine: &self.engine,
521                    interfaces: &self.interfaces,
522                    link_manager: &self.link_manager,
523                    now,
524                };
525                let provider_events_enabled = self.provider_events_enabled();
526                if let Some(ref e) = run_hook_inner(
527                    &mut self.hook_slots[HookPoint::InterfaceDown as usize].programs,
528                    &self.hook_manager,
529                    &engine_ref,
530                    &ctx,
531                    now,
532                    provider_events_enabled,
533                ) {
534                    self.forward_hook_side_effects("InterfaceDown", e);
535                }
536            }
537        }
538        #[cfg(feature = "iface-backbone")]
539        self.handle_backbone_peer_pool_down(id);
540    }
541
542    pub(crate) fn known_destination_route_hint(
543        &self,
544        dest_hash: &[u8; 16],
545    ) -> Option<(InterfaceId, u8)> {
546        let announced = &self.known_destinations.get(dest_hash)?.announced;
547        let iface = announced.receiving_interface;
548        if iface.0 == 0 {
549            return None;
550        }
551
552        self.interfaces
553            .get(&iface)
554            .filter(|entry| entry.online)
555            .map(|_| (iface, announced.hops))
556    }
557
558    pub(crate) fn handle_send_outbound_event(
559        &mut self,
560        raw: Vec<u8>,
561        dest_type: u8,
562        attached_interface: Option<InterfaceId>,
563    ) {
564        if self.is_draining() {
565            self.reject_new_work("send outbound packet");
566            return;
567        }
568        match RawPacket::unpack(&raw) {
569            Ok(packet) => {
570                let is_announce =
571                    packet.flags.packet_type == rns_core::constants::PACKET_TYPE_ANNOUNCE;
572                if is_announce {
573                    log::debug!(
574                        "SendOutbound: ANNOUNCE for {:02x?} (len={}, dest_type={}, attached={:?})",
575                        &packet.destination_hash[..4],
576                        raw.len(),
577                        dest_type,
578                        attached_interface
579                    );
580                }
581                if packet.flags.packet_type == rns_core::constants::PACKET_TYPE_DATA {
582                    self.sent_packets
583                        .insert(packet.packet_hash, (packet.destination_hash, time::now()));
584                }
585                let actions = self.engine.handle_outbound(
586                    &packet,
587                    dest_type,
588                    attached_interface,
589                    time::now(),
590                );
591                if is_announce {
592                    log::debug!(
593                        "SendOutbound: announce routed to {} actions: {:?}",
594                        actions.len(),
595                        actions
596                            .iter()
597                            .map(|a| match a {
598                                TransportAction::SendOnInterface { interface, .. } =>
599                                    format!("SendOn({})", interface.0),
600                                TransportAction::BroadcastOnAllInterfaces { .. } =>
601                                    "BroadcastAll".to_string(),
602                                _ => "other".to_string(),
603                            })
604                            .collect::<Vec<_>>()
605                    );
606                }
607                self.dispatch_all(actions);
608            }
609            Err(e) => {
610                log::warn!("SendOutbound: failed to unpack packet: {:?}", e);
611            }
612        }
613    }
614
615    /// Run the event loop. Blocks until Shutdown or all senders are dropped.
616    pub fn run(&mut self) {
617        loop {
618            let event = match self.rx.recv() {
619                Ok(e) => e,
620                Err(_) => break, // all senders dropped
621            };
622
623            match event {
624                Event::Frame {
625                    interface_id,
626                    data,
627                    rssi,
628                    snr,
629                } => {
630                    self.handle_frame_event(interface_id, data, rssi, snr);
631                }
632                Event::AnnounceVerified {
633                    key,
634                    validated,
635                    sig_cache_key,
636                } => {
637                    self.handle_announce_verified_event(key, validated, sig_cache_key);
638                }
639                Event::AnnounceVerifyFailed { key, .. } => {
640                    let mut announce_queue = self
641                        .announce_verify_queue
642                        .lock()
643                        .unwrap_or_else(|poisoned| poisoned.into_inner());
644                    let _ = announce_queue.complete_failure(&key);
645                }
646                Event::Tick => self.handle_tick_event(),
647                Event::BeginDrain { timeout } => {
648                    self.begin_drain(timeout);
649                }
650                Event::InterfaceUp(id, new_writer, info) => {
651                    self.handle_interface_up_event(id, new_writer, info);
652                }
653                Event::InterfaceDown(id) => self.handle_interface_down_event(id),
654                Event::SendOutbound {
655                    raw,
656                    dest_type,
657                    attached_interface,
658                } => self.handle_send_outbound_event(raw, dest_type, attached_interface),
659                Event::RegisterDestination {
660                    dest_hash,
661                    dest_type,
662                } => {
663                    self.engine.register_destination(dest_hash, dest_type);
664                    self.local_destinations.insert(dest_hash, dest_type);
665                }
666                Event::StoreSharedAnnounce {
667                    dest_hash,
668                    name_hash,
669                    identity_prv_key,
670                    app_data,
671                } => {
672                    self.shared_announces.insert(
673                        dest_hash,
674                        SharedAnnounceRecord {
675                            name_hash,
676                            identity_prv_key,
677                            app_data,
678                        },
679                    );
680                }
681                Event::DeregisterDestination { dest_hash } => {
682                    self.engine.deregister_destination(&dest_hash);
683                    self.local_destinations.remove(&dest_hash);
684                    self.shared_announces.remove(&dest_hash);
685                }
686                Event::Query(request, response_tx) => {
687                    let response = self.handle_query_mut(request);
688                    let _ = response_tx.send(response);
689                }
690                Event::DeregisterLinkDestination { dest_hash } => {
691                    self.link_manager.deregister_link_destination(&dest_hash);
692                }
693                Event::RegisterLinkDestination {
694                    dest_hash,
695                    sig_prv_bytes,
696                    sig_pub_bytes,
697                    resource_strategy,
698                } => {
699                    let sig_prv =
700                        rns_crypto::ed25519::Ed25519PrivateKey::from_bytes(&sig_prv_bytes);
701                    let strat = match resource_strategy {
702                        1 => crate::link_manager::ResourceStrategy::AcceptAll,
703                        2 => crate::link_manager::ResourceStrategy::AcceptApp,
704                        _ => crate::link_manager::ResourceStrategy::AcceptNone,
705                    };
706                    self.link_manager.register_link_destination(
707                        dest_hash,
708                        sig_prv,
709                        sig_pub_bytes,
710                        strat,
711                    );
712                    // Also register in transport engine so inbound packets are delivered locally
713                    self.engine
714                        .register_destination(dest_hash, rns_core::constants::DESTINATION_SINGLE);
715                    self.local_destinations
716                        .insert(dest_hash, rns_core::constants::DESTINATION_SINGLE);
717                }
718                Event::RegisterRequestHandler {
719                    path,
720                    allowed_list,
721                    handler,
722                } => {
723                    self.link_manager.register_request_handler(
724                        &path,
725                        allowed_list,
726                        move |link_id, p, data, remote| handler(link_id, p, data, remote),
727                    );
728                }
729                Event::RegisterRequestHandlerResponse {
730                    path,
731                    allowed_list,
732                    handler,
733                } => {
734                    self.link_manager.register_request_handler_response(
735                        &path,
736                        allowed_list,
737                        move |link_id, p, data, remote| handler(link_id, p, data, remote),
738                    );
739                }
740                Event::CreateLink {
741                    dest_hash,
742                    dest_sig_pub_bytes,
743                    response_tx,
744                } => {
745                    if self.is_draining() {
746                        self.reject_new_work("create link");
747                        let _ = (dest_hash, dest_sig_pub_bytes);
748                        let _ = response_tx.send([0u8; 16]);
749                        continue;
750                    }
751                    let next_hop_interface = self.engine.next_hop_interface(&dest_hash);
752                    let recalled_route_hint = if next_hop_interface.is_none() {
753                        self.known_destination_route_hint(&dest_hash)
754                    } else {
755                        None
756                    };
757                    if recalled_route_hint.is_some() {
758                        let _ = self.mark_known_destination_used(&dest_hash);
759                    }
760                    let attached_interface =
761                        next_hop_interface.or(recalled_route_hint.map(|(iface, _)| iface));
762                    let hops = self
763                        .engine
764                        .hops_to(&dest_hash)
765                        .or_else(|| recalled_route_hint.map(|(_, hops)| hops))
766                        .unwrap_or(0);
767                    let mtu = attached_interface
768                        .and_then(|iface_id| self.interfaces.get(&iface_id))
769                        .map(|entry| entry.info.mtu)
770                        .unwrap_or(rns_core::constants::MTU as u32);
771                    let (link_id, mut link_actions) = self.link_manager.create_link(
772                        &dest_hash,
773                        &dest_sig_pub_bytes,
774                        hops,
775                        mtu,
776                        &mut self.rng,
777                    );
778                    if let Some(iface) = attached_interface {
779                        self.link_manager.set_link_route_hint(&link_id, iface, None);
780                    }
781                    if next_hop_interface.is_none() {
782                        if let Some(iface) = attached_interface {
783                            for action in &mut link_actions {
784                                if let LinkManagerAction::SendPacket {
785                                    dest_type,
786                                    attached_interface,
787                                    ..
788                                } = action
789                                {
790                                    if *dest_type == rns_core::constants::DESTINATION_LINK
791                                        && attached_interface.is_none()
792                                    {
793                                        *attached_interface = Some(iface);
794                                    }
795                                }
796                            }
797                        }
798                    }
799                    let _ = response_tx.send(link_id);
800                    self.dispatch_link_actions(link_actions);
801                }
802                Event::SendRequest {
803                    link_id,
804                    path,
805                    data,
806                } => {
807                    if self.is_draining() {
808                        self.reject_new_work("send link request");
809                        let _ = (link_id, path, data);
810                        continue;
811                    }
812                    let link_actions =
813                        self.link_manager
814                            .send_request(&link_id, &path, &data, &mut self.rng);
815                    self.dispatch_link_actions(link_actions);
816                }
817                Event::IdentifyOnLink {
818                    link_id,
819                    identity_prv_key,
820                } => {
821                    if self.is_draining() {
822                        self.reject_new_work("identify on link");
823                        let _ = (link_id, identity_prv_key);
824                        continue;
825                    }
826                    let identity =
827                        rns_crypto::identity::Identity::from_private_key(&identity_prv_key);
828                    let link_actions =
829                        self.link_manager
830                            .identify(&link_id, &identity, &mut self.rng);
831                    self.dispatch_link_actions(link_actions);
832                }
833                Event::TeardownLink { link_id } => {
834                    let link_actions = self.link_manager.teardown_link(&link_id);
835                    self.dispatch_link_actions(link_actions);
836                }
837                Event::SendResource {
838                    link_id,
839                    data,
840                    metadata,
841                    auto_compress,
842                } => {
843                    if self.is_draining() {
844                        self.reject_new_work("send resource");
845                        let _ = (link_id, data, metadata, auto_compress);
846                        continue;
847                    }
848                    let link_actions = self.link_manager.send_resource_with_auto_compress(
849                        &link_id,
850                        &data,
851                        metadata.as_deref(),
852                        auto_compress,
853                        &mut self.rng,
854                    );
855                    self.dispatch_link_actions(link_actions);
856                }
857                Event::SetResourceStrategy { link_id, strategy } => {
858                    use crate::link_manager::ResourceStrategy;
859                    let strat = match strategy {
860                        0 => ResourceStrategy::AcceptNone,
861                        1 => ResourceStrategy::AcceptAll,
862                        2 => ResourceStrategy::AcceptApp,
863                        _ => ResourceStrategy::AcceptNone,
864                    };
865                    self.link_manager.set_resource_strategy(&link_id, strat);
866                }
867                Event::AcceptResource {
868                    link_id,
869                    resource_hash,
870                    accept,
871                } => {
872                    if self.is_draining() && accept {
873                        self.reject_new_work("accept resource");
874                        let _ = (link_id, resource_hash, accept);
875                        continue;
876                    }
877                    let link_actions = self.link_manager.accept_resource(
878                        &link_id,
879                        &resource_hash,
880                        accept,
881                        &mut self.rng,
882                    );
883                    self.dispatch_link_actions(link_actions);
884                }
885                Event::SendChannelMessage {
886                    link_id,
887                    msgtype,
888                    payload,
889                    response_tx,
890                } => {
891                    if self.is_draining() {
892                        self.reject_new_work("send channel message");
893                        let _ = response_tx.send(Err(self.drain_error("send channel message")));
894                        continue;
895                    }
896                    match self.link_manager.send_channel_message(
897                        &link_id,
898                        msgtype,
899                        &payload,
900                        &mut self.rng,
901                    ) {
902                        Ok(link_actions) => {
903                            self.dispatch_link_actions(link_actions);
904                            let _ = response_tx.send(Ok(()));
905                        }
906                        Err(err) => {
907                            let _ = response_tx.send(Err(err));
908                        }
909                    }
910                }
911                Event::SendOnLink {
912                    link_id,
913                    data,
914                    context,
915                } => {
916                    if self.is_draining() {
917                        self.reject_new_work("send link payload");
918                        let _ = (link_id, data, context);
919                        continue;
920                    }
921                    let link_actions =
922                        self.link_manager
923                            .send_on_link(&link_id, &data, context, &mut self.rng);
924                    self.dispatch_link_actions(link_actions);
925                }
926                Event::RequestPath { dest_hash } => {
927                    if self.is_draining() {
928                        self.reject_new_work("request path");
929                        let _ = dest_hash;
930                        continue;
931                    }
932                    self.handle_request_path(dest_hash);
933                }
934                Event::RegisterProofStrategy {
935                    dest_hash,
936                    strategy,
937                    signing_key,
938                } => {
939                    let identity = signing_key
940                        .map(|key| rns_crypto::identity::Identity::from_private_key(&key));
941                    self.proof_strategies
942                        .insert(dest_hash, (strategy, identity));
943                }
944                Event::ProposeDirectConnect { link_id } => {
945                    if self.is_draining() {
946                        self.reject_new_work("propose direct connect");
947                        let _ = link_id;
948                        continue;
949                    }
950                    let derived_key = self.link_manager.get_derived_key(&link_id);
951                    if let Some(dk) = derived_key {
952                        let tx = self.get_event_sender();
953                        let hp_actions =
954                            self.holepunch_manager
955                                .propose(link_id, &dk, &mut self.rng, &tx);
956                        self.dispatch_holepunch_actions(hp_actions);
957                    } else {
958                        log::warn!(
959                            "Cannot propose direct connect: no derived key for link {:02x?}",
960                            &link_id[..4]
961                        );
962                    }
963                }
964                Event::SetDirectConnectPolicy { policy } => {
965                    self.holepunch_manager.set_policy(policy);
966                }
967                Event::HolePunchProbeResult {
968                    link_id,
969                    session_id,
970                    observed_addr,
971                    socket,
972                    probe_server,
973                } => {
974                    let hp_actions = self.holepunch_manager.handle_probe_result(
975                        link_id,
976                        session_id,
977                        observed_addr,
978                        socket,
979                        probe_server,
980                    );
981                    self.dispatch_holepunch_actions(hp_actions);
982                }
983                Event::HolePunchProbeFailed {
984                    link_id,
985                    session_id,
986                } => {
987                    let hp_actions = self
988                        .holepunch_manager
989                        .handle_probe_failed(link_id, session_id);
990                    self.dispatch_holepunch_actions(hp_actions);
991                }
992                Event::LoadHook {
993                    name,
994                    wasm_bytes,
995                    attach_point,
996                    priority,
997                    response_tx,
998                } => {
999                    #[cfg(feature = "hooks")]
1000                    {
1001                        let result = (|| -> Result<(), String> {
1002                            let point_idx = crate::config::parse_hook_point(&attach_point)
1003                                .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1004                            let mgr = self
1005                                .hook_manager
1006                                .as_ref()
1007                                .ok_or_else(|| "hook manager not available".to_string())?;
1008                            let program = mgr
1009                                .compile(name.clone(), &wasm_bytes, priority)
1010                                .map_err(|e| format!("compile error: {}", e))?;
1011                            self.hook_slots[point_idx].attach(program);
1012                            log::info!(
1013                                "Loaded hook '{}' at point {} (priority {})",
1014                                name,
1015                                attach_point,
1016                                priority
1017                            );
1018                            Ok(())
1019                        })();
1020                        let _ = response_tx.send(result);
1021                    }
1022                    #[cfg(not(feature = "hooks"))]
1023                    {
1024                        let _ = (name, wasm_bytes, attach_point, priority);
1025                        let _ = response_tx.send(Err("hooks not enabled".to_string()));
1026                    }
1027                }
1028                Event::LoadHookFile {
1029                    name,
1030                    path,
1031                    hook_type,
1032                    attach_point,
1033                    priority,
1034                    response_tx,
1035                } => {
1036                    #[cfg(feature = "hooks")]
1037                    {
1038                        let result = (|| -> Result<(), String> {
1039                            let point_idx = crate::config::parse_hook_point(&attach_point)
1040                                .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1041                            let backend = crate::config::parse_hook_backend(&hook_type)?;
1042                            let mgr = self
1043                                .hook_manager
1044                                .as_ref()
1045                                .ok_or_else(|| "hook manager not available".to_string())?;
1046                            let program = mgr
1047                                .load_file_backend(
1048                                    name.clone(),
1049                                    std::path::Path::new(&path),
1050                                    priority,
1051                                    backend,
1052                                )
1053                                .map_err(|e| format!("load error: {}", e))?;
1054                            self.hook_slots[point_idx].attach(program);
1055                            log::info!(
1056                                "Loaded {} hook '{}' at point {} (priority {})",
1057                                backend.as_str(),
1058                                name,
1059                                attach_point,
1060                                priority
1061                            );
1062                            Ok(())
1063                        })();
1064                        let _ = response_tx.send(result);
1065                    }
1066                    #[cfg(not(feature = "hooks"))]
1067                    {
1068                        let _ = (name, path, hook_type, attach_point, priority);
1069                        let _ = response_tx.send(Err("hooks not enabled".to_string()));
1070                    }
1071                }
1072                Event::LoadBuiltinHook {
1073                    name,
1074                    builtin_id,
1075                    attach_point,
1076                    priority,
1077                    response_tx,
1078                } => {
1079                    #[cfg(feature = "hooks")]
1080                    {
1081                        let result = (|| -> Result<(), String> {
1082                            let point_idx = crate::config::parse_hook_point(&attach_point)
1083                                .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1084                            let mgr = self
1085                                .hook_manager
1086                                .as_ref()
1087                                .ok_or_else(|| "hook manager not available".to_string())?;
1088                            let program = mgr
1089                                .load_builtin(name.clone(), builtin_id.as_str(), priority)
1090                                .map_err(|e| format!("load error: {}", e))?;
1091                            self.hook_slots[point_idx].attach(program);
1092                            log::info!(
1093                                "Loaded built-in hook '{}' ({}) at point {} (priority {})",
1094                                name,
1095                                builtin_id,
1096                                attach_point,
1097                                priority
1098                            );
1099                            Ok(())
1100                        })();
1101                        let _ = response_tx.send(result);
1102                    }
1103                    #[cfg(not(feature = "hooks"))]
1104                    {
1105                        let _ = (name, builtin_id, attach_point, priority);
1106                        let _ = response_tx.send(Err("hooks not enabled".to_string()));
1107                    }
1108                }
1109                Event::UnloadHook {
1110                    name,
1111                    attach_point,
1112                    response_tx,
1113                } => {
1114                    #[cfg(feature = "hooks")]
1115                    {
1116                        let result = (|| -> Result<(), String> {
1117                            let point_idx = crate::config::parse_hook_point(&attach_point)
1118                                .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1119                            match self.hook_slots[point_idx].detach(&name) {
1120                                Some(_) => {
1121                                    log::info!(
1122                                        "Unloaded hook '{}' from point {}",
1123                                        name,
1124                                        attach_point
1125                                    );
1126                                    Ok(())
1127                                }
1128                                None => Err(format!(
1129                                    "hook '{}' not found at point '{}'",
1130                                    name, attach_point
1131                                )),
1132                            }
1133                        })();
1134                        let _ = response_tx.send(result);
1135                    }
1136                    #[cfg(not(feature = "hooks"))]
1137                    {
1138                        let _ = (name, attach_point);
1139                        let _ = response_tx.send(Err("hooks not enabled".to_string()));
1140                    }
1141                }
1142                Event::ReloadHook {
1143                    name,
1144                    attach_point,
1145                    wasm_bytes,
1146                    response_tx,
1147                } => {
1148                    #[cfg(feature = "hooks")]
1149                    {
1150                        let result = (|| -> Result<(), String> {
1151                            let point_idx = crate::config::parse_hook_point(&attach_point)
1152                                .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1153                            let old =
1154                                self.hook_slots[point_idx].detach(&name).ok_or_else(|| {
1155                                    format!("hook '{}' not found at point '{}'", name, attach_point)
1156                                })?;
1157                            let priority = old.priority;
1158                            let mgr = match self.hook_manager.as_ref() {
1159                                Some(m) => m,
1160                                None => {
1161                                    self.hook_slots[point_idx].attach(old);
1162                                    return Err("hook manager not available".to_string());
1163                                }
1164                            };
1165                            match mgr.compile(name.clone(), &wasm_bytes, priority) {
1166                                Ok(program) => {
1167                                    self.hook_slots[point_idx].attach(program);
1168                                    log::info!(
1169                                        "Reloaded hook '{}' at point {} (priority {})",
1170                                        name,
1171                                        attach_point,
1172                                        priority
1173                                    );
1174                                    Ok(())
1175                                }
1176                                Err(e) => {
1177                                    self.hook_slots[point_idx].attach(old);
1178                                    Err(format!("compile error: {}", e))
1179                                }
1180                            }
1181                        })();
1182                        let _ = response_tx.send(result);
1183                    }
1184                    #[cfg(not(feature = "hooks"))]
1185                    {
1186                        let _ = (name, attach_point, wasm_bytes);
1187                        let _ = response_tx.send(Err("hooks not enabled".to_string()));
1188                    }
1189                }
1190                Event::ReloadHookFile {
1191                    name,
1192                    attach_point,
1193                    path,
1194                    hook_type,
1195                    response_tx,
1196                } => {
1197                    #[cfg(feature = "hooks")]
1198                    {
1199                        let result = (|| -> Result<(), String> {
1200                            let point_idx = crate::config::parse_hook_point(&attach_point)
1201                                .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1202                            let old =
1203                                self.hook_slots[point_idx].detach(&name).ok_or_else(|| {
1204                                    format!("hook '{}' not found at point '{}'", name, attach_point)
1205                                })?;
1206                            let priority = old.priority;
1207                            let backend = match crate::config::parse_hook_backend(&hook_type) {
1208                                Ok(backend) => backend,
1209                                Err(e) => {
1210                                    self.hook_slots[point_idx].attach(old);
1211                                    return Err(e);
1212                                }
1213                            };
1214                            let mgr = match self.hook_manager.as_ref() {
1215                                Some(m) => m,
1216                                None => {
1217                                    self.hook_slots[point_idx].attach(old);
1218                                    return Err("hook manager not available".to_string());
1219                                }
1220                            };
1221                            match mgr.load_file_backend(
1222                                name.clone(),
1223                                std::path::Path::new(&path),
1224                                priority,
1225                                backend,
1226                            ) {
1227                                Ok(program) => {
1228                                    self.hook_slots[point_idx].attach(program);
1229                                    log::info!(
1230                                        "Reloaded {} hook '{}' at point {} (priority {})",
1231                                        backend.as_str(),
1232                                        name,
1233                                        attach_point,
1234                                        priority
1235                                    );
1236                                    Ok(())
1237                                }
1238                                Err(e) => {
1239                                    self.hook_slots[point_idx].attach(old);
1240                                    Err(format!("load error: {}", e))
1241                                }
1242                            }
1243                        })();
1244                        let _ = response_tx.send(result);
1245                    }
1246                    #[cfg(not(feature = "hooks"))]
1247                    {
1248                        let _ = (name, attach_point, path, hook_type);
1249                        let _ = response_tx.send(Err("hooks not enabled".to_string()));
1250                    }
1251                }
1252                Event::ReloadBuiltinHook {
1253                    name,
1254                    attach_point,
1255                    builtin_id,
1256                    response_tx,
1257                } => {
1258                    #[cfg(feature = "hooks")]
1259                    {
1260                        let result = (|| -> Result<(), String> {
1261                            let point_idx = crate::config::parse_hook_point(&attach_point)
1262                                .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1263                            let old =
1264                                self.hook_slots[point_idx].detach(&name).ok_or_else(|| {
1265                                    format!("hook '{}' not found at point '{}'", name, attach_point)
1266                                })?;
1267                            let priority = old.priority;
1268                            let mgr = match self.hook_manager.as_ref() {
1269                                Some(m) => m,
1270                                None => {
1271                                    self.hook_slots[point_idx].attach(old);
1272                                    return Err("hook manager not available".to_string());
1273                                }
1274                            };
1275                            match mgr.load_builtin(name.clone(), builtin_id.as_str(), priority) {
1276                                Ok(program) => {
1277                                    self.hook_slots[point_idx].attach(program);
1278                                    log::info!(
1279                                        "Reloaded built-in hook '{}' ({}) at point {} (priority {})",
1280                                        name,
1281                                        builtin_id,
1282                                        attach_point,
1283                                        priority
1284                                    );
1285                                    Ok(())
1286                                }
1287                                Err(e) => {
1288                                    self.hook_slots[point_idx].attach(old);
1289                                    Err(format!("load error: {}", e))
1290                                }
1291                            }
1292                        })();
1293                        let _ = response_tx.send(result);
1294                    }
1295                    #[cfg(not(feature = "hooks"))]
1296                    {
1297                        let _ = (name, attach_point, builtin_id);
1298                        let _ = response_tx.send(Err("hooks not enabled".to_string()));
1299                    }
1300                }
1301                Event::SetHookEnabled {
1302                    name,
1303                    attach_point,
1304                    enabled,
1305                    response_tx,
1306                } => {
1307                    #[cfg(feature = "hooks")]
1308                    {
1309                        let result = self.update_hook_program(&name, &attach_point, |program| {
1310                            program.enabled = enabled;
1311                        });
1312                        if result.is_ok() {
1313                            log::info!(
1314                                "{} hook '{}' at point {}",
1315                                if enabled { "Enabled" } else { "Disabled" },
1316                                name,
1317                                attach_point,
1318                            );
1319                        }
1320                        let _ = response_tx.send(result);
1321                    }
1322                    #[cfg(not(feature = "hooks"))]
1323                    {
1324                        let _ = (name, attach_point, enabled);
1325                        let _ = response_tx.send(Err("hooks not enabled".to_string()));
1326                    }
1327                }
1328                Event::SetHookPriority {
1329                    name,
1330                    attach_point,
1331                    priority,
1332                    response_tx,
1333                } => {
1334                    #[cfg(feature = "hooks")]
1335                    {
1336                        let result = self.update_hook_program(&name, &attach_point, |program| {
1337                            program.priority = priority;
1338                        });
1339                        if result.is_ok() {
1340                            if let Some(point_idx) = crate::config::parse_hook_point(&attach_point)
1341                            {
1342                                self.hook_slots[point_idx]
1343                                    .programs
1344                                    .sort_by(|a, b| b.priority.cmp(&a.priority));
1345                                log::info!(
1346                                    "Updated hook '{}' at point {} to priority {}",
1347                                    name,
1348                                    attach_point,
1349                                    priority,
1350                                );
1351                            } else {
1352                                log::error!(
1353                                    "hook point '{}' became invalid during priority update",
1354                                    attach_point
1355                                );
1356                            }
1357                        }
1358                        let _ = response_tx.send(result);
1359                    }
1360                    #[cfg(not(feature = "hooks"))]
1361                    {
1362                        let _ = (name, attach_point, priority);
1363                        let _ = response_tx.send(Err("hooks not enabled".to_string()));
1364                    }
1365                }
1366                Event::ListHooks { response_tx } => {
1367                    #[cfg(feature = "hooks")]
1368                    {
1369                        let hook_point_names = [
1370                            "PreIngress",
1371                            "PreDispatch",
1372                            "AnnounceReceived",
1373                            "PathUpdated",
1374                            "AnnounceRetransmit",
1375                            "LinkRequestReceived",
1376                            "LinkEstablished",
1377                            "LinkClosed",
1378                            "InterfaceUp",
1379                            "InterfaceDown",
1380                            "InterfaceConfigChanged",
1381                            "BackbonePeerConnected",
1382                            "BackbonePeerDisconnected",
1383                            "BackbonePeerIdleTimeout",
1384                            "BackbonePeerWriteStall",
1385                            "BackbonePeerPenalty",
1386                            "SendOnInterface",
1387                            "BroadcastOnAllInterfaces",
1388                            "DeliverLocal",
1389                            "TunnelSynthesize",
1390                            "Tick",
1391                        ];
1392                        let mut infos = Vec::new();
1393                        for (idx, slot) in self.hook_slots.iter().enumerate() {
1394                            let point_name = hook_point_names.get(idx).unwrap_or(&"Unknown");
1395                            for prog in &slot.programs {
1396                                infos.push(crate::event::HookInfo {
1397                                    name: prog.name.clone(),
1398                                    hook_type: prog.backend_name().to_string(),
1399                                    attach_point: point_name.to_string(),
1400                                    priority: prog.priority,
1401                                    enabled: prog.enabled,
1402                                    consecutive_traps: prog.consecutive_traps,
1403                                });
1404                            }
1405                        }
1406                        let _ = response_tx.send(infos);
1407                    }
1408                    #[cfg(not(feature = "hooks"))]
1409                    {
1410                        let _ = response_tx.send(Vec::new());
1411                    }
1412                }
1413                Event::InterfaceConfigChanged(id) => {
1414                    #[cfg(feature = "hooks")]
1415                    {
1416                        let ctx = HookContext::Interface { interface_id: id.0 };
1417                        let now = time::now();
1418                        let engine_ref = EngineRef {
1419                            engine: &self.engine,
1420                            interfaces: &self.interfaces,
1421                            link_manager: &self.link_manager,
1422                            now,
1423                        };
1424                        let provider_events_enabled = self.provider_events_enabled();
1425                        if let Some(ref e) = run_hook_inner(
1426                            &mut self.hook_slots[HookPoint::InterfaceConfigChanged as usize]
1427                                .programs,
1428                            &self.hook_manager,
1429                            &engine_ref,
1430                            &ctx,
1431                            now,
1432                            provider_events_enabled,
1433                        ) {
1434                            self.forward_hook_side_effects("InterfaceConfigChanged", e);
1435                        }
1436                    }
1437                    #[cfg(not(feature = "hooks"))]
1438                    let _ = id;
1439                }
1440                Event::BackbonePeerConnected {
1441                    server_interface_id,
1442                    peer_interface_id,
1443                    peer_ip,
1444                    peer_port,
1445                } => {
1446                    #[cfg(feature = "hooks")]
1447                    {
1448                        self.run_backbone_peer_hook(
1449                            "BackbonePeerConnected",
1450                            HookPoint::BackbonePeerConnected,
1451                            &BackbonePeerHookEvent {
1452                                server_interface_id,
1453                                peer_interface_id: Some(peer_interface_id),
1454                                peer_ip,
1455                                peer_port,
1456                                connected_for: Duration::ZERO,
1457                                had_received_data: false,
1458                                penalty_level: 0,
1459                                blacklist_for: Duration::ZERO,
1460                            },
1461                        );
1462                    }
1463                    #[cfg(not(feature = "hooks"))]
1464                    let _ = (server_interface_id, peer_interface_id, peer_ip, peer_port);
1465                }
1466                Event::BackbonePeerDisconnected {
1467                    server_interface_id,
1468                    peer_interface_id,
1469                    peer_ip,
1470                    peer_port,
1471                    connected_for,
1472                    had_received_data,
1473                } => {
1474                    #[cfg(feature = "hooks")]
1475                    {
1476                        self.run_backbone_peer_hook(
1477                            "BackbonePeerDisconnected",
1478                            HookPoint::BackbonePeerDisconnected,
1479                            &BackbonePeerHookEvent {
1480                                server_interface_id,
1481                                peer_interface_id: Some(peer_interface_id),
1482                                peer_ip,
1483                                peer_port,
1484                                connected_for,
1485                                had_received_data,
1486                                penalty_level: 0,
1487                                blacklist_for: Duration::ZERO,
1488                            },
1489                        );
1490                    }
1491                    #[cfg(not(feature = "hooks"))]
1492                    let _ = (
1493                        server_interface_id,
1494                        peer_interface_id,
1495                        peer_ip,
1496                        peer_port,
1497                        connected_for,
1498                        had_received_data,
1499                    );
1500                }
1501                Event::BackbonePeerIdleTimeout {
1502                    server_interface_id,
1503                    peer_interface_id,
1504                    peer_ip,
1505                    peer_port,
1506                    connected_for,
1507                } => {
1508                    #[cfg(feature = "hooks")]
1509                    {
1510                        self.run_backbone_peer_hook(
1511                            "BackbonePeerIdleTimeout",
1512                            HookPoint::BackbonePeerIdleTimeout,
1513                            &BackbonePeerHookEvent {
1514                                server_interface_id,
1515                                peer_interface_id: Some(peer_interface_id),
1516                                peer_ip,
1517                                peer_port,
1518                                connected_for,
1519                                had_received_data: false,
1520                                penalty_level: 0,
1521                                blacklist_for: Duration::ZERO,
1522                            },
1523                        );
1524                    }
1525                    #[cfg(not(feature = "hooks"))]
1526                    let _ = (
1527                        server_interface_id,
1528                        peer_interface_id,
1529                        peer_ip,
1530                        peer_port,
1531                        connected_for,
1532                    );
1533                }
1534                Event::BackbonePeerWriteStall {
1535                    server_interface_id,
1536                    peer_interface_id,
1537                    peer_ip,
1538                    peer_port,
1539                    connected_for,
1540                } => {
1541                    #[cfg(feature = "hooks")]
1542                    {
1543                        self.run_backbone_peer_hook(
1544                            "BackbonePeerWriteStall",
1545                            HookPoint::BackbonePeerWriteStall,
1546                            &BackbonePeerHookEvent {
1547                                server_interface_id,
1548                                peer_interface_id: Some(peer_interface_id),
1549                                peer_ip,
1550                                peer_port,
1551                                connected_for,
1552                                had_received_data: false,
1553                                penalty_level: 0,
1554                                blacklist_for: Duration::ZERO,
1555                            },
1556                        );
1557                    }
1558                    #[cfg(not(feature = "hooks"))]
1559                    let _ = (
1560                        server_interface_id,
1561                        peer_interface_id,
1562                        peer_ip,
1563                        peer_port,
1564                        connected_for,
1565                    );
1566                }
1567                Event::BackbonePeerPenalty {
1568                    server_interface_id,
1569                    peer_ip,
1570                    penalty_level,
1571                    blacklist_for,
1572                } => {
1573                    #[cfg(feature = "hooks")]
1574                    {
1575                        self.run_backbone_peer_hook(
1576                            "BackbonePeerPenalty",
1577                            HookPoint::BackbonePeerPenalty,
1578                            &BackbonePeerHookEvent {
1579                                server_interface_id,
1580                                peer_interface_id: None,
1581                                peer_ip,
1582                                peer_port: 0,
1583                                connected_for: Duration::ZERO,
1584                                had_received_data: false,
1585                                penalty_level,
1586                                blacklist_for,
1587                            },
1588                        );
1589                    }
1590                    #[cfg(not(feature = "hooks"))]
1591                    let _ = (server_interface_id, peer_ip, penalty_level, blacklist_for);
1592                }
1593                Event::Shutdown => {
1594                    self.graceful_shutdown();
1595                    break;
1596                }
1597            }
1598        }
1599    }
1600    pub(crate) fn handle_tunnel_synth_delivery(&mut self, raw: &[u8]) {
1601        // Extract the data payload from the raw packet
1602        let packet = match RawPacket::unpack(raw) {
1603            Ok(p) => p,
1604            Err(_) => return,
1605        };
1606
1607        match rns_core::transport::tunnel::validate_tunnel_synthesize_data(&packet.data) {
1608            Ok(validated) => {
1609                // Find the interface this tunnel belongs to by computing the expected
1610                // tunnel_id for each interface with wants_tunnel
1611                let iface_id = self
1612                    .interfaces
1613                    .iter()
1614                    .find(|(_, entry)| entry.info.wants_tunnel && entry.online && entry.enabled)
1615                    .map(|(id, _)| *id);
1616
1617                if let Some(iface) = iface_id {
1618                    let now = time::now();
1619                    let tunnel_actions = self.engine.handle_tunnel(validated.tunnel_id, iface, now);
1620                    self.dispatch_all(tunnel_actions);
1621                }
1622            }
1623            Err(e) => {
1624                log::debug!("Tunnel synthesis validation failed: {}", e);
1625            }
1626        }
1627    }
1628
1629    /// Synthesize a tunnel on an interface that wants it.
1630    ///
1631    /// Called when an interface with `wants_tunnel` comes up.
1632    pub(crate) fn synthesize_tunnel_for_interface(&mut self, interface: InterfaceId) {
1633        if let Some(ref identity) = self.transport_identity {
1634            let actions = self
1635                .engine
1636                .synthesize_tunnel(identity, interface, &mut self.rng);
1637            self.dispatch_all(actions);
1638        }
1639    }
1640
1641    /// Build and send a path request packet for a destination.
1642    pub(crate) fn handle_request_path(&mut self, dest_hash: [u8; 16]) {
1643        // Build path request data: dest_hash(16) || [transport_id(16)] || random_tag(16)
1644        let mut data = Vec::with_capacity(48);
1645        data.extend_from_slice(&dest_hash);
1646
1647        if self.engine.transport_enabled() {
1648            if let Some(id_hash) = self.engine.identity_hash() {
1649                data.extend_from_slice(id_hash);
1650            }
1651        }
1652
1653        // Random tag (16 bytes)
1654        let mut tag = [0u8; 16];
1655        self.rng.fill_bytes(&mut tag);
1656        data.extend_from_slice(&tag);
1657
1658        // Build as BROADCAST DATA PLAIN packet to rnstransport.path.request
1659        let flags = rns_core::packet::PacketFlags {
1660            header_type: rns_core::constants::HEADER_1,
1661            context_flag: rns_core::constants::FLAG_UNSET,
1662            transport_type: rns_core::constants::TRANSPORT_BROADCAST,
1663            destination_type: rns_core::constants::DESTINATION_PLAIN,
1664            packet_type: rns_core::constants::PACKET_TYPE_DATA,
1665        };
1666
1667        if let Ok(packet) = RawPacket::pack(
1668            flags,
1669            0,
1670            &self.path_request_dest,
1671            None,
1672            rns_core::constants::CONTEXT_NONE,
1673            &data,
1674        ) {
1675            let actions = self.engine.handle_outbound(
1676                &packet,
1677                rns_core::constants::DESTINATION_PLAIN,
1678                None,
1679                time::now(),
1680            );
1681            self.dispatch_all(actions);
1682        }
1683    }
1684
1685    /// Check if we should generate a proof for a delivered packet,
1686    /// and if so, sign and send it.
1687    pub(crate) fn get_event_sender(&self) -> crate::event::EventSender {
1688        // The driver doesn't directly have a sender, but node.rs creates the channel
1689        // and passes rx to the driver. We need to store a sender clone.
1690        // For now we use an internal sender that was set during construction.
1691        self.event_tx.clone()
1692    }
1693
1694    /// Delay before first management announce after startup.
1695    const MANAGEMENT_ANNOUNCE_DELAY: f64 = 5.0;
1696
1697    /// Tick the discovery announcer: start stamp generation if due, send announce if ready.
1698    pub(crate) fn tick_discovery_announcer(&mut self, now: f64) {
1699        let announcer = match self.interface_announcer.as_mut() {
1700            Some(a) => a,
1701            None => return,
1702        };
1703
1704        announcer.maybe_start(now);
1705
1706        let stamp_result = match announcer.poll_ready() {
1707            Some(r) => r,
1708            None => return,
1709        };
1710
1711        if !announcer.contains_interface(&stamp_result.interface_name) {
1712            log::debug!(
1713                "Discovery: dropping completed stamp for removed interface '{}'",
1714                stamp_result.interface_name
1715            );
1716            return;
1717        }
1718
1719        let identity = match self.transport_identity.as_ref() {
1720            Some(id) => id,
1721            None => {
1722                log::warn!("Discovery: stamp ready but no transport identity");
1723                return;
1724            }
1725        };
1726
1727        // Discovery is a SINGLE destination — the dest hash includes the transport identity
1728        let identity_hash = identity.hash();
1729        let disc_dest = rns_core::destination::destination_hash(
1730            crate::discovery::APP_NAME,
1731            &["discovery", "interface"],
1732            Some(&identity_hash),
1733        );
1734        let name_hash = self.discovery_name_hash;
1735        let mut random_hash = [0u8; 10];
1736        self.rng.fill_bytes(&mut random_hash);
1737
1738        let (announce_data, _) = match rns_core::announce::AnnounceData::pack(
1739            identity,
1740            &disc_dest,
1741            &name_hash,
1742            &random_hash,
1743            None,
1744            Some(&stamp_result.app_data),
1745        ) {
1746            Ok(v) => v,
1747            Err(e) => {
1748                log::warn!("Discovery: failed to pack announce: {}", e);
1749                return;
1750            }
1751        };
1752
1753        let flags = rns_core::packet::PacketFlags {
1754            header_type: rns_core::constants::HEADER_1,
1755            context_flag: rns_core::constants::FLAG_UNSET,
1756            transport_type: rns_core::constants::TRANSPORT_BROADCAST,
1757            destination_type: rns_core::constants::DESTINATION_SINGLE,
1758            packet_type: rns_core::constants::PACKET_TYPE_ANNOUNCE,
1759        };
1760
1761        let packet = match RawPacket::pack(
1762            flags,
1763            0,
1764            &disc_dest,
1765            None,
1766            rns_core::constants::CONTEXT_NONE,
1767            &announce_data,
1768        ) {
1769            Ok(p) => p,
1770            Err(e) => {
1771                log::warn!("Discovery: failed to pack packet: {}", e);
1772                return;
1773            }
1774        };
1775
1776        let outbound_actions = self.engine.handle_outbound(
1777            &packet,
1778            rns_core::constants::DESTINATION_SINGLE,
1779            None,
1780            now,
1781        );
1782        log::debug!(
1783            "Discovery announce sent for interface '{}' ({} actions, dest={:02x?})",
1784            stamp_result.interface_name,
1785            outbound_actions.len(),
1786            &disc_dest[..4],
1787        );
1788        self.dispatch_all(outbound_actions);
1789    }
1790
1791    /// Read RSS from /proc/self/statm (Linux only).
1792    pub(crate) fn rss_mb() -> Option<f64> {
1793        let statm = std::fs::read_to_string("/proc/self/statm").ok()?;
1794        let rss_pages: u64 = statm.split_whitespace().nth(1)?.parse().ok()?;
1795        Some(rss_pages as f64 * 4096.0 / (1024.0 * 1024.0))
1796    }
1797
1798    pub(crate) fn parse_proc_kib(contents: &str, key: &str) -> Option<u64> {
1799        contents.lines().find_map(|line| {
1800            let value = line.strip_prefix(key)?;
1801            value.split_whitespace().next()?.parse().ok()
1802        })
1803    }
1804
1805    pub(crate) fn proc_status_mb() -> Option<(f64, f64, f64, f64)> {
1806        let status = std::fs::read_to_string("/proc/self/status").ok()?;
1807        let vm_rss = Self::parse_proc_kib(&status, "VmRSS:")? as f64 / 1024.0;
1808        let vm_hwm = Self::parse_proc_kib(&status, "VmHWM:")? as f64 / 1024.0;
1809        let vm_data = Self::parse_proc_kib(&status, "VmData:")? as f64 / 1024.0;
1810        let vm_swap = Self::parse_proc_kib(&status, "VmSwap:").unwrap_or(0) as f64 / 1024.0;
1811        Some((vm_rss, vm_hwm, vm_data, vm_swap))
1812    }
1813
1814    pub(crate) fn smaps_rollup_mb() -> Option<(f64, f64, f64, f64, f64, f64, f64, f64)> {
1815        let smaps = std::fs::read_to_string("/proc/self/smaps_rollup").ok()?;
1816        let rss_kib = Self::parse_proc_kib(&smaps, "Rss:")?;
1817        let anon_kib = Self::parse_proc_kib(&smaps, "Anonymous:")?;
1818        let shared_clean_kib = Self::parse_proc_kib(&smaps, "Shared_Clean:").unwrap_or(0);
1819        let shared_dirty_kib = Self::parse_proc_kib(&smaps, "Shared_Dirty:").unwrap_or(0);
1820        let private_clean_kib = Self::parse_proc_kib(&smaps, "Private_Clean:").unwrap_or(0);
1821        let private_dirty_kib = Self::parse_proc_kib(&smaps, "Private_Dirty:").unwrap_or(0);
1822        let swap_kib = Self::parse_proc_kib(&smaps, "Swap:").unwrap_or(0);
1823        let file_est_kib = rss_kib.saturating_sub(anon_kib);
1824        Some((
1825            rss_kib as f64 / 1024.0,
1826            anon_kib as f64 / 1024.0,
1827            file_est_kib as f64 / 1024.0,
1828            shared_clean_kib as f64 / 1024.0,
1829            shared_dirty_kib as f64 / 1024.0,
1830            private_clean_kib as f64 / 1024.0,
1831            private_dirty_kib as f64 / 1024.0,
1832            swap_kib as f64 / 1024.0,
1833        ))
1834    }
1835
1836    /// Log sizes of all major collections for memory growth diagnostics.
1837    pub(crate) fn log_memory_stats(&self) {
1838        let rss = Self::rss_mb()
1839            .map(|v| format!("{:.1}", v))
1840            .unwrap_or_else(|| "N/A".into());
1841        let (vm_rss, vm_hwm, vm_data, vm_swap) = Self::proc_status_mb()
1842            .map(|(rss, hwm, data, swap)| {
1843                (
1844                    format!("{rss:.1}"),
1845                    format!("{hwm:.1}"),
1846                    format!("{data:.1}"),
1847                    format!("{swap:.1}"),
1848                )
1849            })
1850            .unwrap_or_else(|| ("N/A".into(), "N/A".into(), "N/A".into(), "N/A".into()));
1851        let (
1852            smaps_rss,
1853            smaps_anon,
1854            smaps_file_est,
1855            smaps_shared_clean,
1856            smaps_shared_dirty,
1857            smaps_private_clean,
1858            smaps_private_dirty,
1859            smaps_swap,
1860        ) = Self::smaps_rollup_mb()
1861            .map(
1862                |(
1863                    rss,
1864                    anon,
1865                    file_est,
1866                    shared_clean,
1867                    shared_dirty,
1868                    private_clean,
1869                    private_dirty,
1870                    swap,
1871                )| {
1872                    (
1873                        format!("{rss:.1}"),
1874                        format!("{anon:.1}"),
1875                        format!("{file_est:.1}"),
1876                        format!("{shared_clean:.1}"),
1877                        format!("{shared_dirty:.1}"),
1878                        format!("{private_clean:.1}"),
1879                        format!("{private_dirty:.1}"),
1880                        format!("{swap:.1}"),
1881                    )
1882                },
1883            )
1884            .unwrap_or_else(|| {
1885                (
1886                    "N/A".into(),
1887                    "N/A".into(),
1888                    "N/A".into(),
1889                    "N/A".into(),
1890                    "N/A".into(),
1891                    "N/A".into(),
1892                    "N/A".into(),
1893                    "N/A".into(),
1894                )
1895            });
1896        log::info!(
1897            "MEMSTATS rss_mb={} vmrss_mb={} vmhwm_mb={} vmdata_mb={} vmswap_mb={} smaps_rss_mb={} smaps_anon_mb={} smaps_file_est_mb={} smaps_shared_clean_mb={} smaps_shared_dirty_mb={} smaps_private_clean_mb={} smaps_private_dirty_mb={} smaps_swap_mb={} known_dest={} known_dest_cap_evict={} path={} path_cap_evict={} announce={} reverse={}              link={} held_ann={} hashlist={} sig_cache={} ann_verify_q={} rate_lim={} blackhole={} tunnel={} ann_q_ifaces={} ann_q_nonempty={} ann_q_entries={} ann_q_bytes={} ann_q_iface_drop={}              pr_tags={} disc_pr={} sent_pkt={} completed={} local_dest={}              shared_ann={} lm_links={} hp_sessions={} proof_strat={}",
1898            rss,
1899            vm_rss,
1900            vm_hwm,
1901            vm_data,
1902            vm_swap,
1903            smaps_rss,
1904            smaps_anon,
1905            smaps_file_est,
1906            smaps_shared_clean,
1907            smaps_shared_dirty,
1908            smaps_private_clean,
1909            smaps_private_dirty,
1910            smaps_swap,
1911            self.known_destinations.len(),
1912            self.known_destinations_cap_evict_count,
1913            self.engine.path_table_count(),
1914            self.engine.path_destination_cap_evict_count(),
1915            self.engine.announce_table_count(),
1916            self.engine.reverse_table_count(),
1917            self.engine.link_table_count(),
1918            self.engine.held_announces_count(),
1919            self.engine.packet_hashlist_len(),
1920            self.engine.announce_sig_cache_len(),
1921            self.announce_verify_queue
1922                .lock()
1923                .map(|queue| queue.len())
1924                .unwrap_or(0),
1925            self.engine.rate_limiter_count(),
1926            self.engine.blackholed_count(),
1927            self.engine.tunnel_count(),
1928            self.engine.announce_queue_count(),
1929            self.engine.nonempty_announce_queue_count(),
1930            self.engine.queued_announce_count(),
1931            self.engine.queued_announce_bytes(),
1932            self.engine.announce_queue_interface_cap_drop_count(),
1933            self.engine.discovery_pr_tags_count(),
1934            self.engine.discovery_path_requests_count(),
1935            self.sent_packets.len(),
1936            self.completed_proofs.len(),
1937            self.local_destinations.len(),
1938            self.shared_announces.len(),
1939            self.link_manager.link_count(),
1940            self.holepunch_manager.session_count(),
1941            self.proof_strategies.len(),
1942        );
1943    }
1944
1945    /// Emit management and/or blackhole announces if enabled and due.
1946    pub(crate) fn tick_management_announces(&mut self, now: f64) {
1947        if self.transport_identity.is_none() {
1948            return;
1949        }
1950
1951        let uptime = now - self.started;
1952
1953        // Wait for initial delay
1954        if !self.initial_announce_sent {
1955            if uptime < Self::MANAGEMENT_ANNOUNCE_DELAY {
1956                return;
1957            }
1958            self.initial_announce_sent = true;
1959            self.emit_management_announces(now);
1960            return;
1961        }
1962
1963        // Periodic re-announce
1964        if now - self.last_management_announce >= self.management_announce_interval_secs {
1965            self.emit_management_announces(now);
1966        }
1967    }
1968
1969    /// Emit management/blackhole announce packets through the engine outbound path.
1970    pub(crate) fn emit_management_announces(&mut self, now: f64) {
1971        use crate::management;
1972
1973        self.last_management_announce = now;
1974
1975        let identity = match self.transport_identity {
1976            Some(ref id) => id,
1977            None => return,
1978        };
1979
1980        // Build announce packets first (immutable borrow of identity), then dispatch
1981        let mgmt_raw = if self.management_config.enable_remote_management {
1982            management::build_management_announce(identity, &mut self.rng)
1983        } else {
1984            None
1985        };
1986
1987        let bh_raw = if self.management_config.publish_blackhole {
1988            management::build_blackhole_announce(identity, &mut self.rng)
1989        } else {
1990            None
1991        };
1992
1993        let probe_raw = if self.probe_responder_hash.is_some() {
1994            management::build_probe_announce(identity, &mut self.rng)
1995        } else {
1996            None
1997        };
1998
1999        if let Some(raw) = mgmt_raw {
2000            if let Ok(packet) = RawPacket::unpack(&raw) {
2001                let actions = self.engine.handle_outbound(
2002                    &packet,
2003                    rns_core::constants::DESTINATION_SINGLE,
2004                    None,
2005                    now,
2006                );
2007                self.dispatch_all(actions);
2008                log::debug!("Emitted management destination announce");
2009            }
2010        }
2011
2012        if let Some(raw) = bh_raw {
2013            if let Ok(packet) = RawPacket::unpack(&raw) {
2014                let actions = self.engine.handle_outbound(
2015                    &packet,
2016                    rns_core::constants::DESTINATION_SINGLE,
2017                    None,
2018                    now,
2019                );
2020                self.dispatch_all(actions);
2021                log::debug!("Emitted blackhole info announce");
2022            }
2023        }
2024
2025        if let Some(raw) = probe_raw {
2026            if let Ok(packet) = RawPacket::unpack(&raw) {
2027                let actions = self.engine.handle_outbound(
2028                    &packet,
2029                    rns_core::constants::DESTINATION_SINGLE,
2030                    None,
2031                    now,
2032                );
2033                self.dispatch_all(actions);
2034                log::debug!("Emitted probe responder announce");
2035            }
2036        }
2037    }
2038
2039    /// Handle a management request by querying engine state and sending a response.
2040    pub(crate) fn handle_management_request(
2041        &mut self,
2042        link_id: [u8; 16],
2043        path_hash: [u8; 16],
2044        data: Vec<u8>,
2045        request_id: [u8; 16],
2046        remote_identity: Option<([u8; 16], [u8; 64])>,
2047    ) {
2048        use crate::management;
2049
2050        // ACL check for /status and /path (ALLOW_LIST), /list is ALLOW_ALL
2051        let is_restricted = path_hash == management::status_path_hash()
2052            || path_hash == management::path_path_hash();
2053
2054        if is_restricted && !self.management_config.remote_management_allowed.is_empty() {
2055            match remote_identity {
2056                Some((identity_hash, _)) => {
2057                    if !self
2058                        .management_config
2059                        .remote_management_allowed
2060                        .contains(&identity_hash)
2061                    {
2062                        log::debug!("Management request denied: identity not in allowed list");
2063                        return;
2064                    }
2065                }
2066                None => {
2067                    log::debug!("Management request denied: peer not identified");
2068                    return;
2069                }
2070            }
2071        }
2072
2073        let response_data = if path_hash == management::status_path_hash() {
2074            {
2075                let views: Vec<&dyn management::InterfaceStatusView> = self
2076                    .interfaces
2077                    .values()
2078                    .map(|e| e as &dyn management::InterfaceStatusView)
2079                    .collect();
2080                management::handle_status_request(
2081                    &data,
2082                    &self.engine,
2083                    &views,
2084                    self.started,
2085                    self.probe_responder_hash,
2086                )
2087            }
2088        } else if path_hash == management::path_path_hash() {
2089            management::handle_path_request(&data, &self.engine)
2090        } else if path_hash == management::list_path_hash() {
2091            management::handle_blackhole_list_request(&self.engine)
2092        } else {
2093            log::warn!("Unknown management path_hash: {:02x?}", &path_hash[..4]);
2094            None
2095        };
2096
2097        if let Some(response) = response_data {
2098            let actions = self.link_manager.send_management_response(
2099                &link_id,
2100                &request_id,
2101                &response,
2102                &mut self.rng,
2103            );
2104            self.dispatch_link_actions(actions);
2105        }
2106    }
2107}