Skip to main content

ts_dataplane/
async_tokio.rs

//! The packet processing dataplane, as a tokio task.
2
3use std::{collections::HashMap, convert::Infallible, ops::DerefMut, sync::atomic::AtomicU32};
4
5use tokio::sync::{Mutex, mpsc};
6use ts_packet::PacketMut;
7use ts_transport::{OverlayTransportId, PeerId, UnderlayTransportId};
8use ts_tunnel::NodeKeyPair;
9
10use crate::{EventResult, InboundResult, OutboundResult};
11
12/// Queue for packets leaving the data plane "up" into an overlay transport.
13pub type DataplaneToOverlay = mpsc::UnboundedSender<Vec<PacketMut>>;
14
15/// Queue for packets entering the data plane "down" from an overlay transport.
16pub type DataplaneFromOverlay = mpsc::UnboundedReceiver<Vec<PacketMut>>;
17
18/// Queue for packets leaving the data plane "down" into an underlay transport.
19pub type DataplaneToUnderlay = mpsc::UnboundedSender<(PeerId, Vec<PacketMut>)>;
20
21/// Queue for packets entering the data plane "up" from an underlay transport.
22pub type DataplaneFromUnderlay = mpsc::UnboundedReceiver<(PeerId, Vec<PacketMut>)>;
23
// TODO: wire in overlay/underlay transport traits
25
26/// Transforms packets to make tailscale happen.
27pub struct DataPlane {
28    core_state: Mutex<CoreState>,
29    poll_state: Mutex<PollState>,
30
31    transports_changed: tokio::sync::Notify,
32
33    underlay_down: DataplaneToUnderlay,
34    overlay_up: DataplaneToOverlay,
35
36    next_underlay_transport: AtomicU32,
37    next_overlay_transport: AtomicU32,
38}
39
40struct CoreState {
41    /// The synchronous core of the data plane.
42    sync: crate::DataPlane,
43
44    /// Queues to write packets to overlay transports.
45    overlay_transports: HashMap<OverlayTransportId, DataplaneToOverlay>,
46    /// Queues to write packets to underlay transports.
47    underlay_transports: HashMap<UnderlayTransportId, DataplaneToUnderlay>,
48}
49
50/// State that must be held during async polling.
51struct PollState {
52    /// Queue for packets entering the data plane ("coming down") from overlay transports.
53    from_overlay: DataplaneFromOverlay,
54    /// Queue for packets entering the data plane ("coming up") from underlay transports.
55    from_underlay: DataplaneFromUnderlay,
56}
57
58impl DataPlane {
59    /// Create a new data plane for a wireguard node key.
60    ///
61    /// The caller must configure overlay/underlay output queues for the data plane to be useful,
62    /// otherwise all it can do is drop packets.
63    pub fn new(my_key: NodeKeyPair) -> Self {
64        let (overlay_up, overlay_down) = mpsc::unbounded_channel();
65        let (underlay_down, underlay_up) = mpsc::unbounded_channel();
66
67        let sync = crate::DataPlane::new(my_key);
68
69        Self {
70            underlay_down,
71            overlay_up,
72
73            next_overlay_transport: Default::default(),
74            next_underlay_transport: Default::default(),
75
76            transports_changed: tokio::sync::Notify::new(),
77
78            core_state: Mutex::new(CoreState {
79                sync,
80                overlay_transports: Default::default(),
81                underlay_transports: Default::default(),
82            }),
83
84            poll_state: Mutex::new(PollState {
85                from_overlay: overlay_down,
86                from_underlay: underlay_up,
87            }),
88        }
89    }
90
91    /// Allocate a new underlay transport.
92    pub async fn new_underlay_transport(
93        &self,
94    ) -> (
95        UnderlayTransportId,
96        DataplaneFromUnderlay,
97        DataplaneToUnderlay,
98    ) {
99        let id = self
100            .next_underlay_transport
101            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
102            .into();
103
104        let (tx, rx) = mpsc::unbounded_channel();
105
106        {
107            let mut rest = self.core_state.lock().await;
108            rest.underlay_transports.insert(id, tx);
109        }
110
111        self.transports_changed.notify_waiters();
112
113        (id, rx, self.underlay_down.clone())
114    }
115
116    /// Allocate a new overlay transport.
117    pub async fn new_overlay_transport(
118        &self,
119    ) -> (OverlayTransportId, DataplaneToOverlay, DataplaneFromOverlay) {
120        let id = self
121            .next_overlay_transport
122            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
123            .into();
124
125        let (tx, rx) = mpsc::unbounded_channel();
126
127        {
128            let mut rest = self.core_state.lock().await;
129            rest.overlay_transports.insert(id, tx);
130        }
131
132        self.transports_changed.notify_waiters();
133
134        (id, self.overlay_up.clone(), rx)
135    }
136
137    /// Run the data plane forever, moving packets from the input queues to output queues.
138    pub async fn run(&self) -> Infallible {
139        loop {
140            self.step().await;
141        }
142    }
143
144    /// Run the data plane for a single step.
145    #[tracing::instrument(skip_all)]
146    pub async fn step(&self) {
147        enum SelectResult {
148            OverlayDown(Vec<PacketMut>),
149            UnderlayUp(PeerId, Vec<PacketMut>),
150            TransportsChanged,
151            Event,
152        }
153
154        // process in two phases:
155        //
156        // - SELECT: wait for underlying i/o or timer to make progress: don't lock the
157        //      user-modifiable (core) state. self.transports_changed is used to break out of this
158        //      state if the caller changes the underlying transports
159        // - UPDATE: lock the user-modifiable state and actually write out the packets produced
160        //      in the SELECT phase (if any)
161        //
162        // designed this way to ensure that users can add and remove transports at any time without
163        // having to wait for the network or a timer to make progress (which may never happen)
164
165        let select_result = {
166            let next_event = {
167                let state = self.core_state.lock().await;
168                state.sync.next_event()
169            };
170
171            let mut poll_state = self.poll_state.lock().await;
172
173            let PollState {
174                from_overlay: overlay_down,
175                from_underlay: underlay_up,
176                ..
177            } = &mut *poll_state;
178
179            tokio::select! {
180                overlay_pkts = overlay_down.recv() => {
181                    let overlay_pkts = overlay_pkts.unwrap();
182                    tracing::trace!(n_overlay_pkts = overlay_pkts.len());
183
184                    SelectResult::OverlayDown(overlay_pkts)
185                }
186
187                underlay_pkts = underlay_up.recv() => {
188                    let (peer_id, underlay_pkts) = underlay_pkts.unwrap();
189                    tracing::trace!(%peer_id, n_underlay_pkts = underlay_pkts.len());
190
191                    SelectResult::UnderlayUp(peer_id, underlay_pkts)
192                }
193
194                _ = self.transports_changed.notified() => {
195                    tracing::trace!("transports changed");
196
197                    SelectResult::TransportsChanged
198                }
199
200                _ = option_sleep_until(next_event.map(Into::into)) => {
201                    tracing::trace!("event");
202
203                    SelectResult::Event
204                }
205            }
206        };
207
208        let mut core = self.core_state.lock().await;
209
210        let (to_peers, to_local) = match select_result {
211            SelectResult::OverlayDown(overlay_down) => {
212                let OutboundResult { to_peers, loopback } =
213                    core.sync.process_outbound(overlay_down);
214
215                (Some(to_peers), Some(loopback))
216            }
217            SelectResult::UnderlayUp(_peer_id, underlay_up) => {
218                let InboundResult { to_local, to_peers } = core.sync.process_inbound(underlay_up);
219
220                (Some(to_peers), Some(to_local))
221            }
222            SelectResult::Event => {
223                let EventResult { to_peers } = core.sync.process_events();
224                (Some(to_peers), None)
225            }
226            SelectResult::TransportsChanged => (None, None),
227        };
228
229        if let Some(to_peers) = to_peers {
230            write_to_underlay(&core, to_peers).await;
231        }
232
233        if let Some(to_local) = to_local {
234            write_to_overlay(&core, to_local).await;
235        }
236    }
237
238    /// Get a mutable reference to the inner [`crate::DataPlane`].
239    ///
240    /// Primarily intended for mutating the routing tables.
241    ///
242    /// The returned value is a mutex guard, so limit how long it's held.
243    pub async fn inner(&self) -> impl DerefMut<Target = crate::DataPlane> {
244        let core = self.core_state.lock().await;
245        tokio::sync::MutexGuard::map(core, |x| &mut x.sync)
246    }
247}
248
249async fn write_to_overlay(slf: &CoreState, packets: HashMap<OverlayTransportId, Vec<PacketMut>>) {
250    for (id, packets) in packets {
251        if let Some(queue) = slf.overlay_transports.get(&id) {
252            tracing::trace!(overlay_id = ?id, n_packets = packets.len());
253            queue.send(packets).unwrap();
254        }
255    }
256}
257
258async fn write_to_underlay(
259    slf: &CoreState,
260    packets: impl IntoIterator<Item = ((UnderlayTransportId, PeerId), Vec<PacketMut>)>,
261) {
262    for ((tid, peer_id), packets) in packets {
263        tracing::trace!(underlay_id = ?tid, %peer_id, n_packets = packets.len());
264
265        if let Some(queue) = slf.underlay_transports.get(&tid) {
266            queue.send((peer_id, packets)).unwrap();
267        }
268    }
269}
270
271async fn option_sleep_until(deadline: Option<tokio::time::Instant>) {
272    match deadline {
273        Some(deadline) => tokio::time::sleep_until(deadline).await,
274        None => core::future::pending().await,
275    }
276}