Skip to main content

rns_net/driver/
lifecycle.rs

1use super::*;
2
3impl Driver {
4    pub(crate) fn upsert_known_destination(
5        &mut self,
6        dest_hash: [u8; 16],
7        announced: crate::destination::AnnouncedIdentity,
8    ) {
9        if let Some(existing) = self.known_destinations.get_mut(&dest_hash) {
10            existing.announced = announced;
11            return;
12        }
13
14        self.enforce_known_destination_cap(true);
15        self.known_destinations.insert(
16            dest_hash,
17            KnownDestinationState {
18                announced,
19                was_used: false,
20                last_used_at: None,
21                retained: false,
22            },
23        );
24    }
25
26    pub(crate) fn known_destination_entry(
27        dest_hash: [u8; 16],
28        state: &KnownDestinationState,
29    ) -> KnownDestinationEntry {
30        KnownDestinationEntry {
31            dest_hash,
32            identity_hash: state.announced.identity_hash.0,
33            public_key: state.announced.public_key,
34            app_data: state.announced.app_data.clone(),
35            hops: state.announced.hops,
36            received_at: state.announced.received_at,
37            receiving_interface: state.announced.receiving_interface,
38            was_used: state.was_used,
39            last_used_at: state.last_used_at,
40            retained: state.retained,
41        }
42    }
43
44    pub(crate) fn known_destination_entries(&self) -> Vec<KnownDestinationEntry> {
45        let mut entries: Vec<_> = self
46            .known_destinations
47            .iter()
48            .map(|(dest_hash, state)| Self::known_destination_entry(*dest_hash, state))
49            .collect();
50        entries.sort_by(|a, b| a.dest_hash.cmp(&b.dest_hash));
51        entries
52    }
53
54    pub(crate) fn mark_known_destination_used(&mut self, dest_hash: &[u8; 16]) -> bool {
55        let Some(state) = self.known_destinations.get_mut(dest_hash) else {
56            return false;
57        };
58        state.was_used = true;
59        state.last_used_at = Some(time::now());
60        true
61    }
62
63    pub(crate) fn retain_known_destination(&mut self, dest_hash: &[u8; 16]) -> bool {
64        let Some(state) = self.known_destinations.get_mut(dest_hash) else {
65            return false;
66        };
67        state.retained = true;
68        true
69    }
70
71    pub(crate) fn retain_identity(&mut self, identity_hash: &[u8; 16]) -> bool {
72        let mut retained = false;
73        for state in self.known_destinations.values_mut() {
74            if &state.announced.identity_hash.0 == identity_hash {
75                state.retained = true;
76                retained = true;
77            }
78        }
79        retained
80    }
81
82    pub(crate) fn unretain_known_destination(&mut self, dest_hash: &[u8; 16]) -> bool {
83        let Some(state) = self.known_destinations.get_mut(dest_hash) else {
84            return false;
85        };
86        state.retained = false;
87        true
88    }
89
90    pub(crate) fn known_destination_announced(
91        &self,
92        dest_hash: &[u8; 16],
93    ) -> Option<crate::destination::AnnouncedIdentity> {
94        self.known_destinations
95            .get(dest_hash)
96            .map(|state| state.announced.clone())
97    }
98
99    pub(crate) fn known_destination_relevance_time(state: &KnownDestinationState) -> f64 {
100        state.last_used_at.unwrap_or(state.announced.received_at)
101    }
102
103    pub(crate) fn begin_drain(&mut self, timeout: Duration) {
104        let now = Instant::now();
105        let deadline = now + timeout;
106        match self.lifecycle_state {
107            LifecycleState::Active => {
108                self.lifecycle_state = LifecycleState::Draining;
109                self.drain_started_at = Some(now);
110                self.drain_deadline = Some(deadline);
111                log::info!(
112                    "driver entering drain mode with {:.3}s timeout",
113                    timeout.as_secs_f64()
114                );
115                self.stop_listener_accepts();
116            }
117            LifecycleState::Draining => {
118                self.drain_deadline = Some(deadline);
119                log::info!(
120                    "driver drain deadline updated to {:.3}s from now",
121                    timeout.as_secs_f64()
122                );
123                self.stop_listener_accepts();
124            }
125            LifecycleState::Stopping | LifecycleState::Stopped => {
126                log::debug!(
127                    "ignoring BeginDrain while lifecycle state is {:?}",
128                    self.lifecycle_state
129                );
130            }
131        }
132    }
133
134    pub(crate) fn is_draining(&self) -> bool {
135        matches!(self.lifecycle_state, LifecycleState::Draining)
136    }
137
138    pub fn register_listener_control(&mut self, control: crate::interface::ListenerControl) {
139        self.listener_controls.push(control);
140    }
141
142    pub(crate) fn stop_listener_accepts(&mut self) {
143        for control in &self.listener_controls {
144            control.request_stop();
145        }
146        #[cfg(feature = "hooks")]
147        if let Some(bridge) = self.provider_bridge.as_ref() {
148            bridge.stop_accepting();
149        }
150    }
151
152    pub(crate) fn reject_new_work(&self, op: &str) {
153        log::info!("rejecting {} while node is draining", op);
154    }
155
156    fn interface_writer_queued_frames(&self) -> usize {
157        self.interfaces
158            .values()
159            .map(|entry| {
160                entry
161                    .async_writer_metrics
162                    .as_ref()
163                    .map(|metrics| metrics.queued_frames())
164                    .unwrap_or(0)
165            })
166            .sum()
167    }
168
169    fn wait_for_writer_flush(&self, timeout: Duration) {
170        let deadline = Instant::now() + timeout;
171        while Instant::now() < deadline {
172            std::thread::sleep(Duration::from_millis(5));
173        }
174    }
175
176    pub(crate) fn graceful_shutdown(&mut self) {
177        if matches!(self.lifecycle_state, LifecycleState::Stopped) {
178            return;
179        }
180
181        self.lifecycle_state = LifecycleState::Stopping;
182        self.stop_listener_accepts();
183
184        let resource_actions = self.link_manager.cancel_all_resources(&mut self.rng);
185        let resource_flush = resource_actions
186            .iter()
187            .any(|action| matches!(action, LinkManagerAction::SendPacket { .. }));
188        self.dispatch_link_actions(resource_actions);
189
190        let link_actions = self.link_manager.teardown_all_links();
191        let link_flush = link_actions
192            .iter()
193            .any(|action| matches!(action, LinkManagerAction::SendPacket { .. }));
194        self.dispatch_link_actions(link_actions);
195
196        let cleanup_actions = self.link_manager.tick(&mut self.rng);
197        let cleanup_flush = cleanup_actions
198            .iter()
199            .any(|action| matches!(action, LinkManagerAction::SendPacket { .. }));
200        self.dispatch_link_actions(cleanup_actions);
201        self.holepunch_manager.abort_all_sessions();
202
203        if resource_flush
204            || link_flush
205            || cleanup_flush
206            || self.interface_writer_queued_frames() > 0
207        {
208            self.wait_for_writer_flush(DEFAULT_LINK_TEARDOWN_FLUSH);
209        }
210
211        self.engine.void_queues();
212        self.announce_verify_queue
213            .lock()
214            .unwrap_or_else(|poisoned| poisoned.into_inner())
215            .clear();
216        self.sent_packets.clear();
217        self.completed_proofs.clear();
218
219        self.lifecycle_state = LifecycleState::Stopped;
220    }
221
222    pub(crate) fn drain_error(&self, op: &str) -> String {
223        format!("cannot {} while node is draining", op)
224    }
225
226    pub(crate) fn drain_status(&self) -> DrainStatus {
227        let now = Instant::now();
228        let active_links = self.link_manager.link_count();
229        let active_resource_transfers = self.link_manager.resource_transfer_count();
230        let active_holepunch_sessions = self.holepunch_manager.session_count();
231        let interface_writer_queued_frames = self.interface_writer_queued_frames();
232        #[cfg(feature = "hooks")]
233        let (provider_backlog_events, provider_consumer_queued_events) = self
234            .provider_bridge
235            .as_ref()
236            .map(|bridge| {
237                let stats = bridge.stats();
238                (
239                    stats.backlog_len,
240                    stats
241                        .consumers
242                        .iter()
243                        .map(|consumer| consumer.queue_len)
244                        .sum(),
245                )
246            })
247            .unwrap_or((0, 0));
248        #[cfg(not(feature = "hooks"))]
249        let (provider_backlog_events, provider_consumer_queued_events) = (0, 0);
250        let drain_age_seconds = self
251            .drain_started_at
252            .map(|started| started.elapsed().as_secs_f64());
253        let deadline_remaining_seconds = self.drain_deadline.map(|deadline| {
254            deadline
255                .checked_duration_since(now)
256                .map(|remaining| remaining.as_secs_f64())
257                .unwrap_or(0.0)
258        });
259        let detail = match self.lifecycle_state {
260            LifecycleState::Active => Some("node is accepting normal work".into()),
261            LifecycleState::Draining => {
262                let mut remaining = Vec::new();
263                if active_links > 0 {
264                    remaining.push(format!("{active_links} link(s)"));
265                }
266                if active_resource_transfers > 0 {
267                    remaining.push(format!("{active_resource_transfers} resource transfer(s)"));
268                }
269                if active_holepunch_sessions > 0 {
270                    remaining.push(format!("{active_holepunch_sessions} hole-punch session(s)"));
271                }
272                if interface_writer_queued_frames > 0 {
273                    remaining.push(format!(
274                        "{interface_writer_queued_frames} queued interface writer frame(s)"
275                    ));
276                }
277                if provider_backlog_events > 0 {
278                    remaining.push(format!(
279                        "{provider_backlog_events} provider backlog event(s)"
280                    ));
281                }
282                if provider_consumer_queued_events > 0 {
283                    remaining.push(format!(
284                        "{provider_consumer_queued_events} queued provider consumer event(s)"
285                    ));
286                }
287                Some(if remaining.is_empty() {
288                    "node is draining existing work; no active links, resource transfers, hole-punch sessions, or queued writer/provider work remain".into()
289                } else {
290                    format!(
291                        "node is draining existing work; {} still active",
292                        remaining.join(", ")
293                    )
294                })
295            }
296            LifecycleState::Stopping => Some("node is tearing down remaining work".into()),
297            LifecycleState::Stopped => Some("node is stopped".into()),
298        };
299
300        DrainStatus {
301            state: self.lifecycle_state,
302            drain_age_seconds,
303            deadline_remaining_seconds,
304            drain_complete: !matches!(self.lifecycle_state, LifecycleState::Draining)
305                || (active_links == 0
306                    && active_resource_transfers == 0
307                    && active_holepunch_sessions == 0
308                    && interface_writer_queued_frames == 0
309                    && provider_backlog_events == 0
310                    && provider_consumer_queued_events == 0),
311            interface_writer_queued_frames,
312            provider_backlog_events,
313            provider_consumer_queued_events,
314            detail,
315        }
316    }
317
318    pub(crate) fn enforce_drain_deadline(&mut self) {
319        if !matches!(self.lifecycle_state, LifecycleState::Draining) {
320            return;
321        }
322        let Some(deadline) = self.drain_deadline else {
323            return;
324        };
325        if Instant::now() < deadline {
326            return;
327        }
328
329        log::info!("driver drain deadline reached; tearing down remaining links");
330        self.lifecycle_state = LifecycleState::Stopping;
331        let resource_actions = self.link_manager.cancel_all_resources(&mut self.rng);
332        self.dispatch_link_actions(resource_actions);
333        let link_actions = self.link_manager.teardown_all_links();
334        self.dispatch_link_actions(link_actions);
335        let cleanup_actions = self.link_manager.tick(&mut self.rng);
336        self.dispatch_link_actions(cleanup_actions);
337        self.holepunch_manager.abort_all_sessions();
338    }
339
340    pub(crate) fn enforce_known_destination_cap(&mut self, for_insert: bool) -> usize {
341        if self.known_destinations_max_entries == usize::MAX {
342            return 0;
343        }
344
345        let mut evicted = 0usize;
346        while if for_insert {
347            self.known_destinations.len() >= self.known_destinations_max_entries
348        } else {
349            self.known_destinations.len() > self.known_destinations_max_entries
350        } {
351            let active_dests = self.engine.active_destination_hashes();
352            let candidate = self
353                .oldest_known_destination(false, &active_dests)
354                .or_else(|| self.oldest_known_destination(true, &active_dests));
355            let Some(dest_hash) = candidate else {
356                break;
357            };
358            if self.known_destinations.remove(&dest_hash).is_some() {
359                evicted += 1;
360                self.known_destinations_cap_evict_count += 1;
361            } else {
362                break;
363            }
364        }
365        evicted
366    }
367
368    pub(crate) fn oldest_known_destination(
369        &self,
370        include_protected: bool,
371        active_dests: &std::collections::BTreeSet<[u8; 16]>,
372    ) -> Option<[u8; 16]> {
373        self.known_destinations
374            .iter()
375            .filter(|(dest_hash, state)| {
376                include_protected
377                    || (!active_dests.contains(*dest_hash)
378                        && !self.local_destinations.contains_key(*dest_hash)
379                        && !state.retained)
380            })
381            .min_by(|a, b| {
382                Self::known_destination_relevance_time(a.1)
383                    .partial_cmp(&Self::known_destination_relevance_time(b.1))
384                    .unwrap_or(std::cmp::Ordering::Equal)
385                    .then_with(|| a.0.cmp(b.0))
386            })
387            .map(|(dest_hash, _)| *dest_hash)
388    }
389
390    fn build_shared_announce_raw(
391        &mut self,
392        dest_hash: &[u8; 16],
393        record: &SharedAnnounceRecord,
394        path_response: bool,
395    ) -> Option<Vec<u8>> {
396        let identity = rns_crypto::identity::Identity::from_private_key(&record.identity_prv_key);
397
398        let mut random_hash = [0u8; 10];
399        self.rng.fill_bytes(&mut random_hash[..5]);
400        let now_secs = std::time::SystemTime::now()
401            .duration_since(std::time::UNIX_EPOCH)
402            .ok()?
403            .as_secs();
404        random_hash[5..10].copy_from_slice(&now_secs.to_be_bytes()[3..8]);
405
406        let (announce_data, _has_ratchet) = rns_core::announce::AnnounceData::pack(
407            &identity,
408            dest_hash,
409            &record.name_hash,
410            &random_hash,
411            None,
412            record.app_data.as_deref(),
413        )
414        .ok()?;
415
416        let flags = rns_core::packet::PacketFlags {
417            header_type: rns_core::constants::HEADER_1,
418            context_flag: rns_core::constants::FLAG_UNSET,
419            transport_type: rns_core::constants::TRANSPORT_BROADCAST,
420            destination_type: rns_core::constants::DESTINATION_SINGLE,
421            packet_type: rns_core::constants::PACKET_TYPE_ANNOUNCE,
422        };
423        let context = if path_response {
424            rns_core::constants::CONTEXT_PATH_RESPONSE
425        } else {
426            rns_core::constants::CONTEXT_NONE
427        };
428
429        rns_core::packet::RawPacket::pack(flags, 0, dest_hash, None, context, &announce_data)
430            .ok()
431            .map(|packet| packet.raw)
432    }
433
434    pub(crate) fn replay_shared_announces(&mut self) {
435        let records: Vec<([u8; 16], SharedAnnounceRecord)> = self
436            .shared_announces
437            .iter()
438            .map(|(dest_hash, record)| (*dest_hash, record.clone()))
439            .collect();
440        for (dest_hash, record) in records {
441            if let Some(raw) = self.build_shared_announce_raw(&dest_hash, &record, true) {
442                let event = Event::SendOutbound {
443                    raw,
444                    dest_type: rns_core::constants::DESTINATION_SINGLE,
445                    attached_interface: None,
446                };
447                match event {
448                    Event::SendOutbound {
449                        raw,
450                        dest_type,
451                        attached_interface,
452                    } => match RawPacket::unpack(&raw) {
453                        Ok(packet) => {
454                            let actions = self.engine.handle_outbound(
455                                &packet,
456                                dest_type,
457                                attached_interface,
458                                time::now(),
459                            );
460                            self.dispatch_all(actions);
461                        }
462                        Err(e) => {
463                            log::warn!(
464                                "Shared announce replay failed for {:02x?}: {:?}",
465                                &dest_hash[..4],
466                                e
467                            );
468                        }
469                    },
470                    other => {
471                        log::warn!(
472                            "shared announce replay returned unexpected response: {:?}",
473                            other
474                        );
475                    }
476                }
477            }
478        }
479    }
480
481    pub(crate) fn handle_shared_interface_down(&mut self, id: InterfaceId) {
482        let dropped_paths = self.engine.drop_paths_for_interface(id);
483        let dropped_reverse = self.engine.drop_reverse_for_interface(id);
484        let dropped_links = self.engine.drop_links_for_interface(id);
485        self.engine.drop_announce_queues();
486        let link_actions = self.link_manager.teardown_all_links();
487        self.dispatch_link_actions(link_actions);
488        self.shared_reconnect_pending.insert(id, true);
489        log::info!(
490            "[{}] cleared shared state: {} paths, {} reverse entries, {} transport links",
491            id.0,
492            dropped_paths,
493            dropped_reverse,
494            dropped_links
495        );
496    }
497}