1use super::*;
2
3impl Driver {
4 pub(crate) fn upsert_known_destination(
5 &mut self,
6 dest_hash: [u8; 16],
7 announced: crate::destination::AnnouncedIdentity,
8 ) {
9 if let Some(existing) = self.known_destinations.get_mut(&dest_hash) {
10 existing.announced = announced;
11 return;
12 }
13
14 self.enforce_known_destination_cap(true);
15 self.known_destinations.insert(
16 dest_hash,
17 KnownDestinationState {
18 announced,
19 was_used: false,
20 last_used_at: None,
21 retained: false,
22 },
23 );
24 }
25
26 pub(crate) fn known_destination_entry(
27 dest_hash: [u8; 16],
28 state: &KnownDestinationState,
29 ) -> KnownDestinationEntry {
30 KnownDestinationEntry {
31 dest_hash,
32 identity_hash: state.announced.identity_hash.0,
33 public_key: state.announced.public_key,
34 app_data: state.announced.app_data.clone(),
35 hops: state.announced.hops,
36 received_at: state.announced.received_at,
37 receiving_interface: state.announced.receiving_interface,
38 was_used: state.was_used,
39 last_used_at: state.last_used_at,
40 retained: state.retained,
41 }
42 }
43
44 pub(crate) fn known_destination_entries(&self) -> Vec<KnownDestinationEntry> {
45 let mut entries: Vec<_> = self
46 .known_destinations
47 .iter()
48 .map(|(dest_hash, state)| Self::known_destination_entry(*dest_hash, state))
49 .collect();
50 entries.sort_by(|a, b| a.dest_hash.cmp(&b.dest_hash));
51 entries
52 }
53
54 pub(crate) fn mark_known_destination_used(&mut self, dest_hash: &[u8; 16]) -> bool {
55 let Some(state) = self.known_destinations.get_mut(dest_hash) else {
56 return false;
57 };
58 state.was_used = true;
59 state.last_used_at = Some(time::now());
60 true
61 }
62
63 pub(crate) fn retain_known_destination(&mut self, dest_hash: &[u8; 16]) -> bool {
64 let Some(state) = self.known_destinations.get_mut(dest_hash) else {
65 return false;
66 };
67 state.retained = true;
68 true
69 }
70
71 pub(crate) fn retain_identity(&mut self, identity_hash: &[u8; 16]) -> bool {
72 let mut retained = false;
73 for state in self.known_destinations.values_mut() {
74 if &state.announced.identity_hash.0 == identity_hash {
75 state.retained = true;
76 retained = true;
77 }
78 }
79 retained
80 }
81
82 pub(crate) fn unretain_known_destination(&mut self, dest_hash: &[u8; 16]) -> bool {
83 let Some(state) = self.known_destinations.get_mut(dest_hash) else {
84 return false;
85 };
86 state.retained = false;
87 true
88 }
89
90 pub(crate) fn known_destination_announced(
91 &self,
92 dest_hash: &[u8; 16],
93 ) -> Option<crate::destination::AnnouncedIdentity> {
94 self.known_destinations
95 .get(dest_hash)
96 .map(|state| state.announced.clone())
97 }
98
99 pub(crate) fn known_destination_relevance_time(state: &KnownDestinationState) -> f64 {
100 state.last_used_at.unwrap_or(state.announced.received_at)
101 }
102
103 pub(crate) fn begin_drain(&mut self, timeout: Duration) {
104 let now = Instant::now();
105 let deadline = now + timeout;
106 match self.lifecycle_state {
107 LifecycleState::Active => {
108 self.lifecycle_state = LifecycleState::Draining;
109 self.drain_started_at = Some(now);
110 self.drain_deadline = Some(deadline);
111 log::info!(
112 "driver entering drain mode with {:.3}s timeout",
113 timeout.as_secs_f64()
114 );
115 self.stop_listener_accepts();
116 }
117 LifecycleState::Draining => {
118 self.drain_deadline = Some(deadline);
119 log::info!(
120 "driver drain deadline updated to {:.3}s from now",
121 timeout.as_secs_f64()
122 );
123 self.stop_listener_accepts();
124 }
125 LifecycleState::Stopping | LifecycleState::Stopped => {
126 log::debug!(
127 "ignoring BeginDrain while lifecycle state is {:?}",
128 self.lifecycle_state
129 );
130 }
131 }
132 }
133
134 pub(crate) fn is_draining(&self) -> bool {
135 matches!(self.lifecycle_state, LifecycleState::Draining)
136 }
137
138 pub fn register_listener_control(&mut self, control: crate::interface::ListenerControl) {
139 self.listener_controls.push(control);
140 }
141
142 pub(crate) fn stop_listener_accepts(&mut self) {
143 for control in &self.listener_controls {
144 control.request_stop();
145 }
146 #[cfg(feature = "hooks")]
147 if let Some(bridge) = self.provider_bridge.as_ref() {
148 bridge.stop_accepting();
149 }
150 }
151
152 pub(crate) fn reject_new_work(&self, op: &str) {
153 log::info!("rejecting {} while node is draining", op);
154 }
155
156 fn interface_writer_queued_frames(&self) -> usize {
157 self.interfaces
158 .values()
159 .map(|entry| {
160 entry
161 .async_writer_metrics
162 .as_ref()
163 .map(|metrics| metrics.queued_frames())
164 .unwrap_or(0)
165 })
166 .sum()
167 }
168
169 fn wait_for_writer_flush(&self, timeout: Duration) {
170 let deadline = Instant::now() + timeout;
171 while Instant::now() < deadline {
172 std::thread::sleep(Duration::from_millis(5));
173 }
174 }
175
176 pub(crate) fn graceful_shutdown(&mut self) {
177 if matches!(self.lifecycle_state, LifecycleState::Stopped) {
178 return;
179 }
180
181 self.lifecycle_state = LifecycleState::Stopping;
182 self.stop_listener_accepts();
183
184 let resource_actions = self.link_manager.cancel_all_resources(&mut self.rng);
185 let resource_flush = resource_actions
186 .iter()
187 .any(|action| matches!(action, LinkManagerAction::SendPacket { .. }));
188 self.dispatch_link_actions(resource_actions);
189
190 let link_actions = self.link_manager.teardown_all_links();
191 let link_flush = link_actions
192 .iter()
193 .any(|action| matches!(action, LinkManagerAction::SendPacket { .. }));
194 self.dispatch_link_actions(link_actions);
195
196 let cleanup_actions = self.link_manager.tick(&mut self.rng);
197 let cleanup_flush = cleanup_actions
198 .iter()
199 .any(|action| matches!(action, LinkManagerAction::SendPacket { .. }));
200 self.dispatch_link_actions(cleanup_actions);
201 self.holepunch_manager.abort_all_sessions();
202
203 if resource_flush
204 || link_flush
205 || cleanup_flush
206 || self.interface_writer_queued_frames() > 0
207 {
208 self.wait_for_writer_flush(DEFAULT_LINK_TEARDOWN_FLUSH);
209 }
210
211 self.engine.void_queues();
212 self.announce_verify_queue
213 .lock()
214 .unwrap_or_else(|poisoned| poisoned.into_inner())
215 .clear();
216 self.sent_packets.clear();
217 self.completed_proofs.clear();
218
219 self.lifecycle_state = LifecycleState::Stopped;
220 }
221
222 pub(crate) fn drain_error(&self, op: &str) -> String {
223 format!("cannot {} while node is draining", op)
224 }
225
226 pub(crate) fn drain_status(&self) -> DrainStatus {
227 let now = Instant::now();
228 let active_links = self.link_manager.link_count();
229 let active_resource_transfers = self.link_manager.resource_transfer_count();
230 let active_holepunch_sessions = self.holepunch_manager.session_count();
231 let interface_writer_queued_frames = self.interface_writer_queued_frames();
232 #[cfg(feature = "hooks")]
233 let (provider_backlog_events, provider_consumer_queued_events) = self
234 .provider_bridge
235 .as_ref()
236 .map(|bridge| {
237 let stats = bridge.stats();
238 (
239 stats.backlog_len,
240 stats
241 .consumers
242 .iter()
243 .map(|consumer| consumer.queue_len)
244 .sum(),
245 )
246 })
247 .unwrap_or((0, 0));
248 #[cfg(not(feature = "hooks"))]
249 let (provider_backlog_events, provider_consumer_queued_events) = (0, 0);
250 let drain_age_seconds = self
251 .drain_started_at
252 .map(|started| started.elapsed().as_secs_f64());
253 let deadline_remaining_seconds = self.drain_deadline.map(|deadline| {
254 deadline
255 .checked_duration_since(now)
256 .map(|remaining| remaining.as_secs_f64())
257 .unwrap_or(0.0)
258 });
259 let detail = match self.lifecycle_state {
260 LifecycleState::Active => Some("node is accepting normal work".into()),
261 LifecycleState::Draining => {
262 let mut remaining = Vec::new();
263 if active_links > 0 {
264 remaining.push(format!("{active_links} link(s)"));
265 }
266 if active_resource_transfers > 0 {
267 remaining.push(format!("{active_resource_transfers} resource transfer(s)"));
268 }
269 if active_holepunch_sessions > 0 {
270 remaining.push(format!("{active_holepunch_sessions} hole-punch session(s)"));
271 }
272 if interface_writer_queued_frames > 0 {
273 remaining.push(format!(
274 "{interface_writer_queued_frames} queued interface writer frame(s)"
275 ));
276 }
277 if provider_backlog_events > 0 {
278 remaining.push(format!(
279 "{provider_backlog_events} provider backlog event(s)"
280 ));
281 }
282 if provider_consumer_queued_events > 0 {
283 remaining.push(format!(
284 "{provider_consumer_queued_events} queued provider consumer event(s)"
285 ));
286 }
287 Some(if remaining.is_empty() {
288 "node is draining existing work; no active links, resource transfers, hole-punch sessions, or queued writer/provider work remain".into()
289 } else {
290 format!(
291 "node is draining existing work; {} still active",
292 remaining.join(", ")
293 )
294 })
295 }
296 LifecycleState::Stopping => Some("node is tearing down remaining work".into()),
297 LifecycleState::Stopped => Some("node is stopped".into()),
298 };
299
300 DrainStatus {
301 state: self.lifecycle_state,
302 drain_age_seconds,
303 deadline_remaining_seconds,
304 drain_complete: !matches!(self.lifecycle_state, LifecycleState::Draining)
305 || (active_links == 0
306 && active_resource_transfers == 0
307 && active_holepunch_sessions == 0
308 && interface_writer_queued_frames == 0
309 && provider_backlog_events == 0
310 && provider_consumer_queued_events == 0),
311 interface_writer_queued_frames,
312 provider_backlog_events,
313 provider_consumer_queued_events,
314 detail,
315 }
316 }
317
318 pub(crate) fn enforce_drain_deadline(&mut self) {
319 if !matches!(self.lifecycle_state, LifecycleState::Draining) {
320 return;
321 }
322 let Some(deadline) = self.drain_deadline else {
323 return;
324 };
325 if Instant::now() < deadline {
326 return;
327 }
328
329 log::info!("driver drain deadline reached; tearing down remaining links");
330 self.lifecycle_state = LifecycleState::Stopping;
331 let resource_actions = self.link_manager.cancel_all_resources(&mut self.rng);
332 self.dispatch_link_actions(resource_actions);
333 let link_actions = self.link_manager.teardown_all_links();
334 self.dispatch_link_actions(link_actions);
335 let cleanup_actions = self.link_manager.tick(&mut self.rng);
336 self.dispatch_link_actions(cleanup_actions);
337 self.holepunch_manager.abort_all_sessions();
338 }
339
340 pub(crate) fn enforce_known_destination_cap(&mut self, for_insert: bool) -> usize {
341 if self.known_destinations_max_entries == usize::MAX {
342 return 0;
343 }
344
345 let mut evicted = 0usize;
346 while if for_insert {
347 self.known_destinations.len() >= self.known_destinations_max_entries
348 } else {
349 self.known_destinations.len() > self.known_destinations_max_entries
350 } {
351 let active_dests = self.engine.active_destination_hashes();
352 let candidate = self
353 .oldest_known_destination(false, &active_dests)
354 .or_else(|| self.oldest_known_destination(true, &active_dests));
355 let Some(dest_hash) = candidate else {
356 break;
357 };
358 if self.known_destinations.remove(&dest_hash).is_some() {
359 evicted += 1;
360 self.known_destinations_cap_evict_count += 1;
361 } else {
362 break;
363 }
364 }
365 evicted
366 }
367
368 pub(crate) fn oldest_known_destination(
369 &self,
370 include_protected: bool,
371 active_dests: &std::collections::BTreeSet<[u8; 16]>,
372 ) -> Option<[u8; 16]> {
373 self.known_destinations
374 .iter()
375 .filter(|(dest_hash, state)| {
376 include_protected
377 || (!active_dests.contains(*dest_hash)
378 && !self.local_destinations.contains_key(*dest_hash)
379 && !state.retained)
380 })
381 .min_by(|a, b| {
382 Self::known_destination_relevance_time(a.1)
383 .partial_cmp(&Self::known_destination_relevance_time(b.1))
384 .unwrap_or(std::cmp::Ordering::Equal)
385 .then_with(|| a.0.cmp(b.0))
386 })
387 .map(|(dest_hash, _)| *dest_hash)
388 }
389
390 fn build_shared_announce_raw(
391 &mut self,
392 dest_hash: &[u8; 16],
393 record: &SharedAnnounceRecord,
394 path_response: bool,
395 ) -> Option<Vec<u8>> {
396 let identity = rns_crypto::identity::Identity::from_private_key(&record.identity_prv_key);
397
398 let mut random_hash = [0u8; 10];
399 self.rng.fill_bytes(&mut random_hash[..5]);
400 let now_secs = std::time::SystemTime::now()
401 .duration_since(std::time::UNIX_EPOCH)
402 .ok()?
403 .as_secs();
404 random_hash[5..10].copy_from_slice(&now_secs.to_be_bytes()[3..8]);
405
406 let (announce_data, _has_ratchet) = rns_core::announce::AnnounceData::pack(
407 &identity,
408 dest_hash,
409 &record.name_hash,
410 &random_hash,
411 None,
412 record.app_data.as_deref(),
413 )
414 .ok()?;
415
416 let flags = rns_core::packet::PacketFlags {
417 header_type: rns_core::constants::HEADER_1,
418 context_flag: rns_core::constants::FLAG_UNSET,
419 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
420 destination_type: rns_core::constants::DESTINATION_SINGLE,
421 packet_type: rns_core::constants::PACKET_TYPE_ANNOUNCE,
422 };
423 let context = if path_response {
424 rns_core::constants::CONTEXT_PATH_RESPONSE
425 } else {
426 rns_core::constants::CONTEXT_NONE
427 };
428
429 rns_core::packet::RawPacket::pack(flags, 0, dest_hash, None, context, &announce_data)
430 .ok()
431 .map(|packet| packet.raw)
432 }
433
434 pub(crate) fn replay_shared_announces(&mut self) {
435 let records: Vec<([u8; 16], SharedAnnounceRecord)> = self
436 .shared_announces
437 .iter()
438 .map(|(dest_hash, record)| (*dest_hash, record.clone()))
439 .collect();
440 for (dest_hash, record) in records {
441 if let Some(raw) = self.build_shared_announce_raw(&dest_hash, &record, true) {
442 let event = Event::SendOutbound {
443 raw,
444 dest_type: rns_core::constants::DESTINATION_SINGLE,
445 attached_interface: None,
446 };
447 match event {
448 Event::SendOutbound {
449 raw,
450 dest_type,
451 attached_interface,
452 } => match RawPacket::unpack(&raw) {
453 Ok(packet) => {
454 let actions = self.engine.handle_outbound(
455 &packet,
456 dest_type,
457 attached_interface,
458 time::now(),
459 );
460 self.dispatch_all(actions);
461 }
462 Err(e) => {
463 log::warn!(
464 "Shared announce replay failed for {:02x?}: {:?}",
465 &dest_hash[..4],
466 e
467 );
468 }
469 },
470 other => {
471 log::warn!(
472 "shared announce replay returned unexpected response: {:?}",
473 other
474 );
475 }
476 }
477 }
478 }
479 }
480
481 pub(crate) fn handle_shared_interface_down(&mut self, id: InterfaceId) {
482 let dropped_paths = self.engine.drop_paths_for_interface(id);
483 let dropped_reverse = self.engine.drop_reverse_for_interface(id);
484 let dropped_links = self.engine.drop_links_for_interface(id);
485 self.engine.drop_announce_queues();
486 let link_actions = self.link_manager.teardown_all_links();
487 self.dispatch_link_actions(link_actions);
488 self.shared_reconnect_pending.insert(id, true);
489 log::info!(
490 "[{}] cleared shared state: {} paths, {} reverse entries, {} transport links",
491 id.0,
492 dropped_paths,
493 dropped_reverse,
494 dropped_links
495 );
496 }
497}