Skip to main content

ts_dataplane/
lib.rs

1#![doc = include_str!("../README.md")]
2
3use std::{collections::HashMap, sync::Arc, time::Instant};
4
5use ts_bart::RoutingTable;
6use ts_overlay_router as or;
7use ts_packet::PacketMut;
8use ts_packetfilter::{FilterExt, IpProto};
9use ts_time::{Handle, Scheduler};
10use ts_transport::{OverlayTransportId, PeerId, UnderlayTransportId};
11use ts_tunnel::{Endpoint, NodeKeyPair};
12use ts_underlay_router as ur;
13
14pub mod async_tokio;
15
/// Identifies the data plane subsystem a scheduled timer event belongs to.
///
/// Values of this type are stored in the [`DataPlane`] event scheduler and handed back when the
/// event becomes due, so that [`DataPlane::process_events`] knows which component to drive.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Subsystem {
    /// The wireguard component.
    Wireguard,
}
21
/// Which way a captured packet was travelling, modelled on Go Tailscale's `capture.Path`.
///
/// Each variant's discriminant is the on-wire path code that goes into the Tailscale preamble of
/// a pcap record, so the explicit numeric values must not be changed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CapturePath {
    /// Sent by the local device toward a peer; captured before encryption.
    FromLocal = 0,
    /// Received from a peer and decrypted; on its way to the local device.
    FromPeer = 1,
    /// Synthesized by us and addressed to the local device.
    ///
    /// Kept so the path codes stay aligned with Go's `capture.Path` (and so a future tee point
    /// for synthesized packets can emit it). Nothing emits it today: the tee only produces
    /// `FromLocal` and `FromPeer`.
    SynthesizedToLocal = 2,
    /// Synthesized by us and addressed to a peer.
    ///
    /// Kept for the same on-wire code parity as [`Self::SynthesizedToLocal`]; not currently
    /// emitted.
    SynthesizedToPeer = 3,
}
38
39impl CapturePath {
40    /// The on-wire path code (the `uint16` written into the pcap record preamble).
41    pub fn code(self) -> u16 {
42        self as u16
43    }
44}
45
46/// A debug packet-capture hook. When installed on a [`DataPlane`], it is invoked with the path and
47/// the raw IP packet bytes for every plaintext packet crossing the datapath. It must be cheap and
48/// non-blocking — it runs inline on the single-threaded dataplane step, so a slow hook backs up the
49/// datapath. Wrapped in `Arc` so it is cheap to clone and `Send + Sync` for the actor that installs
50/// it.
51pub type CaptureHook = std::sync::Arc<dyn Fn(CapturePath, &[u8]) + Send + Sync>;
52
53/// Transforms packets to make tailscale happen.
54pub struct DataPlane {
55    /// Wireguard encryption/decryption.
56    pub wireguard: Endpoint,
57
58    /// Outbound overlay router.
59    pub or_out: or::outbound::Router,
60    /// Outbound underlay router.
61    pub ur_out: ur::outbound::Router,
62
63    /// Inbound source filter.
64    pub src_filter_in: Arc<ts_bart::Table<PeerId>>,
65    /// Inbound overlay router.
66    pub or_in: or::inbound::Router,
67
68    /// The packet filter.
69    pub packet_filter: Arc<dyn ts_packetfilter::Filter + Send + Sync>,
70
71    /// Events queued for future processing.
72    pub events: Scheduler<Subsystem>,
73
74    /// Next event for the wireguard subsystem.
75    pub wg_next: Option<Handle<Subsystem>>,
76
77    /// Optional debug packet-capture hook (Go `tstun.Wrapper` capture hook). `None` (the default)
78    /// means no capture and zero datapath overhead. Installed/cleared at runtime by the dataplane
79    /// actor; see [`DataPlane::process_outbound`]/[`DataPlane::process_inbound`] for the tee points.
80    pub capture: Option<CaptureHook>,
81}
82
83impl DataPlane {
84    /// Creates a new data plane for a wireguard node key.
85    pub fn new(my_key: NodeKeyPair) -> Self {
86        DataPlane {
87            wireguard: Endpoint::new(my_key),
88            or_out: Default::default(),
89            ur_out: Default::default(),
90            src_filter_in: Default::default(),
91            or_in: Default::default(),
92            events: Default::default(),
93            packet_filter: Arc::new(ts_packetfilter::DropAllFilter),
94            wg_next: None,
95            capture: None,
96        }
97    }
98
99    /// Processes packets originating from the local device.
100    #[tracing::instrument(skip_all, fields(n_packets = packets.len()))]
101    pub fn process_outbound(&mut self, packets: Vec<PacketMut>) -> OutboundResult {
102        if let Some(hook) = &self.capture {
103            for p in &packets {
104                hook(CapturePath::FromLocal, p.as_ref());
105            }
106        }
107
108        let or::outbound::Result {
109            to_wireguard,
110            loopback,
111        } = self.or_out.route(packets);
112
113        let to_wireguard = to_wireguard
114            .into_iter()
115            .map(|(k, v)| (ts_tunnel::PeerId(k.0), v))
116            .collect::<Vec<_>>();
117
118        let ts_tunnel::SendResult {
119            to_peers: encrypted,
120        } = self.wireguard.send(to_wireguard);
121
122        let to_peers = self
123            .ur_out
124            .route(encrypted.into_iter().map(|(k, v)| (PeerId(k.0), v)));
125
126        if let Some(next) = self.wireguard.next_event()
127            && let Some(prev) = self
128                .wg_next
129                .replace(self.events.add(next, Subsystem::Wireguard))
130        {
131            prev.cancel();
132        }
133
134        OutboundResult { to_peers, loopback }
135    }
136
137    /// Processes packets received from elsewhere.
138    pub fn process_inbound(
139        &mut self,
140        packets: impl IntoIterator<Item = PacketMut>,
141    ) -> InboundResult {
142        let ts_tunnel::RecvResult { to_local, to_peers } = self.wireguard.recv(packets);
143
144        if let Some(hook) = &self.capture {
145            for packets in to_local.values() {
146                for p in packets {
147                    hook(CapturePath::FromPeer, p.as_ref());
148                }
149            }
150        }
151
152        let to_local = to_local
153            .into_iter()
154            .map(|(peer_id, mut packets)| -> Vec<PacketMut> {
155                let _span = tracing::trace_span!(
156                    "src_filter_inbound",
157                    peer_id = ?peer_id,
158                    n_packet = packets.len(),
159                )
160                .entered();
161
162                packets.retain(|packet| {
163                    let Some(src) = packet.get_src_addr() else {
164                        tracing::trace!("does not look like ip packet");
165                        return false;
166                    };
167                    let verdict = if let Some(allowed_peer) = self.src_filter_in.lookup(src) {
168                        *allowed_peer == PeerId(peer_id.0)
169                    } else {
170                        tracing::trace!(remote_ip = %src, "unknown peer address");
171                        false
172                    };
173                    tracing::trace!(?src, verdict);
174                    verdict
175                });
176
177                packets
178            })
179            .map(|mut v| {
180                let _span =
181                    tracing::trace_span!("packet_filter_inbound", n_packet = v.len()).entered();
182
183                v.retain(|pkt| {
184                    let Ok(pkt) = etherparse::SlicedPacket::from_ip(pkt.as_ref()) else {
185                        tracing::trace!("does not look like ip packet");
186                        return false;
187                    };
188
189                    let (proto, src, dst) = match pkt.net {
190                        Some(etherparse::NetSlice::Ipv4(ipv4)) => (
191                            IpProto::new(ipv4.payload().ip_number.0 as _),
192                            ipv4.header().source_addr().into(),
193                            ipv4.header().destination_addr().into(),
194                        ),
195                        Some(etherparse::NetSlice::Ipv6(ipv6)) => (
196                            IpProto::new(ipv6.payload().ip_number.0 as _),
197                            ipv6.header().source_addr().into(),
198                            ipv6.header().destination_addr().into(),
199                        ),
200                        _ => {
201                            // A packet that parsed as IP but is neither IPv4 nor IPv6 (e.g. a
202                            // future/odd `NetSlice` shape). These bytes are attacker-controlled
203                            // post-decrypt, so fail closed — drop it — rather than `unreachable!`,
204                            // which would panic the single-threaded dataplane on a crafted packet.
205                            // Go's filter `pre()` likewise returns Drop/"not-ip" here, never panics.
206                            tracing::trace!("parsed packet is neither IPv4 nor IPv6; dropping");
207                            return false;
208                        }
209                    };
210
211                    let (_src_port, dst_port) = match pkt.transport {
212                        Some(etherparse::TransportSlice::Udp(udp)) => {
213                            (udp.source_port(), udp.destination_port())
214                        }
215                        Some(etherparse::TransportSlice::Tcp(tcp)) => {
216                            (tcp.source_port(), tcp.destination_port())
217                        }
218                        _ => (0, 0),
219                    };
220
221                    let info = ts_packetfilter::PacketInfo {
222                        ip_proto: proto,
223                        port: dst_port,
224                        src,
225                        dst,
226                    };
227
228                    // TODO(npry): wire in nodecaps
229                    let caps = [];
230                    let verdict = self.packet_filter.can_access(&info, caps);
231
232                    tracing::trace!(?info, ?caps, verdict);
233
234                    verdict
235                });
236
237                v
238            });
239
240        let to_peers = to_peers
241            .into_iter()
242            .map(|(k, v)| (ts_transport::PeerId(k.0), v));
243
244        let to_local = self.or_in.route(to_local.flatten());
245        let to_peers = self.ur_out.route(to_peers);
246
247        if let Some(next) = self.wireguard.next_event()
248            && let Some(prev) = self
249                .wg_next
250                .replace(self.events.add(next, Subsystem::Wireguard))
251        {
252            prev.cancel();
253        }
254
255        InboundResult { to_local, to_peers }
256    }
257
258    /// Return the next time at which [`DataPlane::process_events`] must be called.
259    ///
260    /// [`DataPlane::process_outbound`], [`DataPlane::process_inbound`] and
261    /// [`DataPlane::process_events`] may all update the next event time. Callers should prefer
262    /// calling `next_event` as needed to get a correct result, rather than store the returned
263    /// value.
264    pub fn next_event(&self) -> Option<Instant> {
265        self.events.next_dispatch()
266    }
267
268    /// Process all queued events that are due for processing.
269    ///
270    /// Must be called at least as often as dictated by [`DataPlane::next_event`] for the
271    /// data plane to function correctly. It is harmless to call it more frequently.
272    pub fn process_events(&mut self) -> EventResult {
273        let mut to_peers = HashMap::new();
274        let now = Instant::now();
275        for event in self.events.dispatch(now) {
276            match event {
277                Subsystem::Wireguard => {
278                    let res = self.wireguard.dispatch_events(now);
279                    to_peers.extend(
280                        res.to_peers
281                            .into_iter()
282                            .map(|(id, pkts)| (ts_transport::PeerId(id.0), pkts)),
283                    );
284                }
285            }
286        }
287        let to_peers = self.ur_out.route(to_peers);
288
289        if let Some(next) = self.wireguard.next_event()
290            && let Some(prev) = self
291                .wg_next
292                .replace(self.events.add(next, Subsystem::Wireguard))
293        {
294            prev.cancel();
295        }
296
297        EventResult { to_peers }
298    }
299}
300
301/// The result of processing outbound packets.
302pub struct OutboundResult {
303    /// Packets to be sent into underlay transports for transmission.
304    pub to_peers: HashMap<(UnderlayTransportId, PeerId), Vec<PacketMut>>,
305    /// Packets to be looped back and delivered to overlay transports.
306    pub loopback: HashMap<OverlayTransportId, Vec<PacketMut>>,
307}
308
309/// The result of processing inbound packets.
310pub struct InboundResult {
311    /// Decrypted packets to be delivered to overlay transports.
312    pub to_local: HashMap<OverlayTransportId, Vec<PacketMut>>,
313    /// Encrypted packets to be sent to wireguard peers by the underlay.
314    pub to_peers: HashMap<(UnderlayTransportId, PeerId), Vec<PacketMut>>,
315}
316
317/// The result of processing an event.
318#[derive(Default)]
319pub struct EventResult {
320    /// Encrypted packets to be sent to wireguard peers by the underlay.
321    pub to_peers: HashMap<(UnderlayTransportId, PeerId), Vec<PacketMut>>,
322}
323
#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use super::*;

    /// Shared, thread-safe log of every `(path, bytes)` pair a capture hook was called with.
    type CaptureLog = Arc<Mutex<Vec<(CapturePath, Vec<u8>)>>>;

    /// Pins the numeric on-wire code of every [`CapturePath`] variant.
    #[test]
    fn capture_path_codes() {
        let expected = [
            (CapturePath::FromLocal, 0),
            (CapturePath::FromPeer, 1),
            (CapturePath::SynthesizedToLocal, 2),
            (CapturePath::SynthesizedToPeer, 3),
        ];

        for (path, code) in expected {
            assert_eq!(path.code(), code);
        }
    }

    /// Behavioral guard for the outbound capture tee: once a hook is installed, every outbound
    /// packet MUST reach it tagged `CapturePath::FromLocal` and with its bytes unchanged.
    ///
    /// The tee is the first thing `process_outbound` does, ahead of `or_out.route` consuming the
    /// packets, so it fires whether or not any wireguard peer is configured (an empty router
    /// simply drops the packets afterwards). No other test exercises the dataplane tee end to
    /// end; byte-layout tests would stay green if a refactor removed it.
    #[test]
    fn capture_hook_fires_on_outbound() {
        let log = CaptureLog::default();

        let sink = Arc::clone(&log);
        let hook: CaptureHook = Arc::new(move |path: CapturePath, bytes: &[u8]| {
            sink.lock().unwrap().push((path, bytes.to_vec()));
        });

        let mut dp = DataPlane::new(NodeKeyPair::new());
        dp.capture = Some(hook);

        // The tee forwards `p.as_ref()` untouched, so the payload does not have to be valid IP.
        let payload: Vec<u8> = vec![0xde, 0xad, 0xbe, 0xef];
        let _ = dp.process_outbound(vec![PacketMut::from(payload.clone())]);

        let captured = log.lock().unwrap();
        assert_eq!(captured.len(), 1, "hook must fire exactly once per packet");

        let (path, bytes) = &captured[0];
        assert_eq!(*path, CapturePath::FromLocal);
        assert_eq!(*bytes, payload);
    }
}