ts_dataplane/async_tokio.rs
1//! The packet processing dataplane, as a tokio task.
2
3use std::{collections::HashMap, convert::Infallible, ops::DerefMut, sync::atomic::AtomicU32};
4
5use tokio::sync::{Mutex, mpsc};
6use ts_packet::PacketMut;
7use ts_transport::{OverlayTransportId, PeerId, UnderlayTransportId};
8use ts_tunnel::NodeKeyPair;
9
10use crate::{EventResult, InboundResult, OutboundResult};
11
12/// Queue for packets leaving the data plane "up" into an overlay transport.
13pub type DataplaneToOverlay = mpsc::UnboundedSender<Vec<PacketMut>>;
14
15/// Queue for packets entering the data plane "down" from an overlay transport.
16pub type DataplaneFromOverlay = mpsc::UnboundedReceiver<Vec<PacketMut>>;
17
18/// Queue for packets leaving the data plane "down" into an underlay transport.
19pub type DataplaneToUnderlay = mpsc::UnboundedSender<(PeerId, Vec<PacketMut>)>;
20
21/// Queue for packets entering the data plane "up" from an underlay transport.
22pub type DataplaneFromUnderlay = mpsc::UnboundedReceiver<(PeerId, Vec<PacketMut>)>;
23
24// TODO: wire in overlay/underlay transport traits
25
26/// Transforms packets to make tailscale happen.
27pub struct DataPlane {
28 core_state: Mutex<CoreState>,
29 poll_state: Mutex<PollState>,
30
31 transports_changed: tokio::sync::Notify,
32
33 underlay_down: DataplaneToUnderlay,
34 overlay_up: DataplaneToOverlay,
35
36 next_underlay_transport: AtomicU32,
37 next_overlay_transport: AtomicU32,
38}
39
40struct CoreState {
41 /// The synchronous core of the data plane.
42 sync: crate::DataPlane,
43
44 /// Queues to write packets to overlay transports.
45 overlay_transports: HashMap<OverlayTransportId, DataplaneToOverlay>,
46 /// Queues to write packets to underlay transports.
47 underlay_transports: HashMap<UnderlayTransportId, DataplaneToUnderlay>,
48}
49
50/// State that must be held during async polling.
51struct PollState {
52 /// Queue for packets entering the data plane ("coming down") from overlay transports.
53 from_overlay: DataplaneFromOverlay,
54 /// Queue for packets entering the data plane ("coming up") from underlay transports.
55 from_underlay: DataplaneFromUnderlay,
56}
57
58impl DataPlane {
59 /// Create a new data plane for a wireguard node key.
60 ///
61 /// The caller must configure overlay/underlay output queues for the data plane to be useful,
62 /// otherwise all it can do is drop packets.
63 pub fn new(my_key: NodeKeyPair) -> Self {
64 let (overlay_up, overlay_down) = mpsc::unbounded_channel();
65 let (underlay_down, underlay_up) = mpsc::unbounded_channel();
66
67 let sync = crate::DataPlane::new(my_key);
68
69 Self {
70 underlay_down,
71 overlay_up,
72
73 next_overlay_transport: Default::default(),
74 next_underlay_transport: Default::default(),
75
76 transports_changed: tokio::sync::Notify::new(),
77
78 core_state: Mutex::new(CoreState {
79 sync,
80 overlay_transports: Default::default(),
81 underlay_transports: Default::default(),
82 }),
83
84 poll_state: Mutex::new(PollState {
85 from_overlay: overlay_down,
86 from_underlay: underlay_up,
87 }),
88 }
89 }
90
91 /// Allocate a new underlay transport.
92 pub async fn new_underlay_transport(
93 &self,
94 ) -> (
95 UnderlayTransportId,
96 DataplaneFromUnderlay,
97 DataplaneToUnderlay,
98 ) {
99 let id = self
100 .next_underlay_transport
101 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
102 .into();
103
104 let (tx, rx) = mpsc::unbounded_channel();
105
106 {
107 let mut rest = self.core_state.lock().await;
108 rest.underlay_transports.insert(id, tx);
109 }
110
111 self.transports_changed.notify_waiters();
112
113 (id, rx, self.underlay_down.clone())
114 }
115
116 /// Allocate a new overlay transport.
117 pub async fn new_overlay_transport(
118 &self,
119 ) -> (OverlayTransportId, DataplaneToOverlay, DataplaneFromOverlay) {
120 let id = self
121 .next_overlay_transport
122 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
123 .into();
124
125 let (tx, rx) = mpsc::unbounded_channel();
126
127 {
128 let mut rest = self.core_state.lock().await;
129 rest.overlay_transports.insert(id, tx);
130 }
131
132 self.transports_changed.notify_waiters();
133
134 (id, self.overlay_up.clone(), rx)
135 }
136
137 /// Run the data plane forever, moving packets from the input queues to output queues.
138 pub async fn run(&self) -> Infallible {
139 loop {
140 self.step().await;
141 }
142 }
143
144 /// Run the data plane for a single step.
145 #[tracing::instrument(skip_all)]
146 pub async fn step(&self) {
147 enum SelectResult {
148 OverlayDown(Vec<PacketMut>),
149 UnderlayUp(PeerId, Vec<PacketMut>),
150 TransportsChanged,
151 Event,
152 }
153
154 // process in two phases:
155 //
156 // - SELECT: wait for underlying i/o or timer to make progress: don't lock the
157 // user-modifiable (core) state. self.transports_changed is used to break out of this
158 // state if the caller changes the underlying transports
159 // - UPDATE: lock the user-modifiable state and actually write out the packets produced
160 // in the SELECT phase (if any)
161 //
162 // designed this way to ensure that users can add and remove transports at any time without
163 // having to wait for the network or a timer to make progress (which may never happen)
164
165 let select_result = {
166 let next_event = {
167 let state = self.core_state.lock().await;
168 state.sync.next_event()
169 };
170
171 let mut poll_state = self.poll_state.lock().await;
172
173 let PollState {
174 from_overlay: overlay_down,
175 from_underlay: underlay_up,
176 ..
177 } = &mut *poll_state;
178
179 tokio::select! {
180 overlay_pkts = overlay_down.recv() => {
181 let overlay_pkts = overlay_pkts.unwrap();
182 tracing::trace!(n_overlay_pkts = overlay_pkts.len());
183
184 SelectResult::OverlayDown(overlay_pkts)
185 }
186
187 underlay_pkts = underlay_up.recv() => {
188 let (peer_id, underlay_pkts) = underlay_pkts.unwrap();
189 tracing::trace!(%peer_id, n_underlay_pkts = underlay_pkts.len());
190
191 SelectResult::UnderlayUp(peer_id, underlay_pkts)
192 }
193
194 _ = self.transports_changed.notified() => {
195 tracing::trace!("transports changed");
196
197 SelectResult::TransportsChanged
198 }
199
200 _ = sleep_until_event(next_event.map(Into::into)) => {
201 tracing::trace!("event");
202
203 SelectResult::Event
204 }
205 }
206 };
207
208 let mut core = self.core_state.lock().await;
209
210 let (to_peers, to_local) = match select_result {
211 SelectResult::OverlayDown(overlay_down) => {
212 let OutboundResult { to_peers, loopback } =
213 core.sync.process_outbound(overlay_down);
214
215 (Some(to_peers), Some(loopback))
216 }
217 SelectResult::UnderlayUp(_peer_id, underlay_up) => {
218 let InboundResult { to_local, to_peers } = core.sync.process_inbound(underlay_up);
219
220 (Some(to_peers), Some(to_local))
221 }
222 SelectResult::Event => {
223 let EventResult { to_peers } = core.sync.process_events();
224 (Some(to_peers), None)
225 }
226 SelectResult::TransportsChanged => (None, None),
227 };
228
229 if let Some(to_peers) = to_peers {
230 write_to_underlay(&core, to_peers).await;
231 }
232
233 if let Some(to_local) = to_local {
234 write_to_overlay(&core, to_local).await;
235 }
236 }
237
238 /// Get a mutable reference to the inner [`crate::DataPlane`].
239 ///
240 /// Primarily intended for mutating the routing tables.
241 ///
242 /// The returned value is a mutex guard, so limit how long it's held.
243 pub async fn inner(&self) -> impl DerefMut<Target = crate::DataPlane> {
244 let core = self.core_state.lock().await;
245 tokio::sync::MutexGuard::map(core, |x| &mut x.sync)
246 }
247}
248
249async fn write_to_overlay(slf: &CoreState, packets: HashMap<OverlayTransportId, Vec<PacketMut>>) {
250 for (id, packets) in packets {
251 if let Some(queue) = slf.overlay_transports.get(&id) {
252 tracing::trace!(overlay_id = ?id, n_packets = packets.len());
253 queue.send(packets).unwrap();
254 }
255 }
256}
257
258async fn write_to_underlay(
259 slf: &CoreState,
260 packets: impl IntoIterator<Item = ((UnderlayTransportId, PeerId), Vec<PacketMut>)>,
261) {
262 for ((tid, peer_id), packets) in packets {
263 tracing::trace!(underlay_id = ?tid, %peer_id, n_packets = packets.len());
264
265 if let Some(queue) = slf.underlay_transports.get(&tid) {
266 queue.send((peer_id, packets)).unwrap();
267 }
268 }
269}
270
/// Upper bound on how long the dataplane waits when the wireguard state machine reports *no*
/// scheduled event, before waking to re-check it anyway.
///
/// When [`crate::DataPlane::next_event`] does report a deadline (persistent keepalive, rekey,
/// expiry), `step` sleeps exactly until that deadline instead. The deadline is already the
/// soonest timer, and this bound is never applied to it. An idle tunnel with a keepalive due in
/// ~25s therefore wakes once, not every few seconds.
///
/// This bound is only a safety net for the no-event case. Without it, a `None` next event turned
/// the sleep into `future::pending()`, and an idle session could age past expiry with nothing to
/// refresh it. A spurious wakeup with no due event is harmless, because processing finds nothing
/// to send. At 5s the idle cadence is about 17k wakeups/day, versus about 86k for a 1 Hz floor.
const MAX_IDLE_SLEEP: core::time::Duration = core::time::Duration::from_millis(5_000);
288
289/// Sleep until the next scheduled event deadline; if none is scheduled, sleep at most
290/// [`MAX_IDLE_SLEEP`] rather than blocking forever.
291///
292/// When `deadline` is `Some`, this wakes exactly on it (a finite instant, so it can never block
293/// forever and never wakes later than the deadline). When `deadline` is `None` (no event scheduled)
294/// it sleeps for [`MAX_IDLE_SLEEP`] so the dataplane periodically re-services the wireguard state
295/// machine even with zero traffic and zero scheduled events.
296async fn sleep_until_event(deadline: Option<tokio::time::Instant>) {
297 let until = next_wakeup(deadline, tokio::time::Instant::now(), MAX_IDLE_SLEEP);
298 tokio::time::sleep_until(until).await;
299}
300
/// Compute the instant at which the dataplane should next wake up.
///
/// - `Some(deadline)`: the deadline is returned unchanged. [`crate::DataPlane::next_event`]
///   already reports the soonest timer, so nothing is due earlier. Clamping to `max_idle_sleep`
///   would only add pointless wakeups on an idle tunnel. The deadline is finite, so this can
///   never block forever.
/// - `None`: returns `now + max_idle_sleep`. The result is therefore always a finite instant,
///   and the dataplane never sleeps forever.
///
/// Generic over the instant type and free of I/O, so the cadence can be unit-tested with
/// `std::time::Instant` without a runtime.
fn next_wakeup<I>(deadline: Option<I>, now: I, max_idle_sleep: core::time::Duration) -> I
where
    I: core::ops::Add<core::time::Duration, Output = I> + Copy,
{
    // Lazily computed: the floor is only evaluated when no event is scheduled.
    deadline.unwrap_or_else(|| now + max_idle_sleep)
}
323
#[cfg(test)]
mod tests {
    use super::*;

    /// The wedge fix, distilled: with no event scheduled, the dataplane must still wake within
    /// the bounded floor instead of `future::pending()` (block forever). This is what guarantees
    /// an idle endpoint's timers (persistent keepalive / rekey / expiry) keep getting serviced.
    #[test]
    fn no_scheduled_event_still_wakes_within_floor() {
        let now = std::time::Instant::now();
        let woke = next_wakeup(None, now, MAX_IDLE_SLEEP);
        assert_eq!(
            woke,
            now + MAX_IDLE_SLEEP,
            "a None deadline must collapse to the bounded floor, never block forever"
        );
    }

    /// A soon scheduled event is honored exactly: the idle floor only applies when *no* event is
    /// scheduled, it never delays (or hurries) a due event.
    #[test]
    fn near_event_is_honored_exactly() {
        let now = std::time::Instant::now();
        let soon = now + core::time::Duration::from_millis(50);
        assert_eq!(
            next_wakeup(Some(soon), now, MAX_IDLE_SLEEP),
            soon,
            "an event sooner than the floor must wake exactly on its deadline"
        );
    }

    /// A far-future scheduled event is honored exactly, *not* clamped down to the idle floor:
    /// the floor exists only to bound the no-event wedge. `next_event` already reports the
    /// soonest timer, so nothing is due before the deadline. Clamping it would just burn
    /// floor-cadence wakeups on an idle tunnel (the battery regression this fix removes).
    /// Sleeping to a far real deadline is safe precisely because it is finite (never
    /// `pending()`).
    #[test]
    fn far_event_is_honored_not_clamped() {
        let now = std::time::Instant::now();
        let far = now + core::time::Duration::from_secs(3600);
        assert_eq!(
            next_wakeup(Some(far), now, MAX_IDLE_SLEEP),
            far,
            "a far-off scheduled event must be honored exactly, not clamped to the idle floor"
        );
    }

    /// An idle tunnel with a persistent keepalive due in ~25s must sleep ~25s and wake *once*,
    /// not once per [`MAX_IDLE_SLEEP`]. This is the battery/wakeup regression the fix targets.
    #[test]
    fn keepalive_in_25s_sleeps_to_the_deadline_not_the_floor() {
        let now = std::time::Instant::now();
        let keepalive_due = now + core::time::Duration::from_secs(25);
        let woke = next_wakeup(Some(keepalive_due), now, MAX_IDLE_SLEEP);
        assert_eq!(
            woke, keepalive_due,
            "a 25s keepalive deadline must be slept to directly (one wakeup), not capped at the idle floor"
        );
        assert!(
            woke > now + MAX_IDLE_SLEEP,
            "the wakeup must be well past the idle floor: the floor must not shorten a real deadline"
        );
    }

    /// The anti-busy-spin invariant of the wedge fix, stated as a bound. A fully idle dataplane
    /// (no scheduled event) must always wake **strictly in the future**, on the coarse floor.
    /// It must never wake at `now` or earlier: `sleep_until` would then return instantly and
    /// turn `step()` into a tight, CPU-burning sub-millisecond loop. It must also never be
    /// `future::pending()`, the original block-forever wedge. The check is swept over several
    /// base instants against the *real* `MAX_IDLE_SLEEP`, so the production idle cadence itself
    /// is what's pinned, not a toy value.
    ///
    /// Scope note: this is the deepest layer testable without a runtime. A full
    /// `#[tokio::test]` driving [`DataPlane::step`] under `tokio::time::pause()` / `advance()`
    /// would require tokio's `test-util` feature, which `ts_dataplane` does not enable. Turning
    /// it on is a non-test dependency change, out of scope here. [`sleep_until_event`] is a thin
    /// wrapper that feeds this exact instant straight to `tokio::time::sleep_until`. The
    /// integration-level idle cadence is therefore fully determined by (and covered through)
    /// this helper's boundedness: wake every `MAX_IDLE_SLEEP`, never sooner, and never block
    /// forever.
    #[test]
    fn idle_wakeup_is_coarse_and_never_busy_spins() {
        // A zero floor would let an idle step() spin; the production cadence must be positive.
        assert!(
            MAX_IDLE_SLEEP > core::time::Duration::ZERO,
            "the idle floor must be a positive cadence, else step() would busy-spin"
        );

        let base = std::time::Instant::now();
        for offset_ms in [0u64, 1, 250, 5_000, 60_000] {
            let now = base + core::time::Duration::from_millis(offset_ms);
            let woke = next_wakeup(None, now, MAX_IDLE_SLEEP);

            // Strictly after `now`: an idle wakeup at or before `now` would busy-spin step().
            assert!(
                woke > now,
                "idle wakeup must be strictly after now (no busy-spin); got {woke:?} <= {now:?}"
            );
            // Bounded to exactly the coarse floor: never sooner (tight loop), always finite
            // (never the old `future::pending()` block-forever).
            assert_eq!(
                woke,
                now + MAX_IDLE_SLEEP,
                "idle wakeup must land on the bounded coarse floor, never sooner and never blocking forever"
            );
        }
    }
}