Skip to main content

ts_dataplane/
lib.rs

1#![doc = include_str!("../README.md")]
2
3use std::{collections::HashMap, sync::Arc, time::Instant};
4
5use ts_bart::RoutingTable;
6use ts_overlay_router as or;
7use ts_packet::PacketMut;
8use ts_packetfilter::{FilterExt, IpProto};
9use ts_time::{Handle, Scheduler};
10use ts_transport::{OverlayTransportId, PeerId, UnderlayTransportId};
11use ts_tunnel::{Endpoint, NodeKeyPair};
12use ts_underlay_router as ur;
13
14pub mod async_tokio;
15
/// Cloud-metadata address `169.254.169.254`: the one link-local destination that Go's filter
/// `pre()` exempts from its link-local drop (Go `isAllowedLinkLocal` / `gcpDNSAddr`).
const ALLOWED_LINK_LOCAL_V4: std::net::Ipv4Addr = std::net::Ipv4Addr::new(169, 254, 169, 254);

/// Pre-ACL destination screen mirroring Go's filter `pre()`. Returns `true` when an inbound packet
/// addressed to `dst` must be dropped before any ACL rule is consulted.
///
/// Two destination classes are rejected unconditionally:
/// - multicast (Go `ReasonMulticast`);
/// - link-local unicast, i.e. IPv4 `169.254.0.0/16` or IPv6 `fe80::/10`, except
///   [`ALLOWED_LINK_LOCAL_V4`] (Go `ReasonLinkLocalUnicast`).
///
/// Because this runs ahead of `can_access`, a permissive ACL cannot admit multicast or link-local
/// traffic that Go rejects outright.
///
/// Only the static `gcpDNSAddr` arm of Go's `isAllowedLinkLocal` is modeled. The dynamic
/// `LinkLocalAllowHooks` arm is omitted because that slice is empty in a plain engine/tsnet
/// embedding. A feature needing a dynamic link-local allowlist must extend this function.
///
/// IPv4-mapped IPv6 destinations (e.g. `::ffff:224.0.0.1`) are deliberately NOT unmapped. They
/// therefore match neither class and fall through to the ACL; this was chosen to mirror Go's
/// `netip.Addr` predicates. NOTE(review): confirm this against the Go release being tracked, since
/// some `netip` versions unmap 4in6 addresses inside `IsMulticast`/`IsLinkLocalUnicast`.
fn drop_before_rules(dst: std::net::IpAddr) -> bool {
    use std::net::IpAddr;

    let is_link_local_unicast = match dst {
        // 169.254.0.0/16, minus the cloud-metadata exception.
        IpAddr::V4(v4) => v4.is_link_local() && v4 != ALLOWED_LINK_LOCAL_V4,
        // fe80::/10, tested by its 10-bit prefix. This fork is IPv4-only by default, but v6 is
        // still screened for Go parity.
        // NOTE(review): `Ipv6Addr::is_unicast_link_local` (same mask) was stabilized in Rust
        // 1.84; switch to it once the minimum toolchain allows.
        IpAddr::V6(v6) => (v6.segments()[0] & 0xffc0) == 0xfe80,
    };

    dst.is_multicast() || is_link_local_unicast
}
45
/// A data plane subsystem that can be the subject of timer events.
///
/// Values of this type are the payload stored in the data plane's event scheduler.
/// `DataPlane::process_events` matches on the variant of each due event to decide which
/// subsystem to drive.
// Plain fieldless tag: cheap to copy and compare, and `Debug` so it can appear in traces.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Subsystem {
    /// The wireguard component.
    Wireguard,
}
51
/// The direction/path of a captured packet, mirroring Go Tailscale's `capture.Path`.
///
/// The discriminants are the on-wire path codes written into each pcap record's Tailscale
/// preamble. `#[repr(u16)]` pins them to that preamble field's `uint16` width, so
/// [`CapturePath::code`] is an exact, non-truncating conversion.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u16)]
pub enum CapturePath {
    /// A packet from the local device, heading out to a peer (pre-encrypt).
    FromLocal = 0,
    /// A packet received from a peer, decrypted, heading to the local device.
    FromPeer = 1,
    /// A packet synthesized by us toward the local device.
    ///
    /// Retained for Go `capture.Path` on-wire code parity, so captured pcap path codes match Go's
    /// and a future synthesized-packet tee point can emit it. Not currently emitted: the tee only
    /// produces `FromLocal`/`FromPeer`.
    SynthesizedToLocal = 2,
    /// A packet synthesized by us toward a peer.
    ///
    /// Retained for Go `capture.Path` on-wire code parity (see [`Self::SynthesizedToLocal`]).
    /// Not currently emitted.
    SynthesizedToPeer = 3,
}

impl CapturePath {
    /// The on-wire path code (the `uint16` written into the pcap record preamble).
    ///
    /// This is a `const fn`, so the codes are also usable in constant contexts.
    #[must_use]
    pub const fn code(self) -> u16 {
        // Exact: the enum is `#[repr(u16)]`.
        self as u16
    }
}
75
76/// A debug packet-capture hook. When installed on a [`DataPlane`], it is invoked with the path and
77/// the raw IP packet bytes for every plaintext packet crossing the datapath. It must be cheap and
78/// non-blocking — it runs inline on the single-threaded dataplane step, so a slow hook backs up the
79/// datapath. Wrapped in `Arc` so it is cheap to clone and `Send + Sync` for the actor that installs
80/// it.
81pub type CaptureHook = std::sync::Arc<dyn Fn(CapturePath, &[u8]) + Send + Sync>;
82
/// Transforms packets to make tailscale happen.
///
/// Bundles every stage of the datapath:
/// - wireguard crypto;
/// - the overlay and underlay routers;
/// - the inbound source filter and packet filter;
/// - the timer scheduler;
/// - an optional capture hook.
///
/// [`DataPlane::process_outbound`], [`DataPlane::process_inbound`] and
/// [`DataPlane::process_events`] drive packets through these stages.
// NOTE: fields are dropped in declaration order; keep that in mind before reordering them.
pub struct DataPlane {
    /// Wireguard encryption/decryption.
    pub wireguard: Endpoint,

    /// Outbound overlay router.
    ///
    /// Splits locally originated packets into those destined for wireguard peers and those to be
    /// looped back to overlay transports.
    pub or_out: or::outbound::Router,
    /// Outbound underlay router.
    ///
    /// Assigns encrypted per-peer packets to underlay transports. It routes wireguard output from
    /// the outbound, inbound and timer paths alike.
    pub ur_out: ur::outbound::Router,

    /// Inbound source filter.
    ///
    /// Maps a source address to the peer allowed to send from it. A decrypted packet is dropped
    /// unless its source address maps to the peer it arrived from.
    pub src_filter_in: Arc<ts_bart::Table<PeerId>>,
    /// Inbound overlay router: delivers filtered, decrypted packets to overlay transports.
    pub or_in: or::inbound::Router,

    /// The packet filter (ACL).
    ///
    /// Consulted for inbound packets after the pre-rule destination screen. [`DataPlane::new`]
    /// installs `ts_packetfilter::DropAllFilter` until a real filter is assigned.
    pub packet_filter: Arc<dyn ts_packetfilter::Filter + Send + Sync>,

    /// Events queued for future processing, tagged with the [`Subsystem`] they belong to.
    pub events: Scheduler<Subsystem>,

    /// Next event for the wireguard subsystem.
    ///
    /// Holds the handle of the most recently scheduled wireguard event. That handle is cancelled
    /// when a newer event replaces it.
    pub wg_next: Option<Handle<Subsystem>>,

    /// Optional debug packet-capture hook (Go `tstun.Wrapper` capture hook).
    ///
    /// `None` (the default) means no capture and zero datapath overhead. The dataplane actor
    /// installs and clears it at runtime. See [`DataPlane::process_outbound`] and
    /// [`DataPlane::process_inbound`] for the tee points.
    pub capture: Option<CaptureHook>,
}
112
113impl DataPlane {
114    /// Creates a new data plane for a wireguard node key.
115    pub fn new(my_key: NodeKeyPair) -> Self {
116        DataPlane {
117            wireguard: Endpoint::new(my_key),
118            or_out: Default::default(),
119            ur_out: Default::default(),
120            src_filter_in: Default::default(),
121            or_in: Default::default(),
122            events: Default::default(),
123            packet_filter: Arc::new(ts_packetfilter::DropAllFilter),
124            wg_next: None,
125            capture: None,
126        }
127    }
128
129    /// Processes packets originating from the local device.
130    #[tracing::instrument(skip_all, fields(n_packets = packets.len()))]
131    pub fn process_outbound(&mut self, packets: Vec<PacketMut>) -> OutboundResult {
132        if let Some(hook) = &self.capture {
133            for p in &packets {
134                hook(CapturePath::FromLocal, p.as_ref());
135            }
136        }
137
138        let or::outbound::Result {
139            to_wireguard,
140            loopback,
141        } = self.or_out.route(packets);
142
143        let to_wireguard = to_wireguard
144            .into_iter()
145            .map(|(k, v)| (ts_tunnel::PeerId(k.0), v))
146            .collect::<Vec<_>>();
147
148        let ts_tunnel::SendResult {
149            to_peers: encrypted,
150        } = self.wireguard.send(to_wireguard);
151
152        let to_peers = self
153            .ur_out
154            .route(encrypted.into_iter().map(|(k, v)| (PeerId(k.0), v)));
155
156        if let Some(next) = self.wireguard.next_event()
157            && let Some(prev) = self
158                .wg_next
159                .replace(self.events.add(next, Subsystem::Wireguard))
160        {
161            prev.cancel();
162        }
163
164        OutboundResult { to_peers, loopback }
165    }
166
167    /// Processes packets received from elsewhere.
168    pub fn process_inbound(
169        &mut self,
170        packets: impl IntoIterator<Item = PacketMut>,
171    ) -> InboundResult {
172        let ts_tunnel::RecvResult { to_local, to_peers } = self.wireguard.recv(packets);
173
174        if let Some(hook) = &self.capture {
175            for packets in to_local.values() {
176                for p in packets {
177                    hook(CapturePath::FromPeer, p.as_ref());
178                }
179            }
180        }
181
182        let to_local = to_local
183            .into_iter()
184            .map(|(peer_id, mut packets)| -> Vec<PacketMut> {
185                let _span = tracing::trace_span!(
186                    "src_filter_inbound",
187                    peer_id = ?peer_id,
188                    n_packet = packets.len(),
189                )
190                .entered();
191
192                packets.retain(|packet| {
193                    let Some(src) = packet.get_src_addr() else {
194                        tracing::trace!("does not look like ip packet");
195                        return false;
196                    };
197                    let verdict = if let Some(allowed_peer) = self.src_filter_in.lookup(src) {
198                        *allowed_peer == PeerId(peer_id.0)
199                    } else {
200                        tracing::trace!(remote_ip = %src, "unknown peer address");
201                        false
202                    };
203                    tracing::trace!(?src, verdict);
204                    verdict
205                });
206
207                packets
208            })
209            .map(|mut v| {
210                let _span =
211                    tracing::trace_span!("packet_filter_inbound", n_packet = v.len()).entered();
212
213                v.retain(|pkt| {
214                    let Ok(pkt) = etherparse::SlicedPacket::from_ip(pkt.as_ref()) else {
215                        tracing::trace!("does not look like ip packet");
216                        return false;
217                    };
218
219                    let (proto, src, dst) = match pkt.net {
220                        Some(etherparse::NetSlice::Ipv4(ipv4)) => (
221                            IpProto::new(ipv4.payload().ip_number.0 as _),
222                            ipv4.header().source_addr().into(),
223                            ipv4.header().destination_addr().into(),
224                        ),
225                        Some(etherparse::NetSlice::Ipv6(ipv6)) => (
226                            IpProto::new(ipv6.payload().ip_number.0 as _),
227                            ipv6.header().source_addr().into(),
228                            ipv6.header().destination_addr().into(),
229                        ),
230                        _ => {
231                            // A packet that parsed as IP but is neither IPv4 nor IPv6 (e.g. a
232                            // future/odd `NetSlice` shape). These bytes are attacker-controlled
233                            // post-decrypt, so fail closed — drop it — rather than `unreachable!`,
234                            // which would panic the single-threaded dataplane on a crafted packet.
235                            // Go's filter `pre()` likewise returns Drop/"not-ip" here, never panics.
236                            tracing::trace!("parsed packet is neither IPv4 nor IPv6; dropping");
237                            return false;
238                        }
239                    };
240
241                    // Pre-rule destination screen, mirroring Go filter `pre()`'s unconditional drops
242                    // that run BEFORE any ACL rule: a packet to a multicast or (non-allowlisted)
243                    // link-local-unicast destination is dropped regardless of the rules. Without this,
244                    // a permissive ACL (dst `*` / `0.0.0.0/0`) would ACCEPT inbound multicast /
245                    // link-local that Go drops pre-rules — the one ACCEPT-where-Go-DROPS gap. Bounded
246                    // already by the source-attribution filter above (only attributable tailnet peers
247                    // reach here), but matched to Go for correctness.
248                    if drop_before_rules(dst) {
249                        tracing::trace!(?dst, "dropping multicast/link-local dst (pre-rule)");
250                        return false;
251                    }
252
253                    let (_src_port, dst_port) = match pkt.transport {
254                        Some(etherparse::TransportSlice::Udp(udp)) => {
255                            (udp.source_port(), udp.destination_port())
256                        }
257                        Some(etherparse::TransportSlice::Tcp(tcp)) => {
258                            (tcp.source_port(), tcp.destination_port())
259                        }
260                        _ => (0, 0),
261                    };
262
263                    let info = ts_packetfilter::PacketInfo {
264                        ip_proto: proto,
265                        port: dst_port,
266                        src,
267                        dst,
268                    };
269
270                    // TODO(npry): wire in nodecaps
271                    let caps = [];
272                    let verdict = self.packet_filter.can_access(&info, caps);
273
274                    tracing::trace!(?info, ?caps, verdict);
275
276                    verdict
277                });
278
279                v
280            });
281
282        let to_peers = to_peers
283            .into_iter()
284            .map(|(k, v)| (ts_transport::PeerId(k.0), v));
285
286        let to_local = self.or_in.route(to_local.flatten());
287        let to_peers = self.ur_out.route(to_peers);
288
289        if let Some(next) = self.wireguard.next_event()
290            && let Some(prev) = self
291                .wg_next
292                .replace(self.events.add(next, Subsystem::Wireguard))
293        {
294            prev.cancel();
295        }
296
297        InboundResult { to_local, to_peers }
298    }
299
300    /// Return the next time at which [`DataPlane::process_events`] must be called.
301    ///
302    /// [`DataPlane::process_outbound`], [`DataPlane::process_inbound`] and
303    /// [`DataPlane::process_events`] may all update the next event time. Callers should prefer
304    /// calling `next_event` as needed to get a correct result, rather than store the returned
305    /// value.
306    pub fn next_event(&self) -> Option<Instant> {
307        self.events.next_dispatch()
308    }
309
310    /// Process all queued events that are due for processing.
311    ///
312    /// Must be called at least as often as dictated by [`DataPlane::next_event`] for the
313    /// data plane to function correctly. It is harmless to call it more frequently.
314    pub fn process_events(&mut self) -> EventResult {
315        let mut to_peers = HashMap::new();
316        let now = Instant::now();
317        for event in self.events.dispatch(now) {
318            match event {
319                Subsystem::Wireguard => {
320                    let res = self.wireguard.dispatch_events(now);
321                    to_peers.extend(
322                        res.to_peers
323                            .into_iter()
324                            .map(|(id, pkts)| (ts_transport::PeerId(id.0), pkts)),
325                    );
326                }
327            }
328        }
329        let to_peers = self.ur_out.route(to_peers);
330
331        if let Some(next) = self.wireguard.next_event()
332            && let Some(prev) = self
333                .wg_next
334                .replace(self.events.add(next, Subsystem::Wireguard))
335        {
336            prev.cancel();
337        }
338
339        EventResult { to_peers }
340    }
341}
342
343/// The result of processing outbound packets.
344pub struct OutboundResult {
345    /// Packets to be sent into underlay transports for transmission.
346    pub to_peers: HashMap<(UnderlayTransportId, PeerId), Vec<PacketMut>>,
347    /// Packets to be looped back and delivered to overlay transports.
348    pub loopback: HashMap<OverlayTransportId, Vec<PacketMut>>,
349}
350
351/// The result of processing inbound packets.
352pub struct InboundResult {
353    /// Decrypted packets to be delivered to overlay transports.
354    pub to_local: HashMap<OverlayTransportId, Vec<PacketMut>>,
355    /// Encrypted packets to be sent to wireguard peers by the underlay.
356    pub to_peers: HashMap<(UnderlayTransportId, PeerId), Vec<PacketMut>>,
357}
358
359/// The result of processing an event.
360#[derive(Default)]
361pub struct EventResult {
362    /// Encrypted packets to be sent to wireguard peers by the underlay.
363    pub to_peers: HashMap<(UnderlayTransportId, PeerId), Vec<PacketMut>>,
364}
365
#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use super::*;

    /// Shared log of `(path, bytes)` pairs, one entry per capture-hook invocation.
    type CaptureLog = Arc<Mutex<Vec<(CapturePath, Vec<u8>)>>>;

    /// Pins the on-wire path codes to Go's `capture.Path` values.
    #[test]
    fn capture_path_codes() {
        let expected = [
            (CapturePath::FromLocal, 0),
            (CapturePath::FromPeer, 1),
            (CapturePath::SynthesizedToLocal, 2),
            (CapturePath::SynthesizedToPeer, 3),
        ];
        for (path, code) in expected {
            assert_eq!(path.code(), code);
        }
    }

    /// Go filter `pre()` parity for the pre-rule destination screen.
    ///
    /// Multicast and non-allowlisted link-local destinations are dropped before the ACL. Ordinary
    /// unicast and the cloud-metadata link-local exception are left for the rules. So are
    /// 4in6-mapped addresses, which are deliberately not unmapped.
    #[test]
    fn pre_rule_drop_matches_go() {
        // (destination, expected "drop before rules" verdict, failure message)
        let cases = [
            ("224.0.0.1", true, "IPv4 multicast dropped"),
            ("239.255.255.250", true, "IPv4 multicast (SSDP) dropped"),
            ("169.254.1.1", true, "IPv4 link-local dropped"),
            ("ff02::1", true, "IPv6 multicast dropped"),
            ("fe80::1", true, "IPv6 link-local dropped"),
            (
                "febf:ffff::1",
                true,
                "top of fe80::/10 dropped (locks the 0xffc0/0xfe80 mask)",
            ),
            (
                "fec0::1",
                false,
                "just past fe80::/10 passes (locks the 0xffc0/0xfe80 mask)",
            ),
            // Mapped destinations fall through to the ACL with no unmap/canonicalize step.
            // Pinning them guards against a future "canonicalize to be safe" refactor silently
            // changing the verdict.
            (
                "::ffff:224.0.0.1",
                false,
                "4in6-mapped multicast falls through to the ACL, matching Go",
            ),
            (
                "::ffff:169.254.1.1",
                false,
                "4in6-mapped link-local falls through to the ACL, matching Go",
            ),
            ("100.64.0.5", false, "ordinary tailnet unicast passes"),
            ("8.8.8.8", false, "ordinary public unicast passes"),
            (
                "169.254.169.254",
                false,
                "the cloud-metadata link-local address is the Go-allowlisted exception",
            ),
            ("fd7a:115c:a1e0::1", false, "IPv6 ULA (tailnet) passes"),
        ];

        for (addr, want_drop, why) in cases {
            let dst: std::net::IpAddr = addr.parse().unwrap();
            assert_eq!(drop_before_rules(dst), want_drop, "{why}");
        }
    }

    /// Behavioral guard: an installed capture hook MUST see every outbound packet, tagged
    /// `CapturePath::FromLocal`, with its exact bytes.
    ///
    /// The tee sits at the top of `process_outbound`, before `or_out.route` consumes the packets,
    /// so it fires even with no wireguard peer configured. This is the only end-to-end check that
    /// the dataplane tee actually fires; removing the tee would leave every byte-layout test
    /// green.
    #[test]
    fn capture_hook_fires_on_outbound() {
        let mut dp = DataPlane::new(NodeKeyPair::new());

        let log: CaptureLog = Arc::default();
        let sink = Arc::clone(&log);
        let hook: CaptureHook = Arc::new(move |path: CapturePath, bytes: &[u8]| {
            sink.lock().unwrap().push((path, bytes.to_vec()));
        });
        dp.capture = Some(hook);

        // The outbound tee passes `p.as_ref()` through untouched, so the bytes need not form a
        // valid IP packet.
        let payload = vec![0xde_u8, 0xad, 0xbe, 0xef];
        let _ = dp.process_outbound(vec![PacketMut::from(payload.clone())]);

        let captured = log.lock().unwrap();
        assert_eq!(captured.len(), 1, "hook must fire exactly once per packet");
        let (path, bytes) = &captured[0];
        assert_eq!(*path, CapturePath::FromLocal);
        assert_eq!(*bytes, payload);
    }
}