Skip to main content

ts_dataplane/
async_tokio.rs

1//! The packet processing dataplane, as a tokio task.
2
3use std::{collections::HashMap, convert::Infallible, ops::DerefMut, sync::atomic::AtomicU32};
4
5use tokio::sync::{Mutex, mpsc};
6use ts_packet::PacketMut;
7use ts_transport::{OverlayTransportId, PeerId, UnderlayTransportId};
8use ts_tunnel::NodeKeyPair;
9
10use crate::{EventResult, InboundResult, OutboundResult};
11
12/// Queue for packets leaving the data plane "up" into an overlay transport.
13pub type DataplaneToOverlay = mpsc::UnboundedSender<Vec<PacketMut>>;
14
15/// Queue for packets entering the data plane "down" from an overlay transport.
16pub type DataplaneFromOverlay = mpsc::UnboundedReceiver<Vec<PacketMut>>;
17
18/// Queue for packets leaving the data plane "down" into an underlay transport.
19pub type DataplaneToUnderlay = mpsc::UnboundedSender<(PeerId, Vec<PacketMut>)>;
20
21/// Queue for packets entering the data plane "up" from an underlay transport.
22pub type DataplaneFromUnderlay = mpsc::UnboundedReceiver<(PeerId, Vec<PacketMut>)>;
23
24// TODO: wire in overlay/underlay transport traits
25
26/// Transforms packets to make tailscale happen.
27pub struct DataPlane {
28    core_state: Mutex<CoreState>,
29    poll_state: Mutex<PollState>,
30
31    transports_changed: tokio::sync::Notify,
32
33    underlay_down: DataplaneToUnderlay,
34    overlay_up: DataplaneToOverlay,
35
36    next_underlay_transport: AtomicU32,
37    next_overlay_transport: AtomicU32,
38}
39
40struct CoreState {
41    /// The synchronous core of the data plane.
42    sync: crate::DataPlane,
43
44    /// Queues to write packets to overlay transports.
45    overlay_transports: HashMap<OverlayTransportId, DataplaneToOverlay>,
46    /// Queues to write packets to underlay transports.
47    underlay_transports: HashMap<UnderlayTransportId, DataplaneToUnderlay>,
48}
49
50/// State that must be held during async polling.
51struct PollState {
52    /// Queue for packets entering the data plane ("coming down") from overlay transports.
53    from_overlay: DataplaneFromOverlay,
54    /// Queue for packets entering the data plane ("coming up") from underlay transports.
55    from_underlay: DataplaneFromUnderlay,
56}
57
58impl DataPlane {
59    /// Create a new data plane for a wireguard node key.
60    ///
61    /// The caller must configure overlay/underlay output queues for the data plane to be useful,
62    /// otherwise all it can do is drop packets.
63    pub fn new(my_key: NodeKeyPair) -> Self {
64        let (overlay_up, overlay_down) = mpsc::unbounded_channel();
65        let (underlay_down, underlay_up) = mpsc::unbounded_channel();
66
67        let sync = crate::DataPlane::new(my_key);
68
69        Self {
70            underlay_down,
71            overlay_up,
72
73            next_overlay_transport: Default::default(),
74            next_underlay_transport: Default::default(),
75
76            transports_changed: tokio::sync::Notify::new(),
77
78            core_state: Mutex::new(CoreState {
79                sync,
80                overlay_transports: Default::default(),
81                underlay_transports: Default::default(),
82            }),
83
84            poll_state: Mutex::new(PollState {
85                from_overlay: overlay_down,
86                from_underlay: underlay_up,
87            }),
88        }
89    }
90
91    /// Allocate a new underlay transport.
92    pub async fn new_underlay_transport(
93        &self,
94    ) -> (
95        UnderlayTransportId,
96        DataplaneFromUnderlay,
97        DataplaneToUnderlay,
98    ) {
99        let id = self
100            .next_underlay_transport
101            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
102            .into();
103
104        let (tx, rx) = mpsc::unbounded_channel();
105
106        {
107            let mut rest = self.core_state.lock().await;
108            rest.underlay_transports.insert(id, tx);
109        }
110
111        self.transports_changed.notify_waiters();
112
113        (id, rx, self.underlay_down.clone())
114    }
115
116    /// Allocate a new overlay transport.
117    pub async fn new_overlay_transport(
118        &self,
119    ) -> (OverlayTransportId, DataplaneToOverlay, DataplaneFromOverlay) {
120        let id = self
121            .next_overlay_transport
122            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
123            .into();
124
125        let (tx, rx) = mpsc::unbounded_channel();
126
127        {
128            let mut rest = self.core_state.lock().await;
129            rest.overlay_transports.insert(id, tx);
130        }
131
132        self.transports_changed.notify_waiters();
133
134        (id, self.overlay_up.clone(), rx)
135    }
136
137    /// Run the data plane forever, moving packets from the input queues to output queues.
138    pub async fn run(&self) -> Infallible {
139        loop {
140            self.step().await;
141        }
142    }
143
144    /// Run the data plane for a single step.
145    #[tracing::instrument(skip_all)]
146    pub async fn step(&self) {
147        enum SelectResult {
148            OverlayDown(Vec<PacketMut>),
149            UnderlayUp(PeerId, Vec<PacketMut>),
150            TransportsChanged,
151            Event,
152        }
153
154        // process in two phases:
155        //
156        // - SELECT: wait for underlying i/o or timer to make progress: don't lock the
157        //      user-modifiable (core) state. self.transports_changed is used to break out of this
158        //      state if the caller changes the underlying transports
159        // - UPDATE: lock the user-modifiable state and actually write out the packets produced
160        //      in the SELECT phase (if any)
161        //
162        // designed this way to ensure that users can add and remove transports at any time without
163        // having to wait for the network or a timer to make progress (which may never happen)
164
165        let select_result = {
166            let next_event = {
167                let state = self.core_state.lock().await;
168                state.sync.next_event()
169            };
170
171            let mut poll_state = self.poll_state.lock().await;
172
173            let PollState {
174                from_overlay: overlay_down,
175                from_underlay: underlay_up,
176                ..
177            } = &mut *poll_state;
178
179            tokio::select! {
180                overlay_pkts = overlay_down.recv() => {
181                    let overlay_pkts = overlay_pkts.unwrap();
182                    tracing::trace!(n_overlay_pkts = overlay_pkts.len());
183
184                    SelectResult::OverlayDown(overlay_pkts)
185                }
186
187                underlay_pkts = underlay_up.recv() => {
188                    let (peer_id, underlay_pkts) = underlay_pkts.unwrap();
189                    tracing::trace!(%peer_id, n_underlay_pkts = underlay_pkts.len());
190
191                    SelectResult::UnderlayUp(peer_id, underlay_pkts)
192                }
193
194                _ = self.transports_changed.notified() => {
195                    tracing::trace!("transports changed");
196
197                    SelectResult::TransportsChanged
198                }
199
200                _ = sleep_until_event(next_event.map(Into::into)) => {
201                    tracing::trace!("event");
202
203                    SelectResult::Event
204                }
205            }
206        };
207
208        let mut core = self.core_state.lock().await;
209
210        let (to_peers, to_local) = match select_result {
211            SelectResult::OverlayDown(overlay_down) => {
212                let OutboundResult { to_peers, loopback } =
213                    core.sync.process_outbound(overlay_down);
214
215                (Some(to_peers), Some(loopback))
216            }
217            SelectResult::UnderlayUp(_peer_id, underlay_up) => {
218                let InboundResult { to_local, to_peers } = core.sync.process_inbound(underlay_up);
219
220                (Some(to_peers), Some(to_local))
221            }
222            SelectResult::Event => {
223                let EventResult { to_peers } = core.sync.process_events();
224                (Some(to_peers), None)
225            }
226            SelectResult::TransportsChanged => (None, None),
227        };
228
229        if let Some(to_peers) = to_peers {
230            write_to_underlay(&core, to_peers).await;
231        }
232
233        if let Some(to_local) = to_local {
234            write_to_overlay(&core, to_local).await;
235        }
236    }
237
238    /// Get a mutable reference to the inner [`crate::DataPlane`].
239    ///
240    /// Primarily intended for mutating the routing tables.
241    ///
242    /// The returned value is a mutex guard, so limit how long it's held.
243    pub async fn inner(&self) -> impl DerefMut<Target = crate::DataPlane> {
244        let core = self.core_state.lock().await;
245        tokio::sync::MutexGuard::map(core, |x| &mut x.sync)
246    }
247}
248
249async fn write_to_overlay(slf: &CoreState, packets: HashMap<OverlayTransportId, Vec<PacketMut>>) {
250    for (id, packets) in packets {
251        if let Some(queue) = slf.overlay_transports.get(&id) {
252            tracing::trace!(overlay_id = ?id, n_packets = packets.len());
253            queue.send(packets).unwrap();
254        }
255    }
256}
257
258async fn write_to_underlay(
259    slf: &CoreState,
260    packets: impl IntoIterator<Item = ((UnderlayTransportId, PeerId), Vec<PacketMut>)>,
261) {
262    for ((tid, peer_id), packets) in packets {
263        tracing::trace!(underlay_id = ?tid, %peer_id, n_packets = packets.len());
264
265        if let Some(queue) = slf.underlay_transports.get(&tid) {
266            queue.send((peer_id, packets)).unwrap();
267        }
268    }
269}
270
/// Upper bound on how long the data plane sleeps when *no* timer event is scheduled at all.
///
/// When [`crate::DataPlane::next_event`] reports a deadline, `step` sleeps exactly until it; that
/// deadline is already the soonest pending timer. So an idle tunnel with a single timer due in
/// ~25s wakes once for it instead of on a fixed tick.
///
/// This bound only applies when `next_event()` returns `None`. That case used to turn into
/// `future::pending()`, so with no traffic the data plane could block forever and let session state
/// age out with nothing servicing it. Capping the sleep guarantees a periodic re-check. A wakeup
/// with nothing due is intended to be harmless, and a 5 s period keeps the overhead small: roughly
/// 17k wakeups/day, versus ~86k for the former unconditional 1 Hz floor.
const MAX_IDLE_SLEEP: core::time::Duration = core::time::Duration::from_millis(5_000);
288
289/// Sleep until the next scheduled event deadline; if none is scheduled, sleep at most
290/// [`MAX_IDLE_SLEEP`] rather than blocking forever.
291///
292/// When `deadline` is `Some`, this wakes exactly on it (a finite instant, so it can never block
293/// forever and never wakes later than the deadline). When `deadline` is `None` (no event scheduled)
294/// it sleeps for [`MAX_IDLE_SLEEP`] so the dataplane periodically re-services the wireguard state
295/// machine even with zero traffic and zero scheduled events.
296async fn sleep_until_event(deadline: Option<tokio::time::Instant>) {
297    let until = next_wakeup(deadline, tokio::time::Instant::now(), MAX_IDLE_SLEEP);
298    tokio::time::sleep_until(until).await;
299}
300
/// Pick the instant the data plane should next wake at.
///
/// - A scheduled `deadline` is returned unchanged. It is already the soonest pending event, so it
///   is deliberately not clamped to `max_idle_sleep`: clamping would only add wakeups with nothing
///   due.
/// - With no deadline, the result is `now + max_idle_sleep`. The returned instant is therefore
///   always finite, and the caller can never sleep forever.
///
/// Kept free of any runtime so this cadence can be unit-tested directly.
fn next_wakeup<I>(deadline: Option<I>, now: I, max_idle_sleep: core::time::Duration) -> I
where
    I: core::ops::Add<core::time::Duration, Output = I> + Copy,
{
    // The fallback is computed lazily, so `now + max_idle_sleep` is only evaluated when needed.
    deadline.unwrap_or_else(|| now + max_idle_sleep)
}
323
#[cfg(test)]
mod tests {
    use super::*;
    use core::time::Duration;
    use std::time::Instant;

    /// With nothing scheduled, the wakeup must land on the bounded floor instead of the old
    /// `future::pending()` (block forever). That bound is what keeps an idle endpoint's timers
    /// (persistent keepalive / rekey / expiry) serviced.
    #[test]
    fn no_scheduled_event_still_wakes_within_floor() {
        let now = Instant::now();
        let expected = now + MAX_IDLE_SLEEP;
        let woke = next_wakeup(None, now, MAX_IDLE_SLEEP);
        assert_eq!(
            woke, expected,
            "a None deadline must collapse to the bounded floor, never block forever"
        );
    }

    /// An event due sooner than the floor is honored exactly. The floor only matters when nothing
    /// is scheduled; it never delays or hurries a due event.
    #[test]
    fn near_event_is_honored_exactly() {
        let now = Instant::now();
        let soon = now + Duration::from_millis(50);
        let woke = next_wakeup(Some(soon), now, MAX_IDLE_SLEEP);
        assert_eq!(
            woke, soon,
            "an event sooner than the floor must wake exactly on its deadline"
        );
    }

    /// A far-off event is honored exactly, *not* clamped to the idle floor. `next_event` already
    /// reports the soonest timer, so clamping would only burn floor-cadence wakeups on an idle
    /// tunnel. Sleeping to a far deadline is safe because that deadline is finite.
    #[test]
    fn far_event_is_honored_not_clamped() {
        let now = Instant::now();
        let far = now + Duration::from_secs(3600);
        let woke = next_wakeup(Some(far), now, MAX_IDLE_SLEEP);
        assert_eq!(
            woke, far,
            "a far-off scheduled event must be honored exactly, not clamped to the idle floor"
        );
    }

    /// A keepalive due in ~25s means one ~25s sleep, not a wakeup every [`MAX_IDLE_SLEEP`].
    #[test]
    fn keepalive_in_25s_sleeps_to_the_deadline_not_the_floor() {
        let now = Instant::now();
        let keepalive_due = now + Duration::from_secs(25);
        let woke = next_wakeup(Some(keepalive_due), now, MAX_IDLE_SLEEP);

        assert_eq!(
            woke, keepalive_due,
            "a 25s keepalive deadline must be slept to directly (one wakeup), not capped at the idle floor"
        );

        let floor = now + MAX_IDLE_SLEEP;
        assert!(
            woke > floor,
            "the wakeup must be well past the idle floor: the floor must not shorten a real deadline"
        );
    }

    /// Anti-busy-spin invariant for a fully idle data plane (nothing scheduled). The wakeup must
    /// always be strictly in the future, on the coarse floor:
    /// - never at or before `now`, which would make `sleep_until` return immediately and spin
    ///   `step()`;
    /// - never `future::pending()`, the original block-forever wedge.
    ///
    /// This is swept over several base instants using the real `MAX_IDLE_SLEEP`.
    ///
    /// Driving [`DataPlane::step`] under `tokio::time::pause()` would need tokio's `test-util`
    /// feature, which this crate does not enable. [`sleep_until_event`] passes this helper's result
    /// straight to `tokio::time::sleep_until`, so the helper's bound fixes the idle cadence: wake
    /// every `MAX_IDLE_SLEEP`, never sooner, and never block forever.
    #[test]
    fn idle_wakeup_is_coarse_and_never_busy_spins() {
        // A zero floor would let an idle step() spin.
        assert!(
            MAX_IDLE_SLEEP > Duration::ZERO,
            "the idle floor must be a positive cadence, else step() would busy-spin"
        );

        let base = Instant::now();
        let offsets_ms: [u64; 5] = [0, 1, 250, 5_000, 60_000];
        for now in offsets_ms.iter().map(|&ms| base + Duration::from_millis(ms)) {
            let woke = next_wakeup(None, now, MAX_IDLE_SLEEP);

            // Strictly later than `now`; otherwise the idle loop would spin.
            assert!(
                woke > now,
                "idle wakeup must be strictly after now (no busy-spin); got {woke:?} <= {now:?}"
            );
            // Exactly the coarse floor: not sooner, and always a finite instant.
            assert_eq!(
                woke,
                now + MAX_IDLE_SLEEP,
                "idle wakeup must land on the bounded coarse floor, never sooner and never never"
            );
        }
    }
}