Skip to main content

rns_net/driver/
lifecycle.rs

1use super::*;
2
3impl Driver {
4    pub(crate) fn upsert_known_destination(
5        &mut self,
6        dest_hash: [u8; 16],
7        announced: crate::destination::AnnouncedIdentity,
8    ) {
9        if let Some(existing) = self.known_destinations.get_mut(&dest_hash) {
10            existing.announced = announced;
11            return;
12        }
13
14        self.enforce_known_destination_cap(true);
15        self.known_destinations.insert(
16            dest_hash,
17            KnownDestinationState {
18                announced,
19                was_used: false,
20                last_used_at: None,
21                retained: false,
22            },
23        );
24    }
25
26    pub(crate) fn known_destination_entry(
27        dest_hash: [u8; 16],
28        state: &KnownDestinationState,
29    ) -> KnownDestinationEntry {
30        KnownDestinationEntry {
31            dest_hash,
32            identity_hash: state.announced.identity_hash.0,
33            public_key: state.announced.public_key,
34            app_data: state.announced.app_data.clone(),
35            hops: state.announced.hops,
36            received_at: state.announced.received_at,
37            receiving_interface: state.announced.receiving_interface,
38            was_used: state.was_used,
39            last_used_at: state.last_used_at,
40            retained: state.retained,
41        }
42    }
43
44    pub(crate) fn known_destination_entries(&self) -> Vec<KnownDestinationEntry> {
45        let mut entries: Vec<_> = self
46            .known_destinations
47            .iter()
48            .map(|(dest_hash, state)| Self::known_destination_entry(*dest_hash, state))
49            .collect();
50        entries.sort_by(|a, b| a.dest_hash.cmp(&b.dest_hash));
51        entries
52    }
53
54    pub(crate) fn mark_known_destination_used(&mut self, dest_hash: &[u8; 16]) -> bool {
55        let Some(state) = self.known_destinations.get_mut(dest_hash) else {
56            return false;
57        };
58        state.was_used = true;
59        state.last_used_at = Some(time::now());
60        true
61    }
62
63    pub(crate) fn retain_known_destination(&mut self, dest_hash: &[u8; 16]) -> bool {
64        let Some(state) = self.known_destinations.get_mut(dest_hash) else {
65            return false;
66        };
67        state.retained = true;
68        true
69    }
70
71    pub(crate) fn unretain_known_destination(&mut self, dest_hash: &[u8; 16]) -> bool {
72        let Some(state) = self.known_destinations.get_mut(dest_hash) else {
73            return false;
74        };
75        state.retained = false;
76        true
77    }
78
79    pub(crate) fn known_destination_announced(
80        &self,
81        dest_hash: &[u8; 16],
82    ) -> Option<crate::destination::AnnouncedIdentity> {
83        self.known_destinations
84            .get(dest_hash)
85            .map(|state| state.announced.clone())
86    }
87
88    pub(crate) fn known_destination_relevance_time(state: &KnownDestinationState) -> f64 {
89        state.last_used_at.unwrap_or(state.announced.received_at)
90    }
91
92    pub(crate) fn begin_drain(&mut self, timeout: Duration) {
93        let now = Instant::now();
94        let deadline = now + timeout;
95        match self.lifecycle_state {
96            LifecycleState::Active => {
97                self.lifecycle_state = LifecycleState::Draining;
98                self.drain_started_at = Some(now);
99                self.drain_deadline = Some(deadline);
100                log::info!(
101                    "driver entering drain mode with {:.3}s timeout",
102                    timeout.as_secs_f64()
103                );
104                self.stop_listener_accepts();
105            }
106            LifecycleState::Draining => {
107                self.drain_deadline = Some(deadline);
108                log::info!(
109                    "driver drain deadline updated to {:.3}s from now",
110                    timeout.as_secs_f64()
111                );
112                self.stop_listener_accepts();
113            }
114            LifecycleState::Stopping | LifecycleState::Stopped => {
115                log::debug!(
116                    "ignoring BeginDrain while lifecycle state is {:?}",
117                    self.lifecycle_state
118                );
119            }
120        }
121    }
122
123    pub(crate) fn is_draining(&self) -> bool {
124        matches!(self.lifecycle_state, LifecycleState::Draining)
125    }
126
127    pub fn register_listener_control(&mut self, control: crate::interface::ListenerControl) {
128        self.listener_controls.push(control);
129    }
130
131    pub(crate) fn stop_listener_accepts(&mut self) {
132        for control in &self.listener_controls {
133            control.request_stop();
134        }
135        #[cfg(feature = "hooks")]
136        if let Some(bridge) = self.provider_bridge.as_ref() {
137            bridge.stop_accepting();
138        }
139    }
140
141    pub(crate) fn reject_new_work(&self, op: &str) {
142        log::info!("rejecting {} while node is draining", op);
143    }
144
145    fn interface_writer_queued_frames(&self) -> usize {
146        self.interfaces
147            .values()
148            .map(|entry| {
149                entry
150                    .async_writer_metrics
151                    .as_ref()
152                    .map(|metrics| metrics.queued_frames())
153                    .unwrap_or(0)
154            })
155            .sum()
156    }
157
158    fn wait_for_writer_flush(&self, timeout: Duration) {
159        let deadline = Instant::now() + timeout;
160        while Instant::now() < deadline {
161            std::thread::sleep(Duration::from_millis(5));
162        }
163    }
164
165    pub(crate) fn graceful_shutdown(&mut self) {
166        if matches!(self.lifecycle_state, LifecycleState::Stopped) {
167            return;
168        }
169
170        self.lifecycle_state = LifecycleState::Stopping;
171        self.stop_listener_accepts();
172
173        let resource_actions = self.link_manager.cancel_all_resources(&mut self.rng);
174        let resource_flush = resource_actions
175            .iter()
176            .any(|action| matches!(action, LinkManagerAction::SendPacket { .. }));
177        self.dispatch_link_actions(resource_actions);
178
179        let link_actions = self.link_manager.teardown_all_links();
180        let link_flush = link_actions
181            .iter()
182            .any(|action| matches!(action, LinkManagerAction::SendPacket { .. }));
183        self.dispatch_link_actions(link_actions);
184
185        let cleanup_actions = self.link_manager.tick(&mut self.rng);
186        let cleanup_flush = cleanup_actions
187            .iter()
188            .any(|action| matches!(action, LinkManagerAction::SendPacket { .. }));
189        self.dispatch_link_actions(cleanup_actions);
190        self.holepunch_manager.abort_all_sessions();
191
192        if resource_flush
193            || link_flush
194            || cleanup_flush
195            || self.interface_writer_queued_frames() > 0
196        {
197            self.wait_for_writer_flush(DEFAULT_LINK_TEARDOWN_FLUSH);
198        }
199
200        self.lifecycle_state = LifecycleState::Stopped;
201    }
202
203    pub(crate) fn drain_error(&self, op: &str) -> String {
204        format!("cannot {} while node is draining", op)
205    }
206
207    pub(crate) fn drain_status(&self) -> DrainStatus {
208        let now = Instant::now();
209        let active_links = self.link_manager.link_count();
210        let active_resource_transfers = self.link_manager.resource_transfer_count();
211        let active_holepunch_sessions = self.holepunch_manager.session_count();
212        let interface_writer_queued_frames = self.interface_writer_queued_frames();
213        #[cfg(feature = "hooks")]
214        let (provider_backlog_events, provider_consumer_queued_events) = self
215            .provider_bridge
216            .as_ref()
217            .map(|bridge| {
218                let stats = bridge.stats();
219                (
220                    stats.backlog_len,
221                    stats
222                        .consumers
223                        .iter()
224                        .map(|consumer| consumer.queue_len)
225                        .sum(),
226                )
227            })
228            .unwrap_or((0, 0));
229        #[cfg(not(feature = "hooks"))]
230        let (provider_backlog_events, provider_consumer_queued_events) = (0, 0);
231        let drain_age_seconds = self
232            .drain_started_at
233            .map(|started| started.elapsed().as_secs_f64());
234        let deadline_remaining_seconds = self.drain_deadline.map(|deadline| {
235            deadline
236                .checked_duration_since(now)
237                .map(|remaining| remaining.as_secs_f64())
238                .unwrap_or(0.0)
239        });
240        let detail = match self.lifecycle_state {
241            LifecycleState::Active => Some("node is accepting normal work".into()),
242            LifecycleState::Draining => {
243                let mut remaining = Vec::new();
244                if active_links > 0 {
245                    remaining.push(format!("{active_links} link(s)"));
246                }
247                if active_resource_transfers > 0 {
248                    remaining.push(format!("{active_resource_transfers} resource transfer(s)"));
249                }
250                if active_holepunch_sessions > 0 {
251                    remaining.push(format!("{active_holepunch_sessions} hole-punch session(s)"));
252                }
253                if interface_writer_queued_frames > 0 {
254                    remaining.push(format!(
255                        "{interface_writer_queued_frames} queued interface writer frame(s)"
256                    ));
257                }
258                if provider_backlog_events > 0 {
259                    remaining.push(format!(
260                        "{provider_backlog_events} provider backlog event(s)"
261                    ));
262                }
263                if provider_consumer_queued_events > 0 {
264                    remaining.push(format!(
265                        "{provider_consumer_queued_events} queued provider consumer event(s)"
266                    ));
267                }
268                Some(if remaining.is_empty() {
269                    "node is draining existing work; no active links, resource transfers, hole-punch sessions, or queued writer/provider work remain".into()
270                } else {
271                    format!(
272                        "node is draining existing work; {} still active",
273                        remaining.join(", ")
274                    )
275                })
276            }
277            LifecycleState::Stopping => Some("node is tearing down remaining work".into()),
278            LifecycleState::Stopped => Some("node is stopped".into()),
279        };
280
281        DrainStatus {
282            state: self.lifecycle_state,
283            drain_age_seconds,
284            deadline_remaining_seconds,
285            drain_complete: !matches!(self.lifecycle_state, LifecycleState::Draining)
286                || (active_links == 0
287                    && active_resource_transfers == 0
288                    && active_holepunch_sessions == 0
289                    && interface_writer_queued_frames == 0
290                    && provider_backlog_events == 0
291                    && provider_consumer_queued_events == 0),
292            interface_writer_queued_frames,
293            provider_backlog_events,
294            provider_consumer_queued_events,
295            detail,
296        }
297    }
298
299    pub(crate) fn enforce_drain_deadline(&mut self) {
300        if !matches!(self.lifecycle_state, LifecycleState::Draining) {
301            return;
302        }
303        let Some(deadline) = self.drain_deadline else {
304            return;
305        };
306        if Instant::now() < deadline {
307            return;
308        }
309
310        log::info!("driver drain deadline reached; tearing down remaining links");
311        self.lifecycle_state = LifecycleState::Stopping;
312        let resource_actions = self.link_manager.cancel_all_resources(&mut self.rng);
313        self.dispatch_link_actions(resource_actions);
314        let link_actions = self.link_manager.teardown_all_links();
315        self.dispatch_link_actions(link_actions);
316        let cleanup_actions = self.link_manager.tick(&mut self.rng);
317        self.dispatch_link_actions(cleanup_actions);
318        self.holepunch_manager.abort_all_sessions();
319    }
320
321    pub(crate) fn enforce_known_destination_cap(&mut self, for_insert: bool) -> usize {
322        if self.known_destinations_max_entries == usize::MAX {
323            return 0;
324        }
325
326        let mut evicted = 0usize;
327        while if for_insert {
328            self.known_destinations.len() >= self.known_destinations_max_entries
329        } else {
330            self.known_destinations.len() > self.known_destinations_max_entries
331        } {
332            let active_dests = self.engine.active_destination_hashes();
333            let candidate = self
334                .oldest_known_destination(false, &active_dests)
335                .or_else(|| self.oldest_known_destination(true, &active_dests));
336            let Some(dest_hash) = candidate else {
337                break;
338            };
339            if self.known_destinations.remove(&dest_hash).is_some() {
340                evicted += 1;
341                self.known_destinations_cap_evict_count += 1;
342            } else {
343                break;
344            }
345        }
346        evicted
347    }
348
349    pub(crate) fn oldest_known_destination(
350        &self,
351        include_protected: bool,
352        active_dests: &std::collections::BTreeSet<[u8; 16]>,
353    ) -> Option<[u8; 16]> {
354        self.known_destinations
355            .iter()
356            .filter(|(dest_hash, state)| {
357                include_protected
358                    || (!active_dests.contains(*dest_hash)
359                        && !self.local_destinations.contains_key(*dest_hash)
360                        && !state.retained)
361            })
362            .min_by(|a, b| {
363                Self::known_destination_relevance_time(a.1)
364                    .partial_cmp(&Self::known_destination_relevance_time(b.1))
365                    .unwrap_or(std::cmp::Ordering::Equal)
366                    .then_with(|| a.0.cmp(b.0))
367            })
368            .map(|(dest_hash, _)| *dest_hash)
369    }
370
371    fn build_shared_announce_raw(
372        &mut self,
373        dest_hash: &[u8; 16],
374        record: &SharedAnnounceRecord,
375        path_response: bool,
376    ) -> Option<Vec<u8>> {
377        let identity = rns_crypto::identity::Identity::from_private_key(&record.identity_prv_key);
378
379        let mut random_hash = [0u8; 10];
380        self.rng.fill_bytes(&mut random_hash[..5]);
381        let now_secs = std::time::SystemTime::now()
382            .duration_since(std::time::UNIX_EPOCH)
383            .ok()?
384            .as_secs();
385        random_hash[5..10].copy_from_slice(&now_secs.to_be_bytes()[3..8]);
386
387        let (announce_data, _has_ratchet) = rns_core::announce::AnnounceData::pack(
388            &identity,
389            dest_hash,
390            &record.name_hash,
391            &random_hash,
392            None,
393            record.app_data.as_deref(),
394        )
395        .ok()?;
396
397        let flags = rns_core::packet::PacketFlags {
398            header_type: rns_core::constants::HEADER_1,
399            context_flag: rns_core::constants::FLAG_UNSET,
400            transport_type: rns_core::constants::TRANSPORT_BROADCAST,
401            destination_type: rns_core::constants::DESTINATION_SINGLE,
402            packet_type: rns_core::constants::PACKET_TYPE_ANNOUNCE,
403        };
404        let context = if path_response {
405            rns_core::constants::CONTEXT_PATH_RESPONSE
406        } else {
407            rns_core::constants::CONTEXT_NONE
408        };
409
410        rns_core::packet::RawPacket::pack(flags, 0, dest_hash, None, context, &announce_data)
411            .ok()
412            .map(|packet| packet.raw)
413    }
414
415    pub(crate) fn replay_shared_announces(&mut self) {
416        let records: Vec<([u8; 16], SharedAnnounceRecord)> = self
417            .shared_announces
418            .iter()
419            .map(|(dest_hash, record)| (*dest_hash, record.clone()))
420            .collect();
421        for (dest_hash, record) in records {
422            if let Some(raw) = self.build_shared_announce_raw(&dest_hash, &record, true) {
423                let event = Event::SendOutbound {
424                    raw,
425                    dest_type: rns_core::constants::DESTINATION_SINGLE,
426                    attached_interface: None,
427                };
428                match event {
429                    Event::SendOutbound {
430                        raw,
431                        dest_type,
432                        attached_interface,
433                    } => match RawPacket::unpack(&raw) {
434                        Ok(packet) => {
435                            let actions = self.engine.handle_outbound(
436                                &packet,
437                                dest_type,
438                                attached_interface,
439                                time::now(),
440                            );
441                            self.dispatch_all(actions);
442                        }
443                        Err(e) => {
444                            log::warn!(
445                                "Shared announce replay failed for {:02x?}: {:?}",
446                                &dest_hash[..4],
447                                e
448                            );
449                        }
450                    },
451                    other => {
452                        log::warn!(
453                            "shared announce replay returned unexpected response: {:?}",
454                            other
455                        );
456                    }
457                }
458            }
459        }
460    }
461
462    pub(crate) fn handle_shared_interface_down(&mut self, id: InterfaceId) {
463        let dropped_paths = self.engine.drop_paths_for_interface(id);
464        let dropped_reverse = self.engine.drop_reverse_for_interface(id);
465        let dropped_links = self.engine.drop_links_for_interface(id);
466        self.engine.drop_announce_queues();
467        let link_actions = self.link_manager.teardown_all_links();
468        self.dispatch_link_actions(link_actions);
469        self.shared_reconnect_pending.insert(id, true);
470        log::info!(
471            "[{}] cleared shared state: {} paths, {} reverse entries, {} transport links",
472            id.0,
473            dropped_paths,
474            dropped_reverse,
475            dropped_links
476        );
477    }
478}