1use axum::{
2 body::Body,
3 extract::{Extension, Path},
4 http::{header::HeaderName, HeaderValue, Response, StatusCode},
5 response::{sse::Event as AxumSseEvent, Sse},
6 routing::get,
7 BoxError, Json, Router,
8};
9use dis_spawner::{
10 event_stream::{event_stream, past_events},
11 SessionLivedBackend, SessionLivedBackendBuilder, SPAWNER_GROUP, SessionLivedBackendState,
12};
13use futures::{Stream, TryStreamExt};
14use k8s_openapi::api::core::v1::Event as KubeEventResource;
15use kube::{
16 api::PostParams, runtime::watcher::Error as KubeWatcherError, Api, Client, ResourceExt,
17};
18use serde::{Deserialize, Serialize};
19use serde_json::{json, Value};
20use std::{collections::HashMap, sync::Arc};
21use tokio_stream::StreamExt;
22
23pub async fn get_client() -> Result<Client, StatusCode> {
24 Client::try_default().await.map_err(|error| {
25 tracing::error!(%error, "Error getting client");
26 StatusCode::INTERNAL_SERVER_ERROR
27 })
28}
29
30pub fn backend_routes() -> Router {
31 Router::new()
32 .route("/:backend_id/status/stream", get(status_handler))
33 .route("/:backend_id/status", get(last_status_handler))
34 .route("/:backend_id/ready", get(ready_handler))
35}
36
37async fn ready_handler(
38 Path((backend_id,)): Path<(String,)>,
39 Extension(settings): Extension<Arc<ApiSettings>>,
40) -> Result<Response<Body>, StatusCode> {
41 let client = get_client().await?;
42 let name = settings.backend_to_slab_name(&backend_id);
43
44 let api = Api::<SessionLivedBackend>::namespaced(client, &settings.namespace);
45 let slab = api.get(&name).await;
46
47 match slab {
48 Ok(slab) => {
49 if slab.state() == SessionLivedBackendState::Ready {
50 let url = settings
51 .backend_to_url(&backend_id)
52 .ok_or(StatusCode::INTERNAL_SERVER_ERROR)?;
53
54 return Response::builder()
55 .status(StatusCode::FOUND)
56 .header(
57 HeaderName::from_static("location"),
58 HeaderValue::from_str(&url)
59 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?,
60 )
61 .body(Body::empty())
62 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR);
63 } else {
64 return Err(StatusCode::CONFLICT);
65 }
66 }
67 Err(error) => {
68 tracing::warn!(?error, "Error when looking up SessionLivedBackend.");
69 return Err(StatusCode::NOT_FOUND);
70 }
71 }
72}
73
74fn event_to_json(event: &KubeEventResource) -> Value {
75 json!({
76 "state": event.action,
77 "time": event.event_time,
78 })
79}
80
81fn convert_stream<T>(stream: T) -> impl Stream<Item = Result<AxumSseEvent, BoxError>>
82where
83 T: Stream<Item = Result<KubeEventResource, KubeWatcherError>>,
84{
85 stream.map(|event| {
86 let event: KubeEventResource = event.map_err(Box::new)?;
87
88 Ok(AxumSseEvent::default()
89 .json_data(event_to_json(&event))
90 .map_err(Box::new)?)
91 })
92}
93
94async fn last_status_handler(
95 Path((backend_id,)): Path<(String,)>,
96 Extension(settings): Extension<Arc<ApiSettings>>,
97) -> Result<Json<Value>, StatusCode> {
98 let client = settings.get_client().await?;
99
100 let resource_name = settings.backend_to_slab_name(&backend_id);
101 let mut events = past_events(client, &resource_name, &settings.namespace)
102 .await
103 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
104
105 events.sort_by_key(|d| d.event_time.clone());
106 let last_event = events.last().ok_or(StatusCode::NO_CONTENT)?;
107
108 Ok(Json(event_to_json(last_event)))
109}
110
111async fn status_handler(
112 Path((backend_id,)): Path<(String,)>,
113 Extension(settings): Extension<Arc<ApiSettings>>,
114) -> Result<Sse<impl Stream<Item = Result<AxumSseEvent, BoxError>>>, StatusCode> {
115 let client = settings.get_client().await?;
116
117 let name = format!("{}{}", settings.service_prefix, backend_id);
118 let events = event_stream(client, &name, &settings.namespace)
119 .await
120 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
121 let sse_events: _ = convert_stream(events).into_stream();
122
123 Ok(Sse::new(sse_events))
124}
125
126pub struct ApiSettings {
127 pub namespace: String,
128 pub url_template: Option<String>,
129 pub api_server_base: Option<String>,
130 pub service_prefix: String,
131}
132
133impl ApiSettings {
134 async fn get_client(&self) -> Result<Client, StatusCode> {
135 Client::try_default().await.map_err(|error| {
136 tracing::error!(%error, "Error getting client");
137 StatusCode::INTERNAL_SERVER_ERROR
138 })
139 }
140
141 pub fn backend_to_slab_name(&self, backend_id: &str) -> String {
142 format!("{}{}", self.service_prefix, backend_id)
143 }
144
145 pub fn backend_to_url(&self, backend_id: &str) -> Option<String> {
146 self.url_template
147 .as_ref()
148 .map(|d| d.replace("{}", &backend_id))
149 }
150
151 pub fn backend_api_path(&self, backend_id: &str, path: &str) -> Option<String> {
152 let api_server_base = self.api_server_base.as_ref()?;
153
154 Some(format!(
155 "{}/backend/{}/{}",
156 api_server_base, backend_id, path
157 ))
158 }
159
160 pub fn get_init_result(&self, backend_id: &str) -> SpawnResult {
161 let ready_url = self.backend_api_path(&backend_id, "ready");
162 let status_url = self.backend_api_path(&backend_id, "status");
163
164 SpawnResult {
165 url: self.backend_to_url(backend_id),
166 name: backend_id.to_string(),
167 ready_url,
168 status_url,
169 }
170 }
171
172 pub fn slab_name_to_backend(&self, slab_name: &str) -> Option<String> {
173 slab_name
174 .strip_prefix(&self.service_prefix)
175 .map(|t| t.to_string())
176 }
177}
178
179#[derive(Serialize)]
180#[serde(rename_all="camelCase")]
181pub struct SpawnResult {
182 pub url: Option<String>,
183 pub name: String,
184 pub ready_url: Option<String>,
185 pub status_url: Option<String>,
186}
187
188#[derive(Deserialize)]
189#[serde(rename_all = "camelCase")]
190pub struct SpawnPayload {
191 image: String,
193
194 port: Option<u16>,
196
197 #[serde(default)]
199 env: HashMap<String, String>,
200
201 grace_period_seconds: Option<u32>,
203}
204
205pub async fn spawn_handler(
206 Json(payload): Json<SpawnPayload>,
207 Extension(settings): Extension<Arc<ApiSettings>>,
208) -> Result<Json<SpawnResult>, StatusCode> {
209 let slab = SessionLivedBackendBuilder::new(&payload.image)
210 .with_env(payload.env)
211 .with_port(payload.port)
212 .with_grace_period(payload.grace_period_seconds)
213 .build_prefixed(&settings.service_prefix);
214
215 let client = Client::try_default().await.map_err(|error| {
216 tracing::error!(%error, "Error getting client");
217 StatusCode::INTERNAL_SERVER_ERROR
218 })?;
219 let api = Api::<SessionLivedBackend>::namespaced(client, &settings.namespace);
220
221 let result = api
222 .create(
223 &PostParams {
224 field_manager: Some(SPAWNER_GROUP.to_string()),
225 ..PostParams::default()
226 },
227 &slab,
228 )
229 .await
230 .map_err(|error| {
231 tracing::error!(%error, "Error creating SessionLivedBackend.");
232 StatusCode::INTERNAL_SERVER_ERROR
233 })?;
234
235 let prefixed_name = result.name();
236 let name = settings
237 .slab_name_to_backend(&prefixed_name)
238 .ok_or_else(|| {
239 tracing::warn!("Couldn't strip prefix from name.");
240 StatusCode::EXPECTATION_FAILED
241 })?;
242
243 let url = settings.backend_to_url(&name);
244
245 tracing::info!(?url, %name, "Created SessionLivedBackend.");
246
247 Ok(Json(settings.get_init_result(&name)))
248}