snap_control/
server.rs

1// Copyright 2025 Anapaya Systems
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//   http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//! SNAP control plane API server.
15
16use std::{net::SocketAddr, sync::Arc, time::Duration};
17
18use anyhow::anyhow;
19use axum::{BoxError, Router, error_handling::HandleErrorLayer};
20use endhost_api::routes::nest_endhost_api;
21use endhost_api_models::{
22    PathDiscovery,
23    underlays::{ScionRouter, Underlays},
24};
25use http::StatusCode;
26use jsonwebtoken::DecodingKey;
27use scion_proto::address::IsdAsn;
28use scion_sdk_observability::info_trace_layer;
29use snap_dataplane::session::manager::SessionTokenError;
30use snap_tokens::snap_token::SnapTokenClaims;
31use tokio::net::TcpListener;
32use tokio_util::sync::CancellationToken;
33use tower::{ServiceBuilder, timeout::TimeoutLayer};
34use url::Url;
35
36use crate::{
37    crpc_api::api_service::{
38        model::{SessionGrant, SessionManager},
39        nest_snap_control_api,
40    },
41    model::{CreateSessionError, DataPlaneDiscovery, SessionGranter, SnapDataPlane},
42    server::{
43        auth::AuthMiddlewareLayer,
44        metrics::{Metrics, PrometheusMiddlewareLayer},
45    },
46};
47
48pub mod auth;
49pub mod metrics;
50pub mod mock_segment_lister;
51pub mod state;
52
53const CONTROL_PLANE_API_TIMEOUT: Duration = Duration::from_secs(30);
54
55// The control plane API rate limit is set to 5 requests per second.
56const CONTROL_PLANE_RATE_LIMIT: u64 = 20;
57const CONTROL_PLANE_RATE_LIMIT_PERIOD: Duration = Duration::from_secs(1);
58
59/// Start the SNAP control plane API server.
60pub async fn start<DP, SM, SL>(
61    cancellation_token: CancellationToken,
62    listener: TcpListener,
63    dp_discovery: DP,
64    session_manager: SM,
65    segment_lister: SL,
66    snap_token_decoding_key: DecodingKey,
67    metrics: Metrics,
68) -> std::io::Result<()>
69where
70    DP: DataPlaneDiscovery + 'static + Send + Sync,
71    SM: SessionGranter + 'static + Send + Sync,
72    SL: PathDiscovery + 'static + Send + Sync,
73{
74    let router = Router::new();
75
76    let dp_discovery = Arc::new(dp_discovery);
77    let session_manager = Arc::new(session_manager);
78    let segment_lister = Arc::new(segment_lister);
79
80    let snap_cp_addr = listener
81        .local_addr()
82        .map_err(|e| std::io::Error::other(format!("Failed to get own local address: {e}")))?;
83
84    let snap_cp_api = match snap_cp_addr {
85        SocketAddr::V4(addr) => {
86            Url::parse(&format!("http://{addr}"))
87                .expect("It is safe to format a SocketAddr as a URL")
88        }
89        SocketAddr::V6(addr) => {
90            Url::parse(&format!("http://[{}]:{}", addr.ip(), addr.port()))
91                .expect("It is safe to format a SocketAddr as a URL")
92        }
93    };
94
95    let router = nest_endhost_api(
96        router,
97        Arc::new(UnderlayDiscoveryAdapter::new(
98            dp_discovery.clone(),
99            snap_cp_api,
100        )),
101        segment_lister.clone(),
102    );
103
104    let router = nest_snap_control_api(
105        router,
106        Arc::new(SessionManagerAdapter::new(
107            session_manager.clone(),
108            dp_discovery.clone(),
109        )),
110    );
111
112    let router = router.layer(
113        ServiceBuilder::new()
114            .layer(HandleErrorLayer::new(|err: BoxError| {
115                async move {
116                    tracing::error!(error=%err, "Control plane API error");
117
118                    (
119                        StatusCode::INTERNAL_SERVER_ERROR,
120                        format!("Unhandled error: {err}"),
121                    )
122                }
123            }))
124            .layer(info_trace_layer())
125            .layer(TimeoutLayer::new(CONTROL_PLANE_API_TIMEOUT))
126            .layer(tower::buffer::BufferLayer::new(1024))
127            .layer(tower::limit::RateLimitLayer::new(
128                CONTROL_PLANE_RATE_LIMIT,
129                CONTROL_PLANE_RATE_LIMIT_PERIOD,
130            ))
131            .layer(PrometheusMiddlewareLayer::new(metrics))
132            .layer(AuthMiddlewareLayer::new(snap_token_decoding_key)),
133    );
134
135    tracing::info!(addr=%snap_cp_addr, "Starting control plane API");
136
137    if let Err(e) = axum::serve(listener, router.into_make_service())
138        .with_graceful_shutdown(cancellation_token.cancelled_owned())
139        .await
140    {
141        tracing::error!(error=%e, "Control plane API server unexpectedly stopped");
142    }
143
144    tracing::info!("Shutting down control plane API server");
145
146    Ok(())
147}
148
149/// Adapter implementing UnderlayDiscovery for any DataPlaneDiscovery.
150struct UnderlayDiscoveryAdapter<T: DataPlaneDiscovery> {
151    dp_discovery: Arc<T>,
152    snap_cp_api: Url,
153}
154
155impl<T: DataPlaneDiscovery> UnderlayDiscoveryAdapter<T> {
156    fn new(dp_discovery: Arc<T>, snap_cp_api: Url) -> Self {
157        Self {
158            dp_discovery,
159            snap_cp_api,
160        }
161    }
162}
163
164impl<T: DataPlaneDiscovery> endhost_api_models::UnderlayDiscovery for UnderlayDiscoveryAdapter<T> {
165    fn list_underlays(&self, isd_as: IsdAsn) -> Underlays {
166        let dps = self.dp_discovery.list_udp_data_planes();
167        let mut udp_underlay = Vec::new();
168        for dp in dps {
169            for router_as in dp.isd_ases {
170                if isd_as != IsdAsn::WILDCARD && router_as.isd_as != isd_as {
171                    continue;
172                };
173
174                udp_underlay.push(ScionRouter {
175                    isd_as: router_as.isd_as,
176                    internal_interface: dp.endpoint,
177                    interfaces: router_as.interfaces.iter().map(|&i| i as u16).collect(),
178                });
179            }
180        }
181
182        let sus = self.dp_discovery.list_snap_data_planes();
183        if sus.is_empty() {
184            return Underlays {
185                udp_underlay,
186                snap_underlay: Vec::new(),
187            };
188        }
189
190        let mut snap_underlay = Vec::new();
191        let all_ases: Vec<IsdAsn> = sus.iter().flat_map(|su| su.isd_ases.clone()).collect();
192        if isd_as == IsdAsn::WILDCARD || all_ases.contains(&isd_as) {
193            snap_underlay.push(endhost_api_models::underlays::Snap {
194                address: self.snap_cp_api.clone(),
195                isd_ases: all_ases,
196            });
197        }
198
199        Underlays {
200            udp_underlay,
201            snap_underlay,
202        }
203    }
204}
205
206/// Adapter implementing SessionManager for any SessionManagerDeprecated.
207struct SessionManagerAdapter {
208    session_manager: Arc<dyn SessionGranter>,
209    dp_discovery: Arc<dyn DataPlaneDiscovery>,
210}
211
212impl SessionManagerAdapter {
213    fn new(
214        session_manager: Arc<dyn SessionGranter>,
215        dp_discovery: Arc<dyn DataPlaneDiscovery>,
216    ) -> Self {
217        Self {
218            session_manager,
219            dp_discovery,
220        }
221    }
222}
223
224impl SessionManager for SessionManagerAdapter {
225    fn create_session(
226        &self,
227        snap_token: SnapTokenClaims,
228    ) -> Result<Vec<SessionGrant>, (StatusCode, anyhow::Error)> {
229        let dps = self.dp_discovery.list_snap_data_planes();
230
231        let mut grants: Vec<SessionGrant> = Vec::with_capacity(dps.len());
232        for dp in dps.iter() {
233            let res = self
234                .session_manager
235                .create_session(dp.address, snap_token.clone());
236
237            // XXX(bunert): We currently fail the entire request if creating a session for any data
238            // plane fails. Eventually we want to return partial results here.
239            match res {
240                Ok(grant) => grants.push(grant),
241                Err(err) => return Err(handle_session_error(dp, err)),
242            }
243        }
244
245        Ok(grants)
246    }
247
248    fn renew_session(
249        &self,
250        address: SocketAddr,
251        snap_token: SnapTokenClaims,
252    ) -> Result<SessionGrant, (StatusCode, anyhow::Error)> {
253        let dps = self.dp_discovery.list_snap_data_planes();
254        let Some(dp) = dps.iter().find(|dp| dp.address == address) else {
255            return Err((
256                StatusCode::NOT_FOUND,
257                anyhow!("No data plane with address {address}."),
258            ));
259        };
260
261        let res = self.session_manager.create_session(dp.address, snap_token);
262
263        match res {
264            Ok(grant) => Ok(grant),
265            Err(err) => Err(handle_session_error(dp, err)),
266        }
267    }
268}
269
270fn handle_session_error(
271    dp: &SnapDataPlane,
272    error: CreateSessionError,
273) -> (StatusCode, anyhow::Error) {
274    match error {
275        CreateSessionError::DataPlaneNotFound => {
276            (
277                StatusCode::NOT_FOUND,
278                anyhow!("no data plane with address {}", dp.address),
279            )
280        }
281        CreateSessionError::IssueSessionToken(SessionTokenError::EncodingError(err)) => {
282            tracing::error!(%err, "Failed to encode session token");
283            (StatusCode::INTERNAL_SERVER_ERROR, anyhow!("internal error"))
284        }
285    }
286}