1use std::{net::SocketAddr, sync::Arc, time::Duration};
17
18use anyhow::anyhow;
19use axum::{BoxError, Router, error_handling::HandleErrorLayer};
20use endhost_api::routes::nest_endhost_api;
21use endhost_api_models::{
22 PathDiscovery,
23 underlays::{ScionRouter, Underlays},
24};
25use http::StatusCode;
26use jsonwebtoken::DecodingKey;
27use scion_proto::address::IsdAsn;
28use scion_sdk_observability::info_trace_layer;
29use snap_dataplane::session::manager::SessionTokenError;
30use snap_tokens::snap_token::SnapTokenClaims;
31use tokio::net::TcpListener;
32use tokio_util::sync::CancellationToken;
33use tower::{ServiceBuilder, timeout::TimeoutLayer};
34use url::Url;
35
36use crate::{
37 crpc_api::api_service::{
38 model::{SessionGrant, SessionManager},
39 nest_snap_control_api,
40 },
41 model::{CreateSessionError, DataPlaneDiscovery, SessionGranter, SnapDataPlane},
42 server::{
43 auth::AuthMiddlewareLayer,
44 metrics::{Metrics, PrometheusMiddlewareLayer},
45 },
46};
47
48pub mod auth;
49pub mod metrics;
50pub mod mock_segment_lister;
51pub mod state;
52
/// Deadline handed to the `TimeoutLayer` that wraps the control plane API.
const CONTROL_PLANE_API_TIMEOUT: Duration = Duration::from_secs(30);

/// Request count handed to the control plane API's `RateLimitLayer`, paired with
/// [`CONTROL_PLANE_RATE_LIMIT_PERIOD`].
const CONTROL_PLANE_RATE_LIMIT: u64 = 20;
/// Duration handed to the control plane API's `RateLimitLayer`, paired with
/// [`CONTROL_PLANE_RATE_LIMIT`].
const CONTROL_PLANE_RATE_LIMIT_PERIOD: Duration = Duration::from_secs(1);
58
59pub async fn start<DP, SM, SL>(
61 cancellation_token: CancellationToken,
62 listener: TcpListener,
63 dp_discovery: DP,
64 session_manager: SM,
65 segment_lister: SL,
66 snap_token_decoding_key: DecodingKey,
67 metrics: Metrics,
68) -> std::io::Result<()>
69where
70 DP: DataPlaneDiscovery + 'static + Send + Sync,
71 SM: SessionGranter + 'static + Send + Sync,
72 SL: PathDiscovery + 'static + Send + Sync,
73{
74 let router = Router::new();
75
76 let dp_discovery = Arc::new(dp_discovery);
77 let session_manager = Arc::new(session_manager);
78 let segment_lister = Arc::new(segment_lister);
79
80 let snap_cp_addr = listener
81 .local_addr()
82 .map_err(|e| std::io::Error::other(format!("Failed to get own local address: {e}")))?;
83
84 let snap_cp_api = match snap_cp_addr {
85 SocketAddr::V4(addr) => {
86 Url::parse(&format!("http://{addr}"))
87 .expect("It is safe to format a SocketAddr as a URL")
88 }
89 SocketAddr::V6(addr) => {
90 Url::parse(&format!("http://[{}]:{}", addr.ip(), addr.port()))
91 .expect("It is safe to format a SocketAddr as a URL")
92 }
93 };
94
95 let router = nest_endhost_api(
96 router,
97 Arc::new(UnderlayDiscoveryAdapter::new(
98 dp_discovery.clone(),
99 snap_cp_api,
100 )),
101 segment_lister.clone(),
102 );
103
104 let router = nest_snap_control_api(
105 router,
106 Arc::new(SessionManagerAdapter::new(
107 session_manager.clone(),
108 dp_discovery.clone(),
109 )),
110 );
111
112 let router = router.layer(
113 ServiceBuilder::new()
114 .layer(HandleErrorLayer::new(|err: BoxError| {
115 async move {
116 tracing::error!(error=%err, "Control plane API error");
117
118 (
119 StatusCode::INTERNAL_SERVER_ERROR,
120 format!("Unhandled error: {err}"),
121 )
122 }
123 }))
124 .layer(info_trace_layer())
125 .layer(TimeoutLayer::new(CONTROL_PLANE_API_TIMEOUT))
126 .layer(tower::buffer::BufferLayer::new(1024))
127 .layer(tower::limit::RateLimitLayer::new(
128 CONTROL_PLANE_RATE_LIMIT,
129 CONTROL_PLANE_RATE_LIMIT_PERIOD,
130 ))
131 .layer(PrometheusMiddlewareLayer::new(metrics))
132 .layer(AuthMiddlewareLayer::new(snap_token_decoding_key)),
133 );
134
135 tracing::info!(addr=%snap_cp_addr, "Starting control plane API");
136
137 if let Err(e) = axum::serve(listener, router.into_make_service())
138 .with_graceful_shutdown(cancellation_token.cancelled_owned())
139 .await
140 {
141 tracing::error!(error=%e, "Control plane API server unexpectedly stopped");
142 }
143
144 tracing::info!("Shutting down control plane API server");
145
146 Ok(())
147}
148
149struct UnderlayDiscoveryAdapter<T: DataPlaneDiscovery> {
151 dp_discovery: Arc<T>,
152 snap_cp_api: Url,
153}
154
155impl<T: DataPlaneDiscovery> UnderlayDiscoveryAdapter<T> {
156 fn new(dp_discovery: Arc<T>, snap_cp_api: Url) -> Self {
157 Self {
158 dp_discovery,
159 snap_cp_api,
160 }
161 }
162}
163
164impl<T: DataPlaneDiscovery> endhost_api_models::UnderlayDiscovery for UnderlayDiscoveryAdapter<T> {
165 fn list_underlays(&self, isd_as: IsdAsn) -> Underlays {
166 let dps = self.dp_discovery.list_udp_data_planes();
167 let mut udp_underlay = Vec::new();
168 for dp in dps {
169 for router_as in dp.isd_ases {
170 if isd_as != IsdAsn::WILDCARD && router_as.isd_as != isd_as {
171 continue;
172 };
173
174 udp_underlay.push(ScionRouter {
175 isd_as: router_as.isd_as,
176 internal_interface: dp.endpoint,
177 interfaces: router_as.interfaces.iter().map(|&i| i as u16).collect(),
178 });
179 }
180 }
181
182 let sus = self.dp_discovery.list_snap_data_planes();
183 if sus.is_empty() {
184 return Underlays {
185 udp_underlay,
186 snap_underlay: Vec::new(),
187 };
188 }
189
190 let mut snap_underlay = Vec::new();
191 let all_ases: Vec<IsdAsn> = sus.iter().flat_map(|su| su.isd_ases.clone()).collect();
192 if isd_as == IsdAsn::WILDCARD || all_ases.contains(&isd_as) {
193 snap_underlay.push(endhost_api_models::underlays::Snap {
194 address: self.snap_cp_api.clone(),
195 isd_ases: all_ases,
196 });
197 }
198
199 Underlays {
200 udp_underlay,
201 snap_underlay,
202 }
203 }
204}
205
206struct SessionManagerAdapter {
208 session_manager: Arc<dyn SessionGranter>,
209 dp_discovery: Arc<dyn DataPlaneDiscovery>,
210}
211
212impl SessionManagerAdapter {
213 fn new(
214 session_manager: Arc<dyn SessionGranter>,
215 dp_discovery: Arc<dyn DataPlaneDiscovery>,
216 ) -> Self {
217 Self {
218 session_manager,
219 dp_discovery,
220 }
221 }
222}
223
224impl SessionManager for SessionManagerAdapter {
225 fn create_session(
226 &self,
227 snap_token: SnapTokenClaims,
228 ) -> Result<Vec<SessionGrant>, (StatusCode, anyhow::Error)> {
229 let dps = self.dp_discovery.list_snap_data_planes();
230
231 let mut grants: Vec<SessionGrant> = Vec::with_capacity(dps.len());
232 for dp in dps.iter() {
233 let res = self
234 .session_manager
235 .create_session(dp.address, snap_token.clone());
236
237 match res {
240 Ok(grant) => grants.push(grant),
241 Err(err) => return Err(handle_session_error(dp, err)),
242 }
243 }
244
245 Ok(grants)
246 }
247
248 fn renew_session(
249 &self,
250 address: SocketAddr,
251 snap_token: SnapTokenClaims,
252 ) -> Result<SessionGrant, (StatusCode, anyhow::Error)> {
253 let dps = self.dp_discovery.list_snap_data_planes();
254 let Some(dp) = dps.iter().find(|dp| dp.address == address) else {
255 return Err((
256 StatusCode::NOT_FOUND,
257 anyhow!("No data plane with address {address}."),
258 ));
259 };
260
261 let res = self.session_manager.create_session(dp.address, snap_token);
262
263 match res {
264 Ok(grant) => Ok(grant),
265 Err(err) => Err(handle_session_error(dp, err)),
266 }
267 }
268}
269
270fn handle_session_error(
271 dp: &SnapDataPlane,
272 error: CreateSessionError,
273) -> (StatusCode, anyhow::Error) {
274 match error {
275 CreateSessionError::DataPlaneNotFound => {
276 (
277 StatusCode::NOT_FOUND,
278 anyhow!("no data plane with address {}", dp.address),
279 )
280 }
281 CreateSessionError::IssueSessionToken(SessionTokenError::EncodingError(err)) => {
282 tracing::error!(%err, "Failed to encode session token");
283 (StatusCode::INTERNAL_SERVER_ERROR, anyhow!("internal error"))
284 }
285 CreateSessionError::OpenSession(session_open_error) => {
286 tracing::error!(err=%session_open_error, "Failed to open session");
287 (StatusCode::INTERNAL_SERVER_ERROR, anyhow!("internal error"))
288 }
289 }
290}