dis_spawner_api/
lib.rs

1use axum::{
2    body::Body,
3    extract::{Extension, Path},
4    http::{header::HeaderName, HeaderValue, Response, StatusCode},
5    response::{sse::Event as AxumSseEvent, Sse},
6    routing::get,
7    BoxError, Json, Router,
8};
9use dis_spawner::{
10    event_stream::{event_stream, past_events},
11    SessionLivedBackend, SessionLivedBackendBuilder, SPAWNER_GROUP, SessionLivedBackendState,
12};
13use futures::{Stream, TryStreamExt};
14use k8s_openapi::api::core::v1::Event as KubeEventResource;
15use kube::{
16    api::PostParams, runtime::watcher::Error as KubeWatcherError, Api, Client, ResourceExt,
17};
18use serde::{Deserialize, Serialize};
19use serde_json::{json, Value};
20use std::{collections::HashMap, sync::Arc};
21use tokio_stream::StreamExt;
22
23pub async fn get_client() -> Result<Client, StatusCode> {
24    Client::try_default().await.map_err(|error| {
25        tracing::error!(%error, "Error getting client");
26        StatusCode::INTERNAL_SERVER_ERROR
27    })
28}
29
30pub fn backend_routes() -> Router {
31    Router::new()
32        .route("/:backend_id/status/stream", get(status_handler))
33        .route("/:backend_id/status", get(last_status_handler))
34        .route("/:backend_id/ready", get(ready_handler))
35}
36
37async fn ready_handler(
38    Path((backend_id,)): Path<(String,)>,
39    Extension(settings): Extension<Arc<ApiSettings>>,
40) -> Result<Response<Body>, StatusCode> {
41    let client = get_client().await?;
42    let name = settings.backend_to_slab_name(&backend_id);
43
44    let api = Api::<SessionLivedBackend>::namespaced(client, &settings.namespace);
45    let slab = api.get(&name).await;
46
47    match slab {
48        Ok(slab) => {
49            if slab.state() == SessionLivedBackendState::Ready {
50                let url = settings
51                    .backend_to_url(&backend_id)
52                    .ok_or(StatusCode::INTERNAL_SERVER_ERROR)?;
53
54                return Response::builder()
55                    .status(StatusCode::FOUND)
56                    .header(
57                        HeaderName::from_static("location"),
58                        HeaderValue::from_str(&url)
59                            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?,
60                    )
61                    .body(Body::empty())
62                    .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR);
63            } else {
64                return Err(StatusCode::CONFLICT);
65            }
66        }
67        Err(error) => {
68            tracing::warn!(?error, "Error when looking up SessionLivedBackend.");
69            return Err(StatusCode::NOT_FOUND);
70        }
71    }
72}
73
74fn event_to_json(event: &KubeEventResource) -> Value {
75    json!({
76        "state": event.action,
77        "time": event.event_time,
78    })
79}
80
81fn convert_stream<T>(stream: T) -> impl Stream<Item = Result<AxumSseEvent, BoxError>>
82where
83    T: Stream<Item = Result<KubeEventResource, KubeWatcherError>>,
84{
85    stream.map(|event| {
86        let event: KubeEventResource = event.map_err(Box::new)?;
87
88        Ok(AxumSseEvent::default()
89            .json_data(event_to_json(&event))
90            .map_err(Box::new)?)
91    })
92}
93
94async fn last_status_handler(
95    Path((backend_id,)): Path<(String,)>,
96    Extension(settings): Extension<Arc<ApiSettings>>,
97) -> Result<Json<Value>, StatusCode> {
98    let client = settings.get_client().await?;
99
100    let resource_name = settings.backend_to_slab_name(&backend_id);
101    let mut events = past_events(client, &resource_name, &settings.namespace)
102        .await
103        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
104
105    events.sort_by_key(|d| d.event_time.clone());
106    let last_event = events.last().ok_or(StatusCode::NO_CONTENT)?;
107
108    Ok(Json(event_to_json(last_event)))
109}
110
111async fn status_handler(
112    Path((backend_id,)): Path<(String,)>,
113    Extension(settings): Extension<Arc<ApiSettings>>,
114) -> Result<Sse<impl Stream<Item = Result<AxumSseEvent, BoxError>>>, StatusCode> {
115    let client = settings.get_client().await?;
116
117    let name = format!("{}{}", settings.service_prefix, backend_id);
118    let events = event_stream(client, &name, &settings.namespace)
119        .await
120        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
121    let sse_events: _ = convert_stream(events).into_stream();
122
123    Ok(Sse::new(sse_events))
124}
125
126pub struct ApiSettings {
127    pub namespace: String,
128    pub url_template: Option<String>,
129    pub api_server_base: Option<String>,
130    pub service_prefix: String,
131}
132
133impl ApiSettings {
134    async fn get_client(&self) -> Result<Client, StatusCode> {
135        Client::try_default().await.map_err(|error| {
136            tracing::error!(%error, "Error getting client");
137            StatusCode::INTERNAL_SERVER_ERROR
138        })
139    }
140
141    pub fn backend_to_slab_name(&self, backend_id: &str) -> String {
142        format!("{}{}", self.service_prefix, backend_id)
143    }
144
145    pub fn backend_to_url(&self, backend_id: &str) -> Option<String> {
146        self.url_template
147            .as_ref()
148            .map(|d| d.replace("{}", &backend_id))
149    }
150
151    pub fn backend_api_path(&self, backend_id: &str, path: &str) -> Option<String> {
152        let api_server_base = self.api_server_base.as_ref()?;
153
154        Some(format!(
155            "{}/backend/{}/{}",
156            api_server_base, backend_id, path
157        ))
158    }
159
160    pub fn get_init_result(&self, backend_id: &str) -> SpawnResult {
161        let ready_url = self.backend_api_path(&backend_id, "ready");
162        let status_url = self.backend_api_path(&backend_id, "status");
163
164        SpawnResult {
165            url: self.backend_to_url(backend_id),
166            name: backend_id.to_string(),
167            ready_url,
168            status_url,
169        }
170    }
171
172    pub fn slab_name_to_backend(&self, slab_name: &str) -> Option<String> {
173        slab_name
174            .strip_prefix(&self.service_prefix)
175            .map(|t| t.to_string())
176    }
177}
178
179#[derive(Serialize)]
180#[serde(rename_all="camelCase")]
181pub struct SpawnResult {
182    pub url: Option<String>,
183    pub name: String,
184    pub ready_url: Option<String>,
185    pub status_url: Option<String>,
186}
187
188#[derive(Deserialize)]
189#[serde(rename_all = "camelCase")]
190pub struct SpawnPayload {
191    /// The container image used to create the container.
192    image: String,
193
194    /// HTTP port to expose on the container.
195    port: Option<u16>,
196
197    /// Environment variables to expose on the container.
198    #[serde(default)]
199    env: HashMap<String, String>,
200
201    /// Duration of time (in seconds) before the pod is shut down.
202    grace_period_seconds: Option<u32>,
203}
204
205pub async fn spawn_handler(
206    Json(payload): Json<SpawnPayload>,
207    Extension(settings): Extension<Arc<ApiSettings>>,
208) -> Result<Json<SpawnResult>, StatusCode> {
209    let slab = SessionLivedBackendBuilder::new(&payload.image)
210        .with_env(payload.env)
211        .with_port(payload.port)
212        .with_grace_period(payload.grace_period_seconds)
213        .build_prefixed(&settings.service_prefix);
214
215    let client = Client::try_default().await.map_err(|error| {
216        tracing::error!(%error, "Error getting client");
217        StatusCode::INTERNAL_SERVER_ERROR
218    })?;
219    let api = Api::<SessionLivedBackend>::namespaced(client, &settings.namespace);
220
221    let result = api
222        .create(
223            &PostParams {
224                field_manager: Some(SPAWNER_GROUP.to_string()),
225                ..PostParams::default()
226            },
227            &slab,
228        )
229        .await
230        .map_err(|error| {
231            tracing::error!(%error, "Error creating SessionLivedBackend.");
232            StatusCode::INTERNAL_SERVER_ERROR
233        })?;
234
235    let prefixed_name = result.name();
236    let name = settings
237        .slab_name_to_backend(&prefixed_name)
238        .ok_or_else(|| {
239            tracing::warn!("Couldn't strip prefix from name.");
240            StatusCode::EXPECTATION_FAILED
241        })?;
242
243    let url = settings.backend_to_url(&name);
244
245    tracing::info!(?url, %name, "Created SessionLivedBackend.");
246
247    Ok(Json(settings.get_init_result(&name)))
248}