Skip to main content

rns_net/driver/
events.rs

1use super::*;
2
3impl Driver {
4    pub(crate) fn handle_frame_event(&mut self, interface_id: InterfaceId, data: Vec<u8>) {
5        if data.len() > 2 && (data[0] & 0x03) == 0x01 {
6            log::debug!(
7                "Announce:frame from iface {} (len={}, flags=0x{:02x})",
8                interface_id.0,
9                data.len(),
10                data[0]
11            );
12        }
13        if let Some(entry) = self.interfaces.get(&interface_id) {
14            if !entry.enabled || !entry.online {
15                return;
16            }
17        }
18        if let Some(entry) = self.interfaces.get_mut(&interface_id) {
19            entry.stats.rxb += data.len() as u64;
20            entry.stats.rx_packets += 1;
21        }
22
23        let packet = if let Some(entry) = self.interfaces.get(&interface_id) {
24            if let Some(ref ifac_state) = entry.ifac {
25                match ifac::unmask_inbound(&data, ifac_state) {
26                    Some(unmasked) => unmasked,
27                    None => {
28                        log::debug!("[{}] IFAC rejected packet", interface_id.0);
29                        return;
30                    }
31                }
32            } else {
33                if data.len() > 2 && data[0] & 0x80 == 0x80 {
34                    log::debug!(
35                        "[{}] dropping packet with IFAC flag on non-IFAC interface",
36                        interface_id.0
37                    );
38                    return;
39                }
40                data
41            }
42        } else {
43            data
44        };
45
46        #[cfg(feature = "hooks")]
47        {
48            let pkt_ctx = rns_hooks::PacketContext {
49                flags: if packet.is_empty() { 0 } else { packet[0] },
50                hops: if packet.len() > 1 { packet[1] } else { 0 },
51                destination_hash: extract_dest_hash(&packet),
52                context: 0,
53                packet_hash: [0; 32],
54                interface_id: interface_id.0,
55                data_offset: 0,
56                data_len: packet.len() as u32,
57            };
58            let ctx = HookContext::Packet {
59                ctx: &pkt_ctx,
60                raw: &packet,
61            };
62            let now = time::now();
63            let engine_ref = EngineRef {
64                engine: &self.engine,
65                interfaces: &self.interfaces,
66                link_manager: &self.link_manager,
67                now,
68            };
69            let provider_events_enabled = self.provider_events_enabled();
70            if let Some(ref e) = run_hook_inner(
71                &mut self.hook_slots[HookPoint::PreIngress as usize].programs,
72                &self.hook_manager,
73                &engine_ref,
74                &ctx,
75                now,
76                provider_events_enabled,
77            ) {
78                self.forward_hook_side_effects("PreIngress", e);
79                if e.hook_result.as_ref().is_some_and(|r| r.is_drop()) {
80                    return;
81                }
82            }
83        }
84
85        if packet.len() > 2 && (packet[0] & 0x03) == 0x01 {
86            let now = time::now();
87            if let Some(entry) = self.interfaces.get_mut(&interface_id) {
88                entry.stats.record_incoming_announce(now);
89            }
90        }
91
92        if let Some(entry) = self.interfaces.get(&interface_id) {
93            self.engine
94                .update_interface_freq(interface_id, entry.stats.incoming_announce_freq());
95        }
96
97        let actions = if self.async_announce_verification {
98            let mut announce_queue = self
99                .announce_verify_queue
100                .lock()
101                .unwrap_or_else(|poisoned| poisoned.into_inner());
102            self.engine.handle_inbound_with_announce_queue(
103                &packet,
104                interface_id,
105                time::now(),
106                &mut self.rng,
107                Some(&mut announce_queue),
108            )
109        } else {
110            self.engine
111                .handle_inbound(&packet, interface_id, time::now(), &mut self.rng)
112        };
113
114        #[cfg(feature = "hooks")]
115        {
116            let pkt_ctx = rns_hooks::PacketContext {
117                flags: if packet.is_empty() { 0 } else { packet[0] },
118                hops: if packet.len() > 1 { packet[1] } else { 0 },
119                destination_hash: extract_dest_hash(&packet),
120                context: 0,
121                packet_hash: [0; 32],
122                interface_id: interface_id.0,
123                data_offset: 0,
124                data_len: packet.len() as u32,
125            };
126            let ctx = HookContext::Packet {
127                ctx: &pkt_ctx,
128                raw: &packet,
129            };
130            let now = time::now();
131            let engine_ref = EngineRef {
132                engine: &self.engine,
133                interfaces: &self.interfaces,
134                link_manager: &self.link_manager,
135                now,
136            };
137            let provider_events_enabled = self.provider_events_enabled();
138            if let Some(ref e) = run_hook_inner(
139                &mut self.hook_slots[HookPoint::PreDispatch as usize].programs,
140                &self.hook_manager,
141                &engine_ref,
142                &ctx,
143                now,
144                provider_events_enabled,
145            ) {
146                self.forward_hook_side_effects("PreDispatch", e);
147            }
148        }
149
150        self.dispatch_all(actions);
151    }
152
153    pub(crate) fn handle_announce_verified_event(
154        &mut self,
155        key: rns_core::transport::announce_verify_queue::AnnounceVerifyKey,
156        validated: rns_core::announce::ValidatedAnnounce,
157        sig_cache_key: [u8; 32],
158    ) {
159        let pending = {
160            let mut announce_queue = self
161                .announce_verify_queue
162                .lock()
163                .unwrap_or_else(|poisoned| poisoned.into_inner());
164            announce_queue.complete_success(&key)
165        };
166        if let Some(pending) = pending {
167            let actions = self.engine.complete_verified_announce(
168                pending,
169                validated,
170                sig_cache_key,
171                time::now(),
172                &mut self.rng,
173            );
174            self.dispatch_all(actions);
175        }
176    }
177
178    pub(crate) fn handle_tick_event(&mut self) {
179        #[cfg(feature = "hooks")]
180        {
181            let ctx = HookContext::Tick;
182            let now = time::now();
183            let engine_ref = EngineRef {
184                engine: &self.engine,
185                interfaces: &self.interfaces,
186                link_manager: &self.link_manager,
187                now,
188            };
189            let provider_events_enabled = self.provider_events_enabled();
190            if let Some(ref e) = run_hook_inner(
191                &mut self.hook_slots[HookPoint::Tick as usize].programs,
192                &self.hook_manager,
193                &engine_ref,
194                &ctx,
195                now,
196                provider_events_enabled,
197            ) {
198                self.forward_hook_side_effects("Tick", e);
199            }
200        }
201
202        let now = time::now();
203        for (id, entry) in &self.interfaces {
204            self.engine
205                .update_interface_freq(*id, entry.stats.incoming_announce_freq());
206        }
207        let actions = self.engine.tick(now, &mut self.rng);
208        self.dispatch_all(actions);
209        let link_actions = self.link_manager.tick(&mut self.rng);
210        self.dispatch_link_actions(link_actions);
211        self.enforce_drain_deadline();
212        {
213            let tx = self.get_event_sender();
214            let hp_actions = self.holepunch_manager.tick(&tx);
215            self.dispatch_holepunch_actions(hp_actions);
216        }
217        self.tick_management_announces(now);
218        self.sent_packets
219            .retain(|_, (_, sent_time)| now - *sent_time < 60.0);
220        self.completed_proofs
221            .retain(|_, (_, received)| now - *received < 120.0);
222
223        self.tick_discovery_announcer(now);
224        #[cfg(feature = "iface-backbone")]
225        self.maintain_backbone_peer_pool();
226
227        self.memory_stats_counter += 1;
228        if self.memory_stats_counter >= 300 {
229            self.memory_stats_counter = 0;
230            self.log_memory_stats();
231        }
232
233        if self.discover_interfaces {
234            self.discovery_cleanup_counter += 1;
235            if self.discovery_cleanup_counter >= self.discovery_cleanup_interval_ticks {
236                self.discovery_cleanup_counter = 0;
237                if let Ok(removed) = self.discovered_interfaces.cleanup() {
238                    if removed > 0 {
239                        log::info!("Discovery cleanup: removed {} stale entries", removed);
240                    }
241                }
242            }
243        }
244
245        self.cache_cleanup_counter += 1;
246        if self.cache_cleanup_counter >= self.known_destinations_cleanup_interval_ticks {
247            self.cache_cleanup_counter = 0;
248
249            let active_dests = self.engine.active_destination_hashes();
250            let ttl = self.known_destinations_ttl;
251            let kd_before = self.known_destinations.len();
252            self.known_destinations.retain(|k, state| {
253                active_dests.contains(k)
254                    || self.local_destinations.contains_key(k)
255                    || state.retained
256                    || now - Self::known_destination_relevance_time(state) < ttl
257            });
258            let kd_removed = kd_before - self.known_destinations.len();
259            let kd_evicted = self.enforce_known_destination_cap(false);
260            let rl_removed =
261                self.engine
262                    .cull_rate_limiter(&active_dests, now, self.rate_limiter_ttl_secs);
263
264            if kd_removed > 0 || kd_evicted > 0 || rl_removed > 0 {
265                log::info!(
266                    "Memory cleanup: removed {} known_destinations, evicted {} known_destinations, {} rate_limiter entries",
267                    kd_removed, kd_evicted, rl_removed
268                );
269            }
270        }
271
272        self.announce_cache_cleanup_counter += 1;
273        if self.announce_cache_cleanup_counter >= self.announce_cache_cleanup_interval_ticks {
274            self.announce_cache_cleanup_counter = 0;
275            if self.announce_cache.is_some() && self.cache_cleanup_active_hashes.is_none() {
276                self.cache_cleanup_active_hashes = Some(self.engine.active_packet_hashes());
277                self.cache_cleanup_entries = None;
278                self.cache_cleanup_removed = 0;
279            }
280        }
281
282        if self.cache_cleanup_active_hashes.is_some() {
283            if let Some(ref cache) = self.announce_cache {
284                if self.cache_cleanup_entries.is_none() {
285                    match cache.entries() {
286                        Ok(entries) => self.cache_cleanup_entries = Some(entries),
287                        Err(e) => {
288                            log::warn!("Announce cache cleanup failed to open directory: {}", e);
289                            self.cache_cleanup_active_hashes = None;
290                            self.cache_cleanup_entries = None;
291                        }
292                    }
293                }
294            }
295
296            if let Some(ref cache) = self.announce_cache {
297                let Some(active_hashes) = self.cache_cleanup_active_hashes.as_ref() else {
298                    self.cache_cleanup_entries = None;
299                    return;
300                };
301                let entries = match self.cache_cleanup_entries.as_mut() {
302                    Some(entries) => entries,
303                    None => return,
304                };
305                match cache.clean_batch(
306                    active_hashes,
307                    entries,
308                    self.announce_cache_cleanup_batch_size,
309                ) {
310                    Ok((removed, finished)) => {
311                        self.cache_cleanup_removed += removed;
312                        if finished {
313                            if self.cache_cleanup_removed > 0 {
314                                log::info!(
315                                    "Announce cache cleanup complete: removed {} stale files",
316                                    self.cache_cleanup_removed
317                                );
318                            }
319                            self.cache_cleanup_active_hashes = None;
320                            self.cache_cleanup_entries = None;
321                        }
322                    }
323                    Err(e) => {
324                        log::warn!("Announce cache cleanup failed: {}", e);
325                        self.cache_cleanup_active_hashes = None;
326                        self.cache_cleanup_entries = None;
327                    }
328                }
329            } else {
330                self.cache_cleanup_active_hashes = None;
331                self.cache_cleanup_entries = None;
332            }
333        }
334    }
335
336    pub(crate) fn handle_interface_up_event(
337        &mut self,
338        id: InterfaceId,
339        new_writer: Option<Box<dyn crate::interface::Writer>>,
340        info: Option<rns_core::transport::types::InterfaceInfo>,
341    ) {
342        let wants_tunnel;
343        let mut replay_shared_announces = false;
344        if let Some(mut info) = info {
345            log::info!("[{}] dynamic interface registered", id.0);
346            wants_tunnel = info.wants_tunnel;
347            let iface_type = infer_interface_type(&info.name);
348            info.started = time::now();
349            self.register_interface_runtime_defaults(&info);
350            self.engine.register_interface(info.clone());
351            if let Some(writer) = new_writer {
352                let (writer, async_writer_metrics) =
353                    self.wrap_interface_writer(id, &info.name, writer);
354                self.interfaces.insert(
355                    id,
356                    InterfaceEntry {
357                        id,
358                        info,
359                        writer,
360                        async_writer_metrics: Some(async_writer_metrics),
361                        enabled: true,
362                        online: true,
363                        dynamic: true,
364                        ifac: None,
365                        stats: InterfaceStats {
366                            started: time::now(),
367                            ..Default::default()
368                        },
369                        interface_type: iface_type,
370                        send_retry_at: None,
371                        send_retry_backoff: Duration::ZERO,
372                    },
373                );
374            }
375            self.callbacks.on_interface_up(id);
376            #[cfg(feature = "hooks")]
377            {
378                let ctx = HookContext::Interface { interface_id: id.0 };
379                let now = time::now();
380                let engine_ref = EngineRef {
381                    engine: &self.engine,
382                    interfaces: &self.interfaces,
383                    link_manager: &self.link_manager,
384                    now,
385                };
386                let provider_events_enabled = self.provider_events_enabled();
387                if let Some(ref e) = run_hook_inner(
388                    &mut self.hook_slots[HookPoint::InterfaceUp as usize].programs,
389                    &self.hook_manager,
390                    &engine_ref,
391                    &ctx,
392                    now,
393                    provider_events_enabled,
394                ) {
395                    self.forward_hook_side_effects("InterfaceUp", e);
396                }
397            }
398        } else {
399            let is_local_client = self
400                .interfaces
401                .get(&id)
402                .map(|entry| entry.info.is_local_client)
403                .unwrap_or(false);
404            replay_shared_announces =
405                is_local_client && self.shared_reconnect_pending.remove(&id).unwrap_or(false);
406            let interface_name = self
407                .interfaces
408                .get(&id)
409                .map(|entry| entry.info.name.clone())
410                .unwrap_or_else(|| format!("iface-{}", id.0));
411            let wrapped_writer =
412                new_writer.map(|writer| self.wrap_interface_writer(id, &interface_name, writer));
413            if let Some(entry) = self.interfaces.get_mut(&id) {
414                log::info!("[{}] interface online", id.0);
415                wants_tunnel = entry.info.wants_tunnel;
416                entry.online = true;
417                if let Some((writer, async_writer_metrics)) = wrapped_writer {
418                    log::info!("[{}] writer refreshed after reconnect", id.0);
419                    entry.writer = writer;
420                    entry.async_writer_metrics = Some(async_writer_metrics);
421                }
422                self.callbacks.on_interface_up(id);
423                #[cfg(feature = "hooks")]
424                {
425                    let ctx = HookContext::Interface { interface_id: id.0 };
426                    let now = time::now();
427                    let engine_ref = EngineRef {
428                        engine: &self.engine,
429                        interfaces: &self.interfaces,
430                        link_manager: &self.link_manager,
431                        now,
432                    };
433                    let provider_events_enabled = self.provider_events_enabled();
434                    if let Some(ref e) = run_hook_inner(
435                        &mut self.hook_slots[HookPoint::InterfaceUp as usize].programs,
436                        &self.hook_manager,
437                        &engine_ref,
438                        &ctx,
439                        now,
440                        provider_events_enabled,
441                    ) {
442                        self.forward_hook_side_effects("InterfaceUp", e);
443                    }
444                }
445            } else {
446                wants_tunnel = false;
447            }
448        }
449
450        if wants_tunnel {
451            self.synthesize_tunnel_for_interface(id);
452        }
453        if replay_shared_announces {
454            self.replay_shared_announces();
455        }
456    }
457
458    pub(crate) fn handle_interface_down_event(&mut self, id: InterfaceId) {
459        if let Some(entry) = self.interfaces.get(&id) {
460            if let Some(tunnel_id) = entry.info.tunnel_id {
461                self.engine.void_tunnel_interface(&tunnel_id);
462            }
463        }
464
465        if let Some(entry) = self.interfaces.get(&id) {
466            let is_dynamic = entry.dynamic;
467            let is_local_client = entry.info.is_local_client;
468            let interface_name = entry.info.name.clone();
469            if is_dynamic {
470                log::info!("[{}] dynamic interface removed", id.0);
471                self.interface_runtime_defaults.remove(&interface_name);
472                self.engine.deregister_interface(id);
473                self.interfaces.remove(&id);
474            } else {
475                log::info!("[{}] interface offline", id.0);
476                if let Some(entry) = self.interfaces.get_mut(&id) {
477                    entry.online = false;
478                } else {
479                    log::warn!(
480                        "interface {} disappeared while handling interface-down",
481                        id.0
482                    );
483                    return;
484                }
485                if is_local_client {
486                    self.handle_shared_interface_down(id);
487                }
488            }
489            self.callbacks.on_interface_down(id);
490            #[cfg(feature = "hooks")]
491            {
492                let ctx = HookContext::Interface { interface_id: id.0 };
493                let now = time::now();
494                let engine_ref = EngineRef {
495                    engine: &self.engine,
496                    interfaces: &self.interfaces,
497                    link_manager: &self.link_manager,
498                    now,
499                };
500                let provider_events_enabled = self.provider_events_enabled();
501                if let Some(ref e) = run_hook_inner(
502                    &mut self.hook_slots[HookPoint::InterfaceDown as usize].programs,
503                    &self.hook_manager,
504                    &engine_ref,
505                    &ctx,
506                    now,
507                    provider_events_enabled,
508                ) {
509                    self.forward_hook_side_effects("InterfaceDown", e);
510                }
511            }
512        }
513        #[cfg(feature = "iface-backbone")]
514        self.handle_backbone_peer_pool_down(id);
515    }
516
517    pub(crate) fn known_destination_route_hint(
518        &self,
519        dest_hash: &[u8; 16],
520    ) -> Option<(InterfaceId, u8)> {
521        let announced = &self.known_destinations.get(dest_hash)?.announced;
522        let iface = announced.receiving_interface;
523        if iface.0 == 0 {
524            return None;
525        }
526
527        self.interfaces
528            .get(&iface)
529            .filter(|entry| entry.online)
530            .map(|_| (iface, announced.hops))
531    }
532
533    pub(crate) fn handle_send_outbound_event(
534        &mut self,
535        raw: Vec<u8>,
536        dest_type: u8,
537        attached_interface: Option<InterfaceId>,
538    ) {
539        if self.is_draining() {
540            self.reject_new_work("send outbound packet");
541            return;
542        }
543        match RawPacket::unpack(&raw) {
544            Ok(packet) => {
545                let is_announce =
546                    packet.flags.packet_type == rns_core::constants::PACKET_TYPE_ANNOUNCE;
547                if is_announce {
548                    log::debug!(
549                        "SendOutbound: ANNOUNCE for {:02x?} (len={}, dest_type={}, attached={:?})",
550                        &packet.destination_hash[..4],
551                        raw.len(),
552                        dest_type,
553                        attached_interface
554                    );
555                }
556                if packet.flags.packet_type == rns_core::constants::PACKET_TYPE_DATA {
557                    self.sent_packets
558                        .insert(packet.packet_hash, (packet.destination_hash, time::now()));
559                }
560                let actions = self.engine.handle_outbound(
561                    &packet,
562                    dest_type,
563                    attached_interface,
564                    time::now(),
565                );
566                if is_announce {
567                    log::debug!(
568                        "SendOutbound: announce routed to {} actions: {:?}",
569                        actions.len(),
570                        actions
571                            .iter()
572                            .map(|a| match a {
573                                TransportAction::SendOnInterface { interface, .. } =>
574                                    format!("SendOn({})", interface.0),
575                                TransportAction::BroadcastOnAllInterfaces { .. } =>
576                                    "BroadcastAll".to_string(),
577                                _ => "other".to_string(),
578                            })
579                            .collect::<Vec<_>>()
580                    );
581                }
582                self.dispatch_all(actions);
583            }
584            Err(e) => {
585                log::warn!("SendOutbound: failed to unpack packet: {:?}", e);
586            }
587        }
588    }
589
590    /// Run the event loop. Blocks until Shutdown or all senders are dropped.
591    pub fn run(&mut self) {
592        loop {
593            let event = match self.rx.recv() {
594                Ok(e) => e,
595                Err(_) => break, // all senders dropped
596            };
597
598            match event {
599                Event::Frame { interface_id, data } => {
600                    self.handle_frame_event(interface_id, data);
601                }
602                Event::AnnounceVerified {
603                    key,
604                    validated,
605                    sig_cache_key,
606                } => {
607                    self.handle_announce_verified_event(key, validated, sig_cache_key);
608                }
609                Event::AnnounceVerifyFailed { key, .. } => {
610                    let mut announce_queue = self
611                        .announce_verify_queue
612                        .lock()
613                        .unwrap_or_else(|poisoned| poisoned.into_inner());
614                    let _ = announce_queue.complete_failure(&key);
615                }
616                Event::Tick => self.handle_tick_event(),
617                Event::BeginDrain { timeout } => {
618                    self.begin_drain(timeout);
619                }
620                Event::InterfaceUp(id, new_writer, info) => {
621                    self.handle_interface_up_event(id, new_writer, info);
622                }
623                Event::InterfaceDown(id) => self.handle_interface_down_event(id),
624                Event::SendOutbound {
625                    raw,
626                    dest_type,
627                    attached_interface,
628                } => self.handle_send_outbound_event(raw, dest_type, attached_interface),
629                Event::RegisterDestination {
630                    dest_hash,
631                    dest_type,
632                } => {
633                    self.engine.register_destination(dest_hash, dest_type);
634                    self.local_destinations.insert(dest_hash, dest_type);
635                }
636                Event::StoreSharedAnnounce {
637                    dest_hash,
638                    name_hash,
639                    identity_prv_key,
640                    app_data,
641                } => {
642                    self.shared_announces.insert(
643                        dest_hash,
644                        SharedAnnounceRecord {
645                            name_hash,
646                            identity_prv_key,
647                            app_data,
648                        },
649                    );
650                }
651                Event::DeregisterDestination { dest_hash } => {
652                    self.engine.deregister_destination(&dest_hash);
653                    self.local_destinations.remove(&dest_hash);
654                    self.shared_announces.remove(&dest_hash);
655                }
656                Event::Query(request, response_tx) => {
657                    let response = self.handle_query_mut(request);
658                    let _ = response_tx.send(response);
659                }
660                Event::DeregisterLinkDestination { dest_hash } => {
661                    self.link_manager.deregister_link_destination(&dest_hash);
662                }
663                Event::RegisterLinkDestination {
664                    dest_hash,
665                    sig_prv_bytes,
666                    sig_pub_bytes,
667                    resource_strategy,
668                } => {
669                    let sig_prv =
670                        rns_crypto::ed25519::Ed25519PrivateKey::from_bytes(&sig_prv_bytes);
671                    let strat = match resource_strategy {
672                        1 => crate::link_manager::ResourceStrategy::AcceptAll,
673                        2 => crate::link_manager::ResourceStrategy::AcceptApp,
674                        _ => crate::link_manager::ResourceStrategy::AcceptNone,
675                    };
676                    self.link_manager.register_link_destination(
677                        dest_hash,
678                        sig_prv,
679                        sig_pub_bytes,
680                        strat,
681                    );
682                    // Also register in transport engine so inbound packets are delivered locally
683                    self.engine
684                        .register_destination(dest_hash, rns_core::constants::DESTINATION_SINGLE);
685                    self.local_destinations
686                        .insert(dest_hash, rns_core::constants::DESTINATION_SINGLE);
687                }
688                Event::RegisterRequestHandler {
689                    path,
690                    allowed_list,
691                    handler,
692                } => {
693                    self.link_manager.register_request_handler(
694                        &path,
695                        allowed_list,
696                        move |link_id, p, data, remote| handler(link_id, p, data, remote),
697                    );
698                }
699                Event::RegisterRequestHandlerResponse {
700                    path,
701                    allowed_list,
702                    handler,
703                } => {
704                    self.link_manager.register_request_handler_response(
705                        &path,
706                        allowed_list,
707                        move |link_id, p, data, remote| handler(link_id, p, data, remote),
708                    );
709                }
710                Event::CreateLink {
711                    dest_hash,
712                    dest_sig_pub_bytes,
713                    response_tx,
714                } => {
715                    if self.is_draining() {
716                        self.reject_new_work("create link");
717                        let _ = (dest_hash, dest_sig_pub_bytes);
718                        let _ = response_tx.send([0u8; 16]);
719                        continue;
720                    }
721                    let next_hop_interface = self.engine.next_hop_interface(&dest_hash);
722                    let recalled_route_hint = if next_hop_interface.is_none() {
723                        self.known_destination_route_hint(&dest_hash)
724                    } else {
725                        None
726                    };
727                    if recalled_route_hint.is_some() {
728                        let _ = self.mark_known_destination_used(&dest_hash);
729                    }
730                    let attached_interface =
731                        next_hop_interface.or(recalled_route_hint.map(|(iface, _)| iface));
732                    let hops = self
733                        .engine
734                        .hops_to(&dest_hash)
735                        .or_else(|| recalled_route_hint.map(|(_, hops)| hops))
736                        .unwrap_or(0);
737                    let mtu = attached_interface
738                        .and_then(|iface_id| self.interfaces.get(&iface_id))
739                        .map(|entry| entry.info.mtu)
740                        .unwrap_or(rns_core::constants::MTU as u32);
741                    let (link_id, mut link_actions) = self.link_manager.create_link(
742                        &dest_hash,
743                        &dest_sig_pub_bytes,
744                        hops,
745                        mtu,
746                        &mut self.rng,
747                    );
748                    if let Some(iface) = attached_interface {
749                        self.link_manager.set_link_route_hint(&link_id, iface, None);
750                    }
751                    if next_hop_interface.is_none() {
752                        if let Some(iface) = attached_interface {
753                            for action in &mut link_actions {
754                                if let LinkManagerAction::SendPacket {
755                                    dest_type,
756                                    attached_interface,
757                                    ..
758                                } = action
759                                {
760                                    if *dest_type == rns_core::constants::DESTINATION_LINK
761                                        && attached_interface.is_none()
762                                    {
763                                        *attached_interface = Some(iface);
764                                    }
765                                }
766                            }
767                        }
768                    }
769                    let _ = response_tx.send(link_id);
770                    self.dispatch_link_actions(link_actions);
771                }
772                Event::SendRequest {
773                    link_id,
774                    path,
775                    data,
776                } => {
777                    if self.is_draining() {
778                        self.reject_new_work("send link request");
779                        let _ = (link_id, path, data);
780                        continue;
781                    }
782                    let link_actions =
783                        self.link_manager
784                            .send_request(&link_id, &path, &data, &mut self.rng);
785                    self.dispatch_link_actions(link_actions);
786                }
787                Event::IdentifyOnLink {
788                    link_id,
789                    identity_prv_key,
790                } => {
791                    if self.is_draining() {
792                        self.reject_new_work("identify on link");
793                        let _ = (link_id, identity_prv_key);
794                        continue;
795                    }
796                    let identity =
797                        rns_crypto::identity::Identity::from_private_key(&identity_prv_key);
798                    let link_actions =
799                        self.link_manager
800                            .identify(&link_id, &identity, &mut self.rng);
801                    self.dispatch_link_actions(link_actions);
802                }
803                Event::TeardownLink { link_id } => {
804                    let link_actions = self.link_manager.teardown_link(&link_id);
805                    self.dispatch_link_actions(link_actions);
806                }
807                Event::SendResource {
808                    link_id,
809                    data,
810                    metadata,
811                    auto_compress,
812                } => {
813                    if self.is_draining() {
814                        self.reject_new_work("send resource");
815                        let _ = (link_id, data, metadata, auto_compress);
816                        continue;
817                    }
818                    let link_actions = self.link_manager.send_resource_with_auto_compress(
819                        &link_id,
820                        &data,
821                        metadata.as_deref(),
822                        auto_compress,
823                        &mut self.rng,
824                    );
825                    self.dispatch_link_actions(link_actions);
826                }
827                Event::SetResourceStrategy { link_id, strategy } => {
828                    use crate::link_manager::ResourceStrategy;
829                    let strat = match strategy {
830                        0 => ResourceStrategy::AcceptNone,
831                        1 => ResourceStrategy::AcceptAll,
832                        2 => ResourceStrategy::AcceptApp,
833                        _ => ResourceStrategy::AcceptNone,
834                    };
835                    self.link_manager.set_resource_strategy(&link_id, strat);
836                }
837                Event::AcceptResource {
838                    link_id,
839                    resource_hash,
840                    accept,
841                } => {
842                    if self.is_draining() && accept {
843                        self.reject_new_work("accept resource");
844                        let _ = (link_id, resource_hash, accept);
845                        continue;
846                    }
847                    let link_actions = self.link_manager.accept_resource(
848                        &link_id,
849                        &resource_hash,
850                        accept,
851                        &mut self.rng,
852                    );
853                    self.dispatch_link_actions(link_actions);
854                }
855                Event::SendChannelMessage {
856                    link_id,
857                    msgtype,
858                    payload,
859                    response_tx,
860                } => {
861                    if self.is_draining() {
862                        self.reject_new_work("send channel message");
863                        let _ = response_tx.send(Err(self.drain_error("send channel message")));
864                        continue;
865                    }
866                    match self.link_manager.send_channel_message(
867                        &link_id,
868                        msgtype,
869                        &payload,
870                        &mut self.rng,
871                    ) {
872                        Ok(link_actions) => {
873                            self.dispatch_link_actions(link_actions);
874                            let _ = response_tx.send(Ok(()));
875                        }
876                        Err(err) => {
877                            let _ = response_tx.send(Err(err));
878                        }
879                    }
880                }
881                Event::SendOnLink {
882                    link_id,
883                    data,
884                    context,
885                } => {
886                    if self.is_draining() {
887                        self.reject_new_work("send link payload");
888                        let _ = (link_id, data, context);
889                        continue;
890                    }
891                    let link_actions =
892                        self.link_manager
893                            .send_on_link(&link_id, &data, context, &mut self.rng);
894                    self.dispatch_link_actions(link_actions);
895                }
896                Event::RequestPath { dest_hash } => {
897                    if self.is_draining() {
898                        self.reject_new_work("request path");
899                        let _ = dest_hash;
900                        continue;
901                    }
902                    self.handle_request_path(dest_hash);
903                }
904                Event::RegisterProofStrategy {
905                    dest_hash,
906                    strategy,
907                    signing_key,
908                } => {
909                    let identity = signing_key
910                        .map(|key| rns_crypto::identity::Identity::from_private_key(&key));
911                    self.proof_strategies
912                        .insert(dest_hash, (strategy, identity));
913                }
914                Event::ProposeDirectConnect { link_id } => {
915                    if self.is_draining() {
916                        self.reject_new_work("propose direct connect");
917                        let _ = link_id;
918                        continue;
919                    }
920                    let derived_key = self.link_manager.get_derived_key(&link_id);
921                    if let Some(dk) = derived_key {
922                        let tx = self.get_event_sender();
923                        let hp_actions =
924                            self.holepunch_manager
925                                .propose(link_id, &dk, &mut self.rng, &tx);
926                        self.dispatch_holepunch_actions(hp_actions);
927                    } else {
928                        log::warn!(
929                            "Cannot propose direct connect: no derived key for link {:02x?}",
930                            &link_id[..4]
931                        );
932                    }
933                }
934                Event::SetDirectConnectPolicy { policy } => {
935                    self.holepunch_manager.set_policy(policy);
936                }
937                Event::HolePunchProbeResult {
938                    link_id,
939                    session_id,
940                    observed_addr,
941                    socket,
942                    probe_server,
943                } => {
944                    let hp_actions = self.holepunch_manager.handle_probe_result(
945                        link_id,
946                        session_id,
947                        observed_addr,
948                        socket,
949                        probe_server,
950                    );
951                    self.dispatch_holepunch_actions(hp_actions);
952                }
953                Event::HolePunchProbeFailed {
954                    link_id,
955                    session_id,
956                } => {
957                    let hp_actions = self
958                        .holepunch_manager
959                        .handle_probe_failed(link_id, session_id);
960                    self.dispatch_holepunch_actions(hp_actions);
961                }
962                Event::LoadHook {
963                    name,
964                    wasm_bytes,
965                    attach_point,
966                    priority,
967                    response_tx,
968                } => {
969                    #[cfg(feature = "hooks")]
970                    {
971                        let result = (|| -> Result<(), String> {
972                            let point_idx = crate::config::parse_hook_point(&attach_point)
973                                .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
974                            let mgr = self
975                                .hook_manager
976                                .as_ref()
977                                .ok_or_else(|| "hook manager not available".to_string())?;
978                            let program = mgr
979                                .compile(name.clone(), &wasm_bytes, priority)
980                                .map_err(|e| format!("compile error: {}", e))?;
981                            self.hook_slots[point_idx].attach(program);
982                            log::info!(
983                                "Loaded hook '{}' at point {} (priority {})",
984                                name,
985                                attach_point,
986                                priority
987                            );
988                            Ok(())
989                        })();
990                        let _ = response_tx.send(result);
991                    }
992                    #[cfg(not(feature = "hooks"))]
993                    {
994                        let _ = (name, wasm_bytes, attach_point, priority);
995                        let _ = response_tx.send(Err("hooks not enabled".to_string()));
996                    }
997                }
998                Event::LoadHookFile {
999                    name,
1000                    path,
1001                    hook_type,
1002                    attach_point,
1003                    priority,
1004                    response_tx,
1005                } => {
1006                    #[cfg(feature = "hooks")]
1007                    {
1008                        let result = (|| -> Result<(), String> {
1009                            let point_idx = crate::config::parse_hook_point(&attach_point)
1010                                .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1011                            let backend = crate::config::parse_hook_backend(&hook_type)?;
1012                            let mgr = self
1013                                .hook_manager
1014                                .as_ref()
1015                                .ok_or_else(|| "hook manager not available".to_string())?;
1016                            let program = mgr
1017                                .load_file_backend(
1018                                    name.clone(),
1019                                    std::path::Path::new(&path),
1020                                    priority,
1021                                    backend,
1022                                )
1023                                .map_err(|e| format!("load error: {}", e))?;
1024                            self.hook_slots[point_idx].attach(program);
1025                            log::info!(
1026                                "Loaded {} hook '{}' at point {} (priority {})",
1027                                backend.as_str(),
1028                                name,
1029                                attach_point,
1030                                priority
1031                            );
1032                            Ok(())
1033                        })();
1034                        let _ = response_tx.send(result);
1035                    }
1036                    #[cfg(not(feature = "hooks"))]
1037                    {
1038                        let _ = (name, path, hook_type, attach_point, priority);
1039                        let _ = response_tx.send(Err("hooks not enabled".to_string()));
1040                    }
1041                }
1042                Event::LoadBuiltinHook {
1043                    name,
1044                    builtin_id,
1045                    attach_point,
1046                    priority,
1047                    response_tx,
1048                } => {
1049                    #[cfg(feature = "hooks")]
1050                    {
1051                        let result = (|| -> Result<(), String> {
1052                            let point_idx = crate::config::parse_hook_point(&attach_point)
1053                                .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1054                            let mgr = self
1055                                .hook_manager
1056                                .as_ref()
1057                                .ok_or_else(|| "hook manager not available".to_string())?;
1058                            let program = mgr
1059                                .load_builtin(name.clone(), builtin_id.as_str(), priority)
1060                                .map_err(|e| format!("load error: {}", e))?;
1061                            self.hook_slots[point_idx].attach(program);
1062                            log::info!(
1063                                "Loaded built-in hook '{}' ({}) at point {} (priority {})",
1064                                name,
1065                                builtin_id,
1066                                attach_point,
1067                                priority
1068                            );
1069                            Ok(())
1070                        })();
1071                        let _ = response_tx.send(result);
1072                    }
1073                    #[cfg(not(feature = "hooks"))]
1074                    {
1075                        let _ = (name, builtin_id, attach_point, priority);
1076                        let _ = response_tx.send(Err("hooks not enabled".to_string()));
1077                    }
1078                }
1079                Event::UnloadHook {
1080                    name,
1081                    attach_point,
1082                    response_tx,
1083                } => {
1084                    #[cfg(feature = "hooks")]
1085                    {
1086                        let result = (|| -> Result<(), String> {
1087                            let point_idx = crate::config::parse_hook_point(&attach_point)
1088                                .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1089                            match self.hook_slots[point_idx].detach(&name) {
1090                                Some(_) => {
1091                                    log::info!(
1092                                        "Unloaded hook '{}' from point {}",
1093                                        name,
1094                                        attach_point
1095                                    );
1096                                    Ok(())
1097                                }
1098                                None => Err(format!(
1099                                    "hook '{}' not found at point '{}'",
1100                                    name, attach_point
1101                                )),
1102                            }
1103                        })();
1104                        let _ = response_tx.send(result);
1105                    }
1106                    #[cfg(not(feature = "hooks"))]
1107                    {
1108                        let _ = (name, attach_point);
1109                        let _ = response_tx.send(Err("hooks not enabled".to_string()));
1110                    }
1111                }
1112                Event::ReloadHook {
1113                    name,
1114                    attach_point,
1115                    wasm_bytes,
1116                    response_tx,
1117                } => {
1118                    #[cfg(feature = "hooks")]
1119                    {
1120                        let result = (|| -> Result<(), String> {
1121                            let point_idx = crate::config::parse_hook_point(&attach_point)
1122                                .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1123                            let old =
1124                                self.hook_slots[point_idx].detach(&name).ok_or_else(|| {
1125                                    format!("hook '{}' not found at point '{}'", name, attach_point)
1126                                })?;
1127                            let priority = old.priority;
1128                            let mgr = match self.hook_manager.as_ref() {
1129                                Some(m) => m,
1130                                None => {
1131                                    self.hook_slots[point_idx].attach(old);
1132                                    return Err("hook manager not available".to_string());
1133                                }
1134                            };
1135                            match mgr.compile(name.clone(), &wasm_bytes, priority) {
1136                                Ok(program) => {
1137                                    self.hook_slots[point_idx].attach(program);
1138                                    log::info!(
1139                                        "Reloaded hook '{}' at point {} (priority {})",
1140                                        name,
1141                                        attach_point,
1142                                        priority
1143                                    );
1144                                    Ok(())
1145                                }
1146                                Err(e) => {
1147                                    self.hook_slots[point_idx].attach(old);
1148                                    Err(format!("compile error: {}", e))
1149                                }
1150                            }
1151                        })();
1152                        let _ = response_tx.send(result);
1153                    }
1154                    #[cfg(not(feature = "hooks"))]
1155                    {
1156                        let _ = (name, attach_point, wasm_bytes);
1157                        let _ = response_tx.send(Err("hooks not enabled".to_string()));
1158                    }
1159                }
1160                Event::ReloadHookFile {
1161                    name,
1162                    attach_point,
1163                    path,
1164                    hook_type,
1165                    response_tx,
1166                } => {
1167                    #[cfg(feature = "hooks")]
1168                    {
1169                        let result = (|| -> Result<(), String> {
1170                            let point_idx = crate::config::parse_hook_point(&attach_point)
1171                                .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1172                            let old =
1173                                self.hook_slots[point_idx].detach(&name).ok_or_else(|| {
1174                                    format!("hook '{}' not found at point '{}'", name, attach_point)
1175                                })?;
1176                            let priority = old.priority;
1177                            let backend = match crate::config::parse_hook_backend(&hook_type) {
1178                                Ok(backend) => backend,
1179                                Err(e) => {
1180                                    self.hook_slots[point_idx].attach(old);
1181                                    return Err(e);
1182                                }
1183                            };
1184                            let mgr = match self.hook_manager.as_ref() {
1185                                Some(m) => m,
1186                                None => {
1187                                    self.hook_slots[point_idx].attach(old);
1188                                    return Err("hook manager not available".to_string());
1189                                }
1190                            };
1191                            match mgr.load_file_backend(
1192                                name.clone(),
1193                                std::path::Path::new(&path),
1194                                priority,
1195                                backend,
1196                            ) {
1197                                Ok(program) => {
1198                                    self.hook_slots[point_idx].attach(program);
1199                                    log::info!(
1200                                        "Reloaded {} hook '{}' at point {} (priority {})",
1201                                        backend.as_str(),
1202                                        name,
1203                                        attach_point,
1204                                        priority
1205                                    );
1206                                    Ok(())
1207                                }
1208                                Err(e) => {
1209                                    self.hook_slots[point_idx].attach(old);
1210                                    Err(format!("load error: {}", e))
1211                                }
1212                            }
1213                        })();
1214                        let _ = response_tx.send(result);
1215                    }
1216                    #[cfg(not(feature = "hooks"))]
1217                    {
1218                        let _ = (name, attach_point, path, hook_type);
1219                        let _ = response_tx.send(Err("hooks not enabled".to_string()));
1220                    }
1221                }
1222                Event::ReloadBuiltinHook {
1223                    name,
1224                    attach_point,
1225                    builtin_id,
1226                    response_tx,
1227                } => {
1228                    #[cfg(feature = "hooks")]
1229                    {
1230                        let result = (|| -> Result<(), String> {
1231                            let point_idx = crate::config::parse_hook_point(&attach_point)
1232                                .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1233                            let old =
1234                                self.hook_slots[point_idx].detach(&name).ok_or_else(|| {
1235                                    format!("hook '{}' not found at point '{}'", name, attach_point)
1236                                })?;
1237                            let priority = old.priority;
1238                            let mgr = match self.hook_manager.as_ref() {
1239                                Some(m) => m,
1240                                None => {
1241                                    self.hook_slots[point_idx].attach(old);
1242                                    return Err("hook manager not available".to_string());
1243                                }
1244                            };
1245                            match mgr.load_builtin(name.clone(), builtin_id.as_str(), priority) {
1246                                Ok(program) => {
1247                                    self.hook_slots[point_idx].attach(program);
1248                                    log::info!(
1249                                        "Reloaded built-in hook '{}' ({}) at point {} (priority {})",
1250                                        name,
1251                                        builtin_id,
1252                                        attach_point,
1253                                        priority
1254                                    );
1255                                    Ok(())
1256                                }
1257                                Err(e) => {
1258                                    self.hook_slots[point_idx].attach(old);
1259                                    Err(format!("load error: {}", e))
1260                                }
1261                            }
1262                        })();
1263                        let _ = response_tx.send(result);
1264                    }
1265                    #[cfg(not(feature = "hooks"))]
1266                    {
1267                        let _ = (name, attach_point, builtin_id);
1268                        let _ = response_tx.send(Err("hooks not enabled".to_string()));
1269                    }
1270                }
1271                Event::SetHookEnabled {
1272                    name,
1273                    attach_point,
1274                    enabled,
1275                    response_tx,
1276                } => {
1277                    #[cfg(feature = "hooks")]
1278                    {
1279                        let result = self.update_hook_program(&name, &attach_point, |program| {
1280                            program.enabled = enabled;
1281                        });
1282                        if result.is_ok() {
1283                            log::info!(
1284                                "{} hook '{}' at point {}",
1285                                if enabled { "Enabled" } else { "Disabled" },
1286                                name,
1287                                attach_point,
1288                            );
1289                        }
1290                        let _ = response_tx.send(result);
1291                    }
1292                    #[cfg(not(feature = "hooks"))]
1293                    {
1294                        let _ = (name, attach_point, enabled);
1295                        let _ = response_tx.send(Err("hooks not enabled".to_string()));
1296                    }
1297                }
1298                Event::SetHookPriority {
1299                    name,
1300                    attach_point,
1301                    priority,
1302                    response_tx,
1303                } => {
1304                    #[cfg(feature = "hooks")]
1305                    {
1306                        let result = self.update_hook_program(&name, &attach_point, |program| {
1307                            program.priority = priority;
1308                        });
1309                        if result.is_ok() {
1310                            if let Some(point_idx) = crate::config::parse_hook_point(&attach_point)
1311                            {
1312                                self.hook_slots[point_idx]
1313                                    .programs
1314                                    .sort_by(|a, b| b.priority.cmp(&a.priority));
1315                                log::info!(
1316                                    "Updated hook '{}' at point {} to priority {}",
1317                                    name,
1318                                    attach_point,
1319                                    priority,
1320                                );
1321                            } else {
1322                                log::error!(
1323                                    "hook point '{}' became invalid during priority update",
1324                                    attach_point
1325                                );
1326                            }
1327                        }
1328                        let _ = response_tx.send(result);
1329                    }
1330                    #[cfg(not(feature = "hooks"))]
1331                    {
1332                        let _ = (name, attach_point, priority);
1333                        let _ = response_tx.send(Err("hooks not enabled".to_string()));
1334                    }
1335                }
1336                Event::ListHooks { response_tx } => {
1337                    #[cfg(feature = "hooks")]
1338                    {
1339                        let hook_point_names = [
1340                            "PreIngress",
1341                            "PreDispatch",
1342                            "AnnounceReceived",
1343                            "PathUpdated",
1344                            "AnnounceRetransmit",
1345                            "LinkRequestReceived",
1346                            "LinkEstablished",
1347                            "LinkClosed",
1348                            "InterfaceUp",
1349                            "InterfaceDown",
1350                            "InterfaceConfigChanged",
1351                            "BackbonePeerConnected",
1352                            "BackbonePeerDisconnected",
1353                            "BackbonePeerIdleTimeout",
1354                            "BackbonePeerWriteStall",
1355                            "BackbonePeerPenalty",
1356                            "SendOnInterface",
1357                            "BroadcastOnAllInterfaces",
1358                            "DeliverLocal",
1359                            "TunnelSynthesize",
1360                            "Tick",
1361                        ];
1362                        let mut infos = Vec::new();
1363                        for (idx, slot) in self.hook_slots.iter().enumerate() {
1364                            let point_name = hook_point_names.get(idx).unwrap_or(&"Unknown");
1365                            for prog in &slot.programs {
1366                                infos.push(crate::event::HookInfo {
1367                                    name: prog.name.clone(),
1368                                    hook_type: prog.backend_name().to_string(),
1369                                    attach_point: point_name.to_string(),
1370                                    priority: prog.priority,
1371                                    enabled: prog.enabled,
1372                                    consecutive_traps: prog.consecutive_traps,
1373                                });
1374                            }
1375                        }
1376                        let _ = response_tx.send(infos);
1377                    }
1378                    #[cfg(not(feature = "hooks"))]
1379                    {
1380                        let _ = response_tx.send(Vec::new());
1381                    }
1382                }
1383                Event::InterfaceConfigChanged(id) => {
1384                    #[cfg(feature = "hooks")]
1385                    {
1386                        let ctx = HookContext::Interface { interface_id: id.0 };
1387                        let now = time::now();
1388                        let engine_ref = EngineRef {
1389                            engine: &self.engine,
1390                            interfaces: &self.interfaces,
1391                            link_manager: &self.link_manager,
1392                            now,
1393                        };
1394                        let provider_events_enabled = self.provider_events_enabled();
1395                        if let Some(ref e) = run_hook_inner(
1396                            &mut self.hook_slots[HookPoint::InterfaceConfigChanged as usize]
1397                                .programs,
1398                            &self.hook_manager,
1399                            &engine_ref,
1400                            &ctx,
1401                            now,
1402                            provider_events_enabled,
1403                        ) {
1404                            self.forward_hook_side_effects("InterfaceConfigChanged", e);
1405                        }
1406                    }
1407                    #[cfg(not(feature = "hooks"))]
1408                    let _ = id;
1409                }
1410                Event::BackbonePeerConnected {
1411                    server_interface_id,
1412                    peer_interface_id,
1413                    peer_ip,
1414                    peer_port,
1415                } => {
1416                    #[cfg(feature = "hooks")]
1417                    {
1418                        self.run_backbone_peer_hook(
1419                            "BackbonePeerConnected",
1420                            HookPoint::BackbonePeerConnected,
1421                            &BackbonePeerHookEvent {
1422                                server_interface_id,
1423                                peer_interface_id: Some(peer_interface_id),
1424                                peer_ip,
1425                                peer_port,
1426                                connected_for: Duration::ZERO,
1427                                had_received_data: false,
1428                                penalty_level: 0,
1429                                blacklist_for: Duration::ZERO,
1430                            },
1431                        );
1432                    }
1433                    #[cfg(not(feature = "hooks"))]
1434                    let _ = (server_interface_id, peer_interface_id, peer_ip, peer_port);
1435                }
1436                Event::BackbonePeerDisconnected {
1437                    server_interface_id,
1438                    peer_interface_id,
1439                    peer_ip,
1440                    peer_port,
1441                    connected_for,
1442                    had_received_data,
1443                } => {
1444                    #[cfg(feature = "hooks")]
1445                    {
1446                        self.run_backbone_peer_hook(
1447                            "BackbonePeerDisconnected",
1448                            HookPoint::BackbonePeerDisconnected,
1449                            &BackbonePeerHookEvent {
1450                                server_interface_id,
1451                                peer_interface_id: Some(peer_interface_id),
1452                                peer_ip,
1453                                peer_port,
1454                                connected_for,
1455                                had_received_data,
1456                                penalty_level: 0,
1457                                blacklist_for: Duration::ZERO,
1458                            },
1459                        );
1460                    }
1461                    #[cfg(not(feature = "hooks"))]
1462                    let _ = (
1463                        server_interface_id,
1464                        peer_interface_id,
1465                        peer_ip,
1466                        peer_port,
1467                        connected_for,
1468                        had_received_data,
1469                    );
1470                }
1471                Event::BackbonePeerIdleTimeout {
1472                    server_interface_id,
1473                    peer_interface_id,
1474                    peer_ip,
1475                    peer_port,
1476                    connected_for,
1477                } => {
1478                    #[cfg(feature = "hooks")]
1479                    {
1480                        self.run_backbone_peer_hook(
1481                            "BackbonePeerIdleTimeout",
1482                            HookPoint::BackbonePeerIdleTimeout,
1483                            &BackbonePeerHookEvent {
1484                                server_interface_id,
1485                                peer_interface_id: Some(peer_interface_id),
1486                                peer_ip,
1487                                peer_port,
1488                                connected_for,
1489                                had_received_data: false,
1490                                penalty_level: 0,
1491                                blacklist_for: Duration::ZERO,
1492                            },
1493                        );
1494                    }
1495                    #[cfg(not(feature = "hooks"))]
1496                    let _ = (
1497                        server_interface_id,
1498                        peer_interface_id,
1499                        peer_ip,
1500                        peer_port,
1501                        connected_for,
1502                    );
1503                }
1504                Event::BackbonePeerWriteStall {
1505                    server_interface_id,
1506                    peer_interface_id,
1507                    peer_ip,
1508                    peer_port,
1509                    connected_for,
1510                } => {
1511                    #[cfg(feature = "hooks")]
1512                    {
1513                        self.run_backbone_peer_hook(
1514                            "BackbonePeerWriteStall",
1515                            HookPoint::BackbonePeerWriteStall,
1516                            &BackbonePeerHookEvent {
1517                                server_interface_id,
1518                                peer_interface_id: Some(peer_interface_id),
1519                                peer_ip,
1520                                peer_port,
1521                                connected_for,
1522                                had_received_data: false,
1523                                penalty_level: 0,
1524                                blacklist_for: Duration::ZERO,
1525                            },
1526                        );
1527                    }
1528                    #[cfg(not(feature = "hooks"))]
1529                    let _ = (
1530                        server_interface_id,
1531                        peer_interface_id,
1532                        peer_ip,
1533                        peer_port,
1534                        connected_for,
1535                    );
1536                }
1537                Event::BackbonePeerPenalty {
1538                    server_interface_id,
1539                    peer_ip,
1540                    penalty_level,
1541                    blacklist_for,
1542                } => {
1543                    #[cfg(feature = "hooks")]
1544                    {
1545                        self.run_backbone_peer_hook(
1546                            "BackbonePeerPenalty",
1547                            HookPoint::BackbonePeerPenalty,
1548                            &BackbonePeerHookEvent {
1549                                server_interface_id,
1550                                peer_interface_id: None,
1551                                peer_ip,
1552                                peer_port: 0,
1553                                connected_for: Duration::ZERO,
1554                                had_received_data: false,
1555                                penalty_level,
1556                                blacklist_for,
1557                            },
1558                        );
1559                    }
1560                    #[cfg(not(feature = "hooks"))]
1561                    let _ = (server_interface_id, peer_ip, penalty_level, blacklist_for);
1562                }
1563                Event::Shutdown => {
1564                    self.graceful_shutdown();
1565                    break;
1566                }
1567            }
1568        }
1569    }
1570    pub(crate) fn handle_tunnel_synth_delivery(&mut self, raw: &[u8]) {
1571        // Extract the data payload from the raw packet
1572        let packet = match RawPacket::unpack(raw) {
1573            Ok(p) => p,
1574            Err(_) => return,
1575        };
1576
1577        match rns_core::transport::tunnel::validate_tunnel_synthesize_data(&packet.data) {
1578            Ok(validated) => {
1579                // Find the interface this tunnel belongs to by computing the expected
1580                // tunnel_id for each interface with wants_tunnel
1581                let iface_id = self
1582                    .interfaces
1583                    .iter()
1584                    .find(|(_, entry)| entry.info.wants_tunnel && entry.online && entry.enabled)
1585                    .map(|(id, _)| *id);
1586
1587                if let Some(iface) = iface_id {
1588                    let now = time::now();
1589                    let tunnel_actions = self.engine.handle_tunnel(validated.tunnel_id, iface, now);
1590                    self.dispatch_all(tunnel_actions);
1591                }
1592            }
1593            Err(e) => {
1594                log::debug!("Tunnel synthesis validation failed: {}", e);
1595            }
1596        }
1597    }
1598
1599    /// Synthesize a tunnel on an interface that wants it.
1600    ///
1601    /// Called when an interface with `wants_tunnel` comes up.
1602    pub(crate) fn synthesize_tunnel_for_interface(&mut self, interface: InterfaceId) {
1603        if let Some(ref identity) = self.transport_identity {
1604            let actions = self
1605                .engine
1606                .synthesize_tunnel(identity, interface, &mut self.rng);
1607            self.dispatch_all(actions);
1608        }
1609    }
1610
1611    /// Build and send a path request packet for a destination.
1612    pub(crate) fn handle_request_path(&mut self, dest_hash: [u8; 16]) {
1613        // Build path request data: dest_hash(16) || [transport_id(16)] || random_tag(16)
1614        let mut data = Vec::with_capacity(48);
1615        data.extend_from_slice(&dest_hash);
1616
1617        if self.engine.transport_enabled() {
1618            if let Some(id_hash) = self.engine.identity_hash() {
1619                data.extend_from_slice(id_hash);
1620            }
1621        }
1622
1623        // Random tag (16 bytes)
1624        let mut tag = [0u8; 16];
1625        self.rng.fill_bytes(&mut tag);
1626        data.extend_from_slice(&tag);
1627
1628        // Build as BROADCAST DATA PLAIN packet to rnstransport.path.request
1629        let flags = rns_core::packet::PacketFlags {
1630            header_type: rns_core::constants::HEADER_1,
1631            context_flag: rns_core::constants::FLAG_UNSET,
1632            transport_type: rns_core::constants::TRANSPORT_BROADCAST,
1633            destination_type: rns_core::constants::DESTINATION_PLAIN,
1634            packet_type: rns_core::constants::PACKET_TYPE_DATA,
1635        };
1636
1637        if let Ok(packet) = RawPacket::pack(
1638            flags,
1639            0,
1640            &self.path_request_dest,
1641            None,
1642            rns_core::constants::CONTEXT_NONE,
1643            &data,
1644        ) {
1645            let actions = self.engine.handle_outbound(
1646                &packet,
1647                rns_core::constants::DESTINATION_PLAIN,
1648                None,
1649                time::now(),
1650            );
1651            self.dispatch_all(actions);
1652        }
1653    }
1654
    /// Return a clone of the driver's stored event sender (`self.event_tx`).
    pub(crate) fn get_event_sender(&self) -> crate::event::EventSender {
        // The event channel is created outside the driver (in node.rs), which hands
        // the receiving end to the driver. The driver keeps its own sender in
        // `event_tx`, set during construction, and gives out clones of it here.
        self.event_tx.clone()
    }
1663
    /// Delay, in seconds of uptime, before the first management announce after
    /// startup. Compared against `now - self.started` in `tick_management_announces`.
    const MANAGEMENT_ANNOUNCE_DELAY: f64 = 5.0;
1666
1667    /// Tick the discovery announcer: start stamp generation if due, send announce if ready.
1668    pub(crate) fn tick_discovery_announcer(&mut self, now: f64) {
1669        let announcer = match self.interface_announcer.as_mut() {
1670            Some(a) => a,
1671            None => return,
1672        };
1673
1674        announcer.maybe_start(now);
1675
1676        let stamp_result = match announcer.poll_ready() {
1677            Some(r) => r,
1678            None => return,
1679        };
1680
1681        if !announcer.contains_interface(&stamp_result.interface_name) {
1682            log::debug!(
1683                "Discovery: dropping completed stamp for removed interface '{}'",
1684                stamp_result.interface_name
1685            );
1686            return;
1687        }
1688
1689        let identity = match self.transport_identity.as_ref() {
1690            Some(id) => id,
1691            None => {
1692                log::warn!("Discovery: stamp ready but no transport identity");
1693                return;
1694            }
1695        };
1696
1697        // Discovery is a SINGLE destination — the dest hash includes the transport identity
1698        let identity_hash = identity.hash();
1699        let disc_dest = rns_core::destination::destination_hash(
1700            crate::discovery::APP_NAME,
1701            &["discovery", "interface"],
1702            Some(&identity_hash),
1703        );
1704        let name_hash = self.discovery_name_hash;
1705        let mut random_hash = [0u8; 10];
1706        self.rng.fill_bytes(&mut random_hash);
1707
1708        let (announce_data, _) = match rns_core::announce::AnnounceData::pack(
1709            identity,
1710            &disc_dest,
1711            &name_hash,
1712            &random_hash,
1713            None,
1714            Some(&stamp_result.app_data),
1715        ) {
1716            Ok(v) => v,
1717            Err(e) => {
1718                log::warn!("Discovery: failed to pack announce: {}", e);
1719                return;
1720            }
1721        };
1722
1723        let flags = rns_core::packet::PacketFlags {
1724            header_type: rns_core::constants::HEADER_1,
1725            context_flag: rns_core::constants::FLAG_UNSET,
1726            transport_type: rns_core::constants::TRANSPORT_BROADCAST,
1727            destination_type: rns_core::constants::DESTINATION_SINGLE,
1728            packet_type: rns_core::constants::PACKET_TYPE_ANNOUNCE,
1729        };
1730
1731        let packet = match RawPacket::pack(
1732            flags,
1733            0,
1734            &disc_dest,
1735            None,
1736            rns_core::constants::CONTEXT_NONE,
1737            &announce_data,
1738        ) {
1739            Ok(p) => p,
1740            Err(e) => {
1741                log::warn!("Discovery: failed to pack packet: {}", e);
1742                return;
1743            }
1744        };
1745
1746        let outbound_actions = self.engine.handle_outbound(
1747            &packet,
1748            rns_core::constants::DESTINATION_SINGLE,
1749            None,
1750            now,
1751        );
1752        log::debug!(
1753            "Discovery announce sent for interface '{}' ({} actions, dest={:02x?})",
1754            stamp_result.interface_name,
1755            outbound_actions.len(),
1756            &disc_dest[..4],
1757        );
1758        self.dispatch_all(outbound_actions);
1759    }
1760
1761    /// Read RSS from /proc/self/statm (Linux only).
1762    pub(crate) fn rss_mb() -> Option<f64> {
1763        let statm = std::fs::read_to_string("/proc/self/statm").ok()?;
1764        let rss_pages: u64 = statm.split_whitespace().nth(1)?.parse().ok()?;
1765        Some(rss_pages as f64 * 4096.0 / (1024.0 * 1024.0))
1766    }
1767
1768    pub(crate) fn parse_proc_kib(contents: &str, key: &str) -> Option<u64> {
1769        contents.lines().find_map(|line| {
1770            let value = line.strip_prefix(key)?;
1771            value.split_whitespace().next()?.parse().ok()
1772        })
1773    }
1774
1775    pub(crate) fn proc_status_mb() -> Option<(f64, f64, f64, f64)> {
1776        let status = std::fs::read_to_string("/proc/self/status").ok()?;
1777        let vm_rss = Self::parse_proc_kib(&status, "VmRSS:")? as f64 / 1024.0;
1778        let vm_hwm = Self::parse_proc_kib(&status, "VmHWM:")? as f64 / 1024.0;
1779        let vm_data = Self::parse_proc_kib(&status, "VmData:")? as f64 / 1024.0;
1780        let vm_swap = Self::parse_proc_kib(&status, "VmSwap:").unwrap_or(0) as f64 / 1024.0;
1781        Some((vm_rss, vm_hwm, vm_data, vm_swap))
1782    }
1783
1784    pub(crate) fn smaps_rollup_mb() -> Option<(f64, f64, f64, f64, f64, f64, f64, f64)> {
1785        let smaps = std::fs::read_to_string("/proc/self/smaps_rollup").ok()?;
1786        let rss_kib = Self::parse_proc_kib(&smaps, "Rss:")?;
1787        let anon_kib = Self::parse_proc_kib(&smaps, "Anonymous:")?;
1788        let shared_clean_kib = Self::parse_proc_kib(&smaps, "Shared_Clean:").unwrap_or(0);
1789        let shared_dirty_kib = Self::parse_proc_kib(&smaps, "Shared_Dirty:").unwrap_or(0);
1790        let private_clean_kib = Self::parse_proc_kib(&smaps, "Private_Clean:").unwrap_or(0);
1791        let private_dirty_kib = Self::parse_proc_kib(&smaps, "Private_Dirty:").unwrap_or(0);
1792        let swap_kib = Self::parse_proc_kib(&smaps, "Swap:").unwrap_or(0);
1793        let file_est_kib = rss_kib.saturating_sub(anon_kib);
1794        Some((
1795            rss_kib as f64 / 1024.0,
1796            anon_kib as f64 / 1024.0,
1797            file_est_kib as f64 / 1024.0,
1798            shared_clean_kib as f64 / 1024.0,
1799            shared_dirty_kib as f64 / 1024.0,
1800            private_clean_kib as f64 / 1024.0,
1801            private_dirty_kib as f64 / 1024.0,
1802            swap_kib as f64 / 1024.0,
1803        ))
1804    }
1805
    /// Log sizes of all major collections for memory growth diagnostics.
    ///
    /// Emits a single info-level line prefixed with `MEMSTATS` containing:
    /// - process memory figures from `Self::rss_mb`, `Self::proc_status_mb` and
    ///   `Self::smaps_rollup_mb`, each formatted with one decimal place, or `N/A`
    ///   when the corresponding helper returns `None`;
    /// - element counts and counters from the driver's own collections, the
    ///   engine, the link manager and the holepunch manager.
    ///
    /// The positional arguments below must stay in the same order as the `{}`
    /// placeholders in the format string.
    pub(crate) fn log_memory_stats(&self) {
        // RSS derived from /proc/self/statm.
        let rss = Self::rss_mb()
            .map(|v| format!("{:.1}", v))
            .unwrap_or_else(|| "N/A".into());
        // VmRSS / VmHWM / VmData / VmSwap from /proc/self/status.
        let (vm_rss, vm_hwm, vm_data, vm_swap) = Self::proc_status_mb()
            .map(|(rss, hwm, data, swap)| {
                (
                    format!("{rss:.1}"),
                    format!("{hwm:.1}"),
                    format!("{data:.1}"),
                    format!("{swap:.1}"),
                )
            })
            .unwrap_or_else(|| ("N/A".into(), "N/A".into(), "N/A".into(), "N/A".into()));
        // Breakdown from /proc/self/smaps_rollup; all eight values fall back to
        // "N/A" together when the helper returns `None`.
        let (
            smaps_rss,
            smaps_anon,
            smaps_file_est,
            smaps_shared_clean,
            smaps_shared_dirty,
            smaps_private_clean,
            smaps_private_dirty,
            smaps_swap,
        ) = Self::smaps_rollup_mb()
            .map(
                |(
                    rss,
                    anon,
                    file_est,
                    shared_clean,
                    shared_dirty,
                    private_clean,
                    private_dirty,
                    swap,
                )| {
                    (
                        format!("{rss:.1}"),
                        format!("{anon:.1}"),
                        format!("{file_est:.1}"),
                        format!("{shared_clean:.1}"),
                        format!("{shared_dirty:.1}"),
                        format!("{private_clean:.1}"),
                        format!("{private_dirty:.1}"),
                        format!("{swap:.1}"),
                    )
                },
            )
            .unwrap_or_else(|| {
                (
                    "N/A".into(),
                    "N/A".into(),
                    "N/A".into(),
                    "N/A".into(),
                    "N/A".into(),
                    "N/A".into(),
                    "N/A".into(),
                    "N/A".into(),
                )
            });
        log::info!(
            "MEMSTATS rss_mb={} vmrss_mb={} vmhwm_mb={} vmdata_mb={} vmswap_mb={} smaps_rss_mb={} smaps_anon_mb={} smaps_file_est_mb={} smaps_shared_clean_mb={} smaps_shared_dirty_mb={} smaps_private_clean_mb={} smaps_private_dirty_mb={} smaps_swap_mb={} known_dest={} known_dest_cap_evict={} path={} path_cap_evict={} announce={} reverse={}              link={} held_ann={} hashlist={} sig_cache={} ann_verify_q={} rate_lim={} blackhole={} tunnel={} ann_q_ifaces={} ann_q_nonempty={} ann_q_entries={} ann_q_bytes={} ann_q_iface_drop={}              pr_tags={} disc_pr={} sent_pkt={} completed={} local_dest={}              shared_ann={} lm_links={} hp_sessions={} proof_strat={}",
            rss,
            vm_rss,
            vm_hwm,
            vm_data,
            vm_swap,
            smaps_rss,
            smaps_anon,
            smaps_file_est,
            smaps_shared_clean,
            smaps_shared_dirty,
            smaps_private_clean,
            smaps_private_dirty,
            smaps_swap,
            self.known_destinations.len(),
            self.known_destinations_cap_evict_count,
            self.engine.path_table_count(),
            self.engine.path_destination_cap_evict_count(),
            self.engine.announce_table_count(),
            self.engine.reverse_table_count(),
            self.engine.link_table_count(),
            self.engine.held_announces_count(),
            self.engine.packet_hashlist_len(),
            self.engine.announce_sig_cache_len(),
            // Reported as 0 when `lock()` returns an error.
            self.announce_verify_queue
                .lock()
                .map(|queue| queue.len())
                .unwrap_or(0),
            self.engine.rate_limiter_count(),
            self.engine.blackholed_count(),
            self.engine.tunnel_count(),
            self.engine.announce_queue_count(),
            self.engine.nonempty_announce_queue_count(),
            self.engine.queued_announce_count(),
            self.engine.queued_announce_bytes(),
            self.engine.announce_queue_interface_cap_drop_count(),
            self.engine.discovery_pr_tags_count(),
            self.engine.discovery_path_requests_count(),
            self.sent_packets.len(),
            self.completed_proofs.len(),
            self.local_destinations.len(),
            self.shared_announces.len(),
            self.link_manager.link_count(),
            self.holepunch_manager.session_count(),
            self.proof_strategies.len(),
        );
    }
1914
1915    /// Emit management and/or blackhole announces if enabled and due.
1916    pub(crate) fn tick_management_announces(&mut self, now: f64) {
1917        if self.transport_identity.is_none() {
1918            return;
1919        }
1920
1921        let uptime = now - self.started;
1922
1923        // Wait for initial delay
1924        if !self.initial_announce_sent {
1925            if uptime < Self::MANAGEMENT_ANNOUNCE_DELAY {
1926                return;
1927            }
1928            self.initial_announce_sent = true;
1929            self.emit_management_announces(now);
1930            return;
1931        }
1932
1933        // Periodic re-announce
1934        if now - self.last_management_announce >= self.management_announce_interval_secs {
1935            self.emit_management_announces(now);
1936        }
1937    }
1938
1939    /// Emit management/blackhole announce packets through the engine outbound path.
1940    pub(crate) fn emit_management_announces(&mut self, now: f64) {
1941        use crate::management;
1942
1943        self.last_management_announce = now;
1944
1945        let identity = match self.transport_identity {
1946            Some(ref id) => id,
1947            None => return,
1948        };
1949
1950        // Build announce packets first (immutable borrow of identity), then dispatch
1951        let mgmt_raw = if self.management_config.enable_remote_management {
1952            management::build_management_announce(identity, &mut self.rng)
1953        } else {
1954            None
1955        };
1956
1957        let bh_raw = if self.management_config.publish_blackhole {
1958            management::build_blackhole_announce(identity, &mut self.rng)
1959        } else {
1960            None
1961        };
1962
1963        let probe_raw = if self.probe_responder_hash.is_some() {
1964            management::build_probe_announce(identity, &mut self.rng)
1965        } else {
1966            None
1967        };
1968
1969        if let Some(raw) = mgmt_raw {
1970            if let Ok(packet) = RawPacket::unpack(&raw) {
1971                let actions = self.engine.handle_outbound(
1972                    &packet,
1973                    rns_core::constants::DESTINATION_SINGLE,
1974                    None,
1975                    now,
1976                );
1977                self.dispatch_all(actions);
1978                log::debug!("Emitted management destination announce");
1979            }
1980        }
1981
1982        if let Some(raw) = bh_raw {
1983            if let Ok(packet) = RawPacket::unpack(&raw) {
1984                let actions = self.engine.handle_outbound(
1985                    &packet,
1986                    rns_core::constants::DESTINATION_SINGLE,
1987                    None,
1988                    now,
1989                );
1990                self.dispatch_all(actions);
1991                log::debug!("Emitted blackhole info announce");
1992            }
1993        }
1994
1995        if let Some(raw) = probe_raw {
1996            if let Ok(packet) = RawPacket::unpack(&raw) {
1997                let actions = self.engine.handle_outbound(
1998                    &packet,
1999                    rns_core::constants::DESTINATION_SINGLE,
2000                    None,
2001                    now,
2002                );
2003                self.dispatch_all(actions);
2004                log::debug!("Emitted probe responder announce");
2005            }
2006        }
2007    }
2008
2009    /// Handle a management request by querying engine state and sending a response.
2010    pub(crate) fn handle_management_request(
2011        &mut self,
2012        link_id: [u8; 16],
2013        path_hash: [u8; 16],
2014        data: Vec<u8>,
2015        request_id: [u8; 16],
2016        remote_identity: Option<([u8; 16], [u8; 64])>,
2017    ) {
2018        use crate::management;
2019
2020        // ACL check for /status and /path (ALLOW_LIST), /list is ALLOW_ALL
2021        let is_restricted = path_hash == management::status_path_hash()
2022            || path_hash == management::path_path_hash();
2023
2024        if is_restricted && !self.management_config.remote_management_allowed.is_empty() {
2025            match remote_identity {
2026                Some((identity_hash, _)) => {
2027                    if !self
2028                        .management_config
2029                        .remote_management_allowed
2030                        .contains(&identity_hash)
2031                    {
2032                        log::debug!("Management request denied: identity not in allowed list");
2033                        return;
2034                    }
2035                }
2036                None => {
2037                    log::debug!("Management request denied: peer not identified");
2038                    return;
2039                }
2040            }
2041        }
2042
2043        let response_data = if path_hash == management::status_path_hash() {
2044            {
2045                let views: Vec<&dyn management::InterfaceStatusView> = self
2046                    .interfaces
2047                    .values()
2048                    .map(|e| e as &dyn management::InterfaceStatusView)
2049                    .collect();
2050                management::handle_status_request(
2051                    &data,
2052                    &self.engine,
2053                    &views,
2054                    self.started,
2055                    self.probe_responder_hash,
2056                )
2057            }
2058        } else if path_hash == management::path_path_hash() {
2059            management::handle_path_request(&data, &self.engine)
2060        } else if path_hash == management::list_path_hash() {
2061            management::handle_blackhole_list_request(&self.engine)
2062        } else {
2063            log::warn!("Unknown management path_hash: {:02x?}", &path_hash[..4]);
2064            None
2065        };
2066
2067        if let Some(response) = response_data {
2068            let actions = self.link_manager.send_management_response(
2069                &link_id,
2070                &request_id,
2071                &response,
2072                &mut self.rng,
2073            );
2074            self.dispatch_link_actions(actions);
2075        }
2076    }
2077}