1use std::{collections::HashMap, convert::Infallible, ops::DerefMut, sync::atomic::AtomicU32};
4
5use tokio::sync::{Mutex, mpsc};
6use ts_packet::PacketMut;
7use ts_transport::{OverlayTransportId, PeerId, UnderlayTransportId};
8use ts_tunnel::NodeKeyPair;
9
10use crate::{EventResult, InboundResult, OutboundResult};
11
/// Sending half of an unbounded channel carrying batches of overlay packets.
///
/// [`DataPlane`] stores one of these per registered overlay transport (used to deliver packets
/// to that transport) and also hands out clones of a single shared sender that overlay
/// transports use to submit packets to the data plane.
pub type DataplaneToOverlay = mpsc::UnboundedSender<Vec<PacketMut>>;

/// Receiving half of an unbounded channel carrying batches of overlay packets.
///
/// Counterpart of [`DataplaneToOverlay`].
pub type DataplaneFromOverlay = mpsc::UnboundedReceiver<Vec<PacketMut>>;

/// Sending half of an unbounded channel carrying batches of underlay packets, each batch paired
/// with the [`PeerId`] it is associated with.
///
/// [`DataPlane`] stores one of these per registered underlay transport and also hands out
/// clones of a single shared sender that underlay transports use to submit packets.
pub type DataplaneToUnderlay = mpsc::UnboundedSender<(PeerId, Vec<PacketMut>)>;

/// Receiving half of an unbounded channel carrying `(PeerId, packets)` underlay batches.
///
/// Counterpart of [`DataplaneToUnderlay`].
pub type DataplaneFromUnderlay = mpsc::UnboundedReceiver<(PeerId, Vec<PacketMut>)>;
23
/// Async, channel-driven front end for a [`crate::DataPlane`].
///
/// Transports register through [`DataPlane::new_overlay_transport`] and
/// [`DataPlane::new_underlay_transport`]; packets are then moved between them by repeatedly
/// calling [`DataPlane::step`] (or [`DataPlane::run`], which loops over it forever).
pub struct DataPlane {
    /// The wrapped [`crate::DataPlane`] plus the per-transport outbound queues.
    core_state: Mutex<CoreState>,
    /// Receiving ends of the shared inbound channels; locked by `step` while it waits for work.
    poll_state: Mutex<PollState>,

    /// Signalled (via `notify_waiters`) each time a transport is registered; awaited in `step`.
    transports_changed: tokio::sync::Notify,

    /// Shared sender whose clones are handed to underlay transports; its receiver is
    /// `PollState::from_underlay`.
    underlay_down: DataplaneToUnderlay,
    /// Shared sender whose clones are handed to overlay transports; its receiver is
    /// `PollState::from_overlay`.
    overlay_up: DataplaneToOverlay,

    /// Counter from which the next [`UnderlayTransportId`] is derived.
    next_underlay_transport: AtomicU32,
    /// Counter from which the next [`OverlayTransportId`] is derived.
    next_overlay_transport: AtomicU32,
}
39
/// State guarded by `DataPlane::core_state`: the packet-processing core and the queues used to
/// hand its output to individual transports.
struct CoreState {
    /// The wrapped data plane that does the actual packet processing.
    sync: crate::DataPlane,

    /// Per-transport senders used by `write_to_overlay`, keyed by overlay transport id.
    overlay_transports: HashMap<OverlayTransportId, DataplaneToOverlay>,
    /// Per-transport senders used by `write_to_underlay`, keyed by underlay transport id.
    underlay_transports: HashMap<UnderlayTransportId, DataplaneToUnderlay>,
}
49
/// State guarded by `DataPlane::poll_state`: the receiving ends of the two shared inbound
/// channels that `DataPlane::step` waits on.
struct PollState {
    /// Receives packet batches sent through clones of `DataPlane::overlay_up`.
    from_overlay: DataplaneFromOverlay,
    /// Receives `(PeerId, packets)` batches sent through clones of `DataPlane::underlay_down`.
    from_underlay: DataplaneFromUnderlay,
}
57
58impl DataPlane {
59 pub fn new(my_key: NodeKeyPair) -> Self {
64 let (overlay_up, overlay_down) = mpsc::unbounded_channel();
65 let (underlay_down, underlay_up) = mpsc::unbounded_channel();
66
67 let sync = crate::DataPlane::new(my_key);
68
69 Self {
70 underlay_down,
71 overlay_up,
72
73 next_overlay_transport: Default::default(),
74 next_underlay_transport: Default::default(),
75
76 transports_changed: tokio::sync::Notify::new(),
77
78 core_state: Mutex::new(CoreState {
79 sync,
80 overlay_transports: Default::default(),
81 underlay_transports: Default::default(),
82 }),
83
84 poll_state: Mutex::new(PollState {
85 from_overlay: overlay_down,
86 from_underlay: underlay_up,
87 }),
88 }
89 }
90
91 pub async fn new_underlay_transport(
93 &self,
94 ) -> (
95 UnderlayTransportId,
96 DataplaneFromUnderlay,
97 DataplaneToUnderlay,
98 ) {
99 let id = self
100 .next_underlay_transport
101 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
102 .into();
103
104 let (tx, rx) = mpsc::unbounded_channel();
105
106 {
107 let mut rest = self.core_state.lock().await;
108 rest.underlay_transports.insert(id, tx);
109 }
110
111 self.transports_changed.notify_waiters();
112
113 (id, rx, self.underlay_down.clone())
114 }
115
116 pub async fn new_overlay_transport(
118 &self,
119 ) -> (OverlayTransportId, DataplaneToOverlay, DataplaneFromOverlay) {
120 let id = self
121 .next_overlay_transport
122 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
123 .into();
124
125 let (tx, rx) = mpsc::unbounded_channel();
126
127 {
128 let mut rest = self.core_state.lock().await;
129 rest.overlay_transports.insert(id, tx);
130 }
131
132 self.transports_changed.notify_waiters();
133
134 (id, self.overlay_up.clone(), rx)
135 }
136
137 pub async fn run(&self) -> Infallible {
139 loop {
140 self.step().await;
141 }
142 }
143
144 #[tracing::instrument(skip_all)]
146 pub async fn step(&self) {
147 enum SelectResult {
148 OverlayDown(Vec<PacketMut>),
149 UnderlayUp(PeerId, Vec<PacketMut>),
150 TransportsChanged,
151 Event,
152 }
153
154 let select_result = {
166 let next_event = {
167 let state = self.core_state.lock().await;
168 state.sync.next_event()
169 };
170
171 let mut poll_state = self.poll_state.lock().await;
172
173 let PollState {
174 from_overlay: overlay_down,
175 from_underlay: underlay_up,
176 ..
177 } = &mut *poll_state;
178
179 tokio::select! {
180 overlay_pkts = overlay_down.recv() => {
181 let overlay_pkts = overlay_pkts.unwrap();
182 tracing::trace!(n_overlay_pkts = overlay_pkts.len());
183
184 SelectResult::OverlayDown(overlay_pkts)
185 }
186
187 underlay_pkts = underlay_up.recv() => {
188 let (peer_id, underlay_pkts) = underlay_pkts.unwrap();
189 tracing::trace!(%peer_id, n_underlay_pkts = underlay_pkts.len());
190
191 SelectResult::UnderlayUp(peer_id, underlay_pkts)
192 }
193
194 _ = self.transports_changed.notified() => {
195 tracing::trace!("transports changed");
196
197 SelectResult::TransportsChanged
198 }
199
200 _ = option_sleep_until(next_event.map(Into::into)) => {
201 tracing::trace!("event");
202
203 SelectResult::Event
204 }
205 }
206 };
207
208 let mut core = self.core_state.lock().await;
209
210 let (to_peers, to_local) = match select_result {
211 SelectResult::OverlayDown(overlay_down) => {
212 let OutboundResult { to_peers, loopback } =
213 core.sync.process_outbound(overlay_down);
214
215 (Some(to_peers), Some(loopback))
216 }
217 SelectResult::UnderlayUp(_peer_id, underlay_up) => {
218 let InboundResult { to_local, to_peers } = core.sync.process_inbound(underlay_up);
219
220 (Some(to_peers), Some(to_local))
221 }
222 SelectResult::Event => {
223 let EventResult { to_peers } = core.sync.process_events();
224 (Some(to_peers), None)
225 }
226 SelectResult::TransportsChanged => (None, None),
227 };
228
229 if let Some(to_peers) = to_peers {
230 write_to_underlay(&core, to_peers).await;
231 }
232
233 if let Some(to_local) = to_local {
234 write_to_overlay(&core, to_local).await;
235 }
236 }
237
238 pub async fn inner(&self) -> impl DerefMut<Target = crate::DataPlane> {
244 let core = self.core_state.lock().await;
245 tokio::sync::MutexGuard::map(core, |x| &mut x.sync)
246 }
247}
248
249async fn write_to_overlay(slf: &CoreState, packets: HashMap<OverlayTransportId, Vec<PacketMut>>) {
250 for (id, packets) in packets {
251 if let Some(queue) = slf.overlay_transports.get(&id) {
252 tracing::trace!(overlay_id = ?id, n_packets = packets.len());
253 queue.send(packets).unwrap();
254 }
255 }
256}
257
258async fn write_to_underlay(
259 slf: &CoreState,
260 packets: impl IntoIterator<Item = ((UnderlayTransportId, PeerId), Vec<PacketMut>)>,
261) {
262 for ((tid, peer_id), packets) in packets {
263 tracing::trace!(underlay_id = ?tid, %peer_id, n_packets = packets.len());
264
265 if let Some(queue) = slf.underlay_transports.get(&tid) {
266 queue.send((peer_id, packets)).unwrap();
267 }
268 }
269}
270
271async fn option_sleep_until(deadline: Option<tokio::time::Instant>) {
272 match deadline {
273 Some(deadline) => tokio::time::sleep_until(deadline).await,
274 None => core::future::pending().await,
275 }
276}