1use super::*;
2
3impl Driver {
4 pub(crate) fn upsert_known_destination(
5 &mut self,
6 dest_hash: [u8; 16],
7 announced: crate::destination::AnnouncedIdentity,
8 ) {
9 if let Some(existing) = self.known_destinations.get_mut(&dest_hash) {
10 existing.announced = announced;
11 return;
12 }
13
14 self.enforce_known_destination_cap(true);
15 self.known_destinations.insert(
16 dest_hash,
17 KnownDestinationState {
18 announced,
19 was_used: false,
20 last_used_at: None,
21 retained: false,
22 },
23 );
24 }
25
26 pub(crate) fn known_destination_entry(
27 dest_hash: [u8; 16],
28 state: &KnownDestinationState,
29 ) -> KnownDestinationEntry {
30 KnownDestinationEntry {
31 dest_hash,
32 identity_hash: state.announced.identity_hash.0,
33 public_key: state.announced.public_key,
34 app_data: state.announced.app_data.clone(),
35 hops: state.announced.hops,
36 received_at: state.announced.received_at,
37 receiving_interface: state.announced.receiving_interface,
38 was_used: state.was_used,
39 last_used_at: state.last_used_at,
40 retained: state.retained,
41 }
42 }
43
44 pub(crate) fn known_destination_entries(&self) -> Vec<KnownDestinationEntry> {
45 let mut entries: Vec<_> = self
46 .known_destinations
47 .iter()
48 .map(|(dest_hash, state)| Self::known_destination_entry(*dest_hash, state))
49 .collect();
50 entries.sort_by(|a, b| a.dest_hash.cmp(&b.dest_hash));
51 entries
52 }
53
54 pub(crate) fn mark_known_destination_used(&mut self, dest_hash: &[u8; 16]) -> bool {
55 let Some(state) = self.known_destinations.get_mut(dest_hash) else {
56 return false;
57 };
58 state.was_used = true;
59 state.last_used_at = Some(time::now());
60 true
61 }
62
63 pub(crate) fn retain_known_destination(&mut self, dest_hash: &[u8; 16]) -> bool {
64 let Some(state) = self.known_destinations.get_mut(dest_hash) else {
65 return false;
66 };
67 state.retained = true;
68 true
69 }
70
71 pub(crate) fn unretain_known_destination(&mut self, dest_hash: &[u8; 16]) -> bool {
72 let Some(state) = self.known_destinations.get_mut(dest_hash) else {
73 return false;
74 };
75 state.retained = false;
76 true
77 }
78
79 pub(crate) fn known_destination_announced(
80 &self,
81 dest_hash: &[u8; 16],
82 ) -> Option<crate::destination::AnnouncedIdentity> {
83 self.known_destinations
84 .get(dest_hash)
85 .map(|state| state.announced.clone())
86 }
87
88 pub(crate) fn known_destination_relevance_time(state: &KnownDestinationState) -> f64 {
89 state.last_used_at.unwrap_or(state.announced.received_at)
90 }
91
92 pub(crate) fn begin_drain(&mut self, timeout: Duration) {
93 let now = Instant::now();
94 let deadline = now + timeout;
95 match self.lifecycle_state {
96 LifecycleState::Active => {
97 self.lifecycle_state = LifecycleState::Draining;
98 self.drain_started_at = Some(now);
99 self.drain_deadline = Some(deadline);
100 log::info!(
101 "driver entering drain mode with {:.3}s timeout",
102 timeout.as_secs_f64()
103 );
104 self.stop_listener_accepts();
105 }
106 LifecycleState::Draining => {
107 self.drain_deadline = Some(deadline);
108 log::info!(
109 "driver drain deadline updated to {:.3}s from now",
110 timeout.as_secs_f64()
111 );
112 self.stop_listener_accepts();
113 }
114 LifecycleState::Stopping | LifecycleState::Stopped => {
115 log::debug!(
116 "ignoring BeginDrain while lifecycle state is {:?}",
117 self.lifecycle_state
118 );
119 }
120 }
121 }
122
123 pub(crate) fn is_draining(&self) -> bool {
124 matches!(self.lifecycle_state, LifecycleState::Draining)
125 }
126
127 pub fn register_listener_control(&mut self, control: crate::interface::ListenerControl) {
128 self.listener_controls.push(control);
129 }
130
131 pub(crate) fn stop_listener_accepts(&mut self) {
132 for control in &self.listener_controls {
133 control.request_stop();
134 }
135 #[cfg(feature = "hooks")]
136 if let Some(bridge) = self.provider_bridge.as_ref() {
137 bridge.stop_accepting();
138 }
139 }
140
141 pub(crate) fn reject_new_work(&self, op: &str) {
142 log::info!("rejecting {} while node is draining", op);
143 }
144
145 fn interface_writer_queued_frames(&self) -> usize {
146 self.interfaces
147 .values()
148 .map(|entry| {
149 entry
150 .async_writer_metrics
151 .as_ref()
152 .map(|metrics| metrics.queued_frames())
153 .unwrap_or(0)
154 })
155 .sum()
156 }
157
158 fn wait_for_writer_flush(&self, timeout: Duration) {
159 let deadline = Instant::now() + timeout;
160 while Instant::now() < deadline {
161 std::thread::sleep(Duration::from_millis(5));
162 }
163 }
164
165 pub(crate) fn graceful_shutdown(&mut self) {
166 if matches!(self.lifecycle_state, LifecycleState::Stopped) {
167 return;
168 }
169
170 self.lifecycle_state = LifecycleState::Stopping;
171 self.stop_listener_accepts();
172
173 let resource_actions = self.link_manager.cancel_all_resources(&mut self.rng);
174 let resource_flush = resource_actions
175 .iter()
176 .any(|action| matches!(action, LinkManagerAction::SendPacket { .. }));
177 self.dispatch_link_actions(resource_actions);
178
179 let link_actions = self.link_manager.teardown_all_links();
180 let link_flush = link_actions
181 .iter()
182 .any(|action| matches!(action, LinkManagerAction::SendPacket { .. }));
183 self.dispatch_link_actions(link_actions);
184
185 let cleanup_actions = self.link_manager.tick(&mut self.rng);
186 let cleanup_flush = cleanup_actions
187 .iter()
188 .any(|action| matches!(action, LinkManagerAction::SendPacket { .. }));
189 self.dispatch_link_actions(cleanup_actions);
190 self.holepunch_manager.abort_all_sessions();
191
192 if resource_flush
193 || link_flush
194 || cleanup_flush
195 || self.interface_writer_queued_frames() > 0
196 {
197 self.wait_for_writer_flush(DEFAULT_LINK_TEARDOWN_FLUSH);
198 }
199
200 self.lifecycle_state = LifecycleState::Stopped;
201 }
202
203 pub(crate) fn drain_error(&self, op: &str) -> String {
204 format!("cannot {} while node is draining", op)
205 }
206
207 pub(crate) fn drain_status(&self) -> DrainStatus {
208 let now = Instant::now();
209 let active_links = self.link_manager.link_count();
210 let active_resource_transfers = self.link_manager.resource_transfer_count();
211 let active_holepunch_sessions = self.holepunch_manager.session_count();
212 let interface_writer_queued_frames = self.interface_writer_queued_frames();
213 #[cfg(feature = "hooks")]
214 let (provider_backlog_events, provider_consumer_queued_events) = self
215 .provider_bridge
216 .as_ref()
217 .map(|bridge| {
218 let stats = bridge.stats();
219 (
220 stats.backlog_len,
221 stats
222 .consumers
223 .iter()
224 .map(|consumer| consumer.queue_len)
225 .sum(),
226 )
227 })
228 .unwrap_or((0, 0));
229 #[cfg(not(feature = "hooks"))]
230 let (provider_backlog_events, provider_consumer_queued_events) = (0, 0);
231 let drain_age_seconds = self
232 .drain_started_at
233 .map(|started| started.elapsed().as_secs_f64());
234 let deadline_remaining_seconds = self.drain_deadline.map(|deadline| {
235 deadline
236 .checked_duration_since(now)
237 .map(|remaining| remaining.as_secs_f64())
238 .unwrap_or(0.0)
239 });
240 let detail = match self.lifecycle_state {
241 LifecycleState::Active => Some("node is accepting normal work".into()),
242 LifecycleState::Draining => {
243 let mut remaining = Vec::new();
244 if active_links > 0 {
245 remaining.push(format!("{active_links} link(s)"));
246 }
247 if active_resource_transfers > 0 {
248 remaining.push(format!("{active_resource_transfers} resource transfer(s)"));
249 }
250 if active_holepunch_sessions > 0 {
251 remaining.push(format!("{active_holepunch_sessions} hole-punch session(s)"));
252 }
253 if interface_writer_queued_frames > 0 {
254 remaining.push(format!(
255 "{interface_writer_queued_frames} queued interface writer frame(s)"
256 ));
257 }
258 if provider_backlog_events > 0 {
259 remaining.push(format!(
260 "{provider_backlog_events} provider backlog event(s)"
261 ));
262 }
263 if provider_consumer_queued_events > 0 {
264 remaining.push(format!(
265 "{provider_consumer_queued_events} queued provider consumer event(s)"
266 ));
267 }
268 Some(if remaining.is_empty() {
269 "node is draining existing work; no active links, resource transfers, hole-punch sessions, or queued writer/provider work remain".into()
270 } else {
271 format!(
272 "node is draining existing work; {} still active",
273 remaining.join(", ")
274 )
275 })
276 }
277 LifecycleState::Stopping => Some("node is tearing down remaining work".into()),
278 LifecycleState::Stopped => Some("node is stopped".into()),
279 };
280
281 DrainStatus {
282 state: self.lifecycle_state,
283 drain_age_seconds,
284 deadline_remaining_seconds,
285 drain_complete: !matches!(self.lifecycle_state, LifecycleState::Draining)
286 || (active_links == 0
287 && active_resource_transfers == 0
288 && active_holepunch_sessions == 0
289 && interface_writer_queued_frames == 0
290 && provider_backlog_events == 0
291 && provider_consumer_queued_events == 0),
292 interface_writer_queued_frames,
293 provider_backlog_events,
294 provider_consumer_queued_events,
295 detail,
296 }
297 }
298
299 pub(crate) fn enforce_drain_deadline(&mut self) {
300 if !matches!(self.lifecycle_state, LifecycleState::Draining) {
301 return;
302 }
303 let Some(deadline) = self.drain_deadline else {
304 return;
305 };
306 if Instant::now() < deadline {
307 return;
308 }
309
310 log::info!("driver drain deadline reached; tearing down remaining links");
311 self.lifecycle_state = LifecycleState::Stopping;
312 let resource_actions = self.link_manager.cancel_all_resources(&mut self.rng);
313 self.dispatch_link_actions(resource_actions);
314 let link_actions = self.link_manager.teardown_all_links();
315 self.dispatch_link_actions(link_actions);
316 let cleanup_actions = self.link_manager.tick(&mut self.rng);
317 self.dispatch_link_actions(cleanup_actions);
318 self.holepunch_manager.abort_all_sessions();
319 }
320
321 pub(crate) fn enforce_known_destination_cap(&mut self, for_insert: bool) -> usize {
322 if self.known_destinations_max_entries == usize::MAX {
323 return 0;
324 }
325
326 let mut evicted = 0usize;
327 while if for_insert {
328 self.known_destinations.len() >= self.known_destinations_max_entries
329 } else {
330 self.known_destinations.len() > self.known_destinations_max_entries
331 } {
332 let active_dests = self.engine.active_destination_hashes();
333 let candidate = self
334 .oldest_known_destination(false, &active_dests)
335 .or_else(|| self.oldest_known_destination(true, &active_dests));
336 let Some(dest_hash) = candidate else {
337 break;
338 };
339 if self.known_destinations.remove(&dest_hash).is_some() {
340 evicted += 1;
341 self.known_destinations_cap_evict_count += 1;
342 } else {
343 break;
344 }
345 }
346 evicted
347 }
348
349 pub(crate) fn oldest_known_destination(
350 &self,
351 include_protected: bool,
352 active_dests: &std::collections::BTreeSet<[u8; 16]>,
353 ) -> Option<[u8; 16]> {
354 self.known_destinations
355 .iter()
356 .filter(|(dest_hash, state)| {
357 include_protected
358 || (!active_dests.contains(*dest_hash)
359 && !self.local_destinations.contains_key(*dest_hash)
360 && !state.retained)
361 })
362 .min_by(|a, b| {
363 Self::known_destination_relevance_time(a.1)
364 .partial_cmp(&Self::known_destination_relevance_time(b.1))
365 .unwrap_or(std::cmp::Ordering::Equal)
366 .then_with(|| a.0.cmp(b.0))
367 })
368 .map(|(dest_hash, _)| *dest_hash)
369 }
370
371 fn build_shared_announce_raw(
372 &mut self,
373 dest_hash: &[u8; 16],
374 record: &SharedAnnounceRecord,
375 path_response: bool,
376 ) -> Option<Vec<u8>> {
377 let identity = rns_crypto::identity::Identity::from_private_key(&record.identity_prv_key);
378
379 let mut random_hash = [0u8; 10];
380 self.rng.fill_bytes(&mut random_hash[..5]);
381 let now_secs = std::time::SystemTime::now()
382 .duration_since(std::time::UNIX_EPOCH)
383 .ok()?
384 .as_secs();
385 random_hash[5..10].copy_from_slice(&now_secs.to_be_bytes()[3..8]);
386
387 let (announce_data, _has_ratchet) = rns_core::announce::AnnounceData::pack(
388 &identity,
389 dest_hash,
390 &record.name_hash,
391 &random_hash,
392 None,
393 record.app_data.as_deref(),
394 )
395 .ok()?;
396
397 let flags = rns_core::packet::PacketFlags {
398 header_type: rns_core::constants::HEADER_1,
399 context_flag: rns_core::constants::FLAG_UNSET,
400 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
401 destination_type: rns_core::constants::DESTINATION_SINGLE,
402 packet_type: rns_core::constants::PACKET_TYPE_ANNOUNCE,
403 };
404 let context = if path_response {
405 rns_core::constants::CONTEXT_PATH_RESPONSE
406 } else {
407 rns_core::constants::CONTEXT_NONE
408 };
409
410 rns_core::packet::RawPacket::pack(flags, 0, dest_hash, None, context, &announce_data)
411 .ok()
412 .map(|packet| packet.raw)
413 }
414
415 pub(crate) fn replay_shared_announces(&mut self) {
416 let records: Vec<([u8; 16], SharedAnnounceRecord)> = self
417 .shared_announces
418 .iter()
419 .map(|(dest_hash, record)| (*dest_hash, record.clone()))
420 .collect();
421 for (dest_hash, record) in records {
422 if let Some(raw) = self.build_shared_announce_raw(&dest_hash, &record, true) {
423 let event = Event::SendOutbound {
424 raw,
425 dest_type: rns_core::constants::DESTINATION_SINGLE,
426 attached_interface: None,
427 };
428 match event {
429 Event::SendOutbound {
430 raw,
431 dest_type,
432 attached_interface,
433 } => match RawPacket::unpack(&raw) {
434 Ok(packet) => {
435 let actions = self.engine.handle_outbound(
436 &packet,
437 dest_type,
438 attached_interface,
439 time::now(),
440 );
441 self.dispatch_all(actions);
442 }
443 Err(e) => {
444 log::warn!(
445 "Shared announce replay failed for {:02x?}: {:?}",
446 &dest_hash[..4],
447 e
448 );
449 }
450 },
451 other => {
452 log::warn!(
453 "shared announce replay returned unexpected response: {:?}",
454 other
455 );
456 }
457 }
458 }
459 }
460 }
461
462 pub(crate) fn handle_shared_interface_down(&mut self, id: InterfaceId) {
463 let dropped_paths = self.engine.drop_paths_for_interface(id);
464 let dropped_reverse = self.engine.drop_reverse_for_interface(id);
465 let dropped_links = self.engine.drop_links_for_interface(id);
466 self.engine.drop_announce_queues();
467 let link_actions = self.link_manager.teardown_all_links();
468 self.dispatch_link_actions(link_actions);
469 self.shared_reconnect_pending.insert(id, true);
470 log::info!(
471 "[{}] cleared shared state: {} paths, {} reverse entries, {} transport links",
472 id.0,
473 dropped_paths,
474 dropped_reverse,
475 dropped_links
476 );
477 }
478}