1#![doc = include_str!("../README.md")]
2
3pub use builder::SessionLivedBackendBuilder;
4pub use image_pull_policy::ImagePullPolicy;
5use k8s_openapi::{
6 api::core::v1::{
7 Container, EnvVar, Event, LocalObjectReference, Pod, PodSpec, Service, ServicePort,
8 ServiceSpec, Volume,
9 },
10 apimachinery::pkg::apis::meta::v1::{MicroTime, OwnerReference},
11 chrono::Utc,
12};
13use kube::{
14 api::{Patch, PatchParams, PostParams},
15 Api, Client, CustomResource, Resource,
16};
17use kube::{core::ObjectMeta, ResourceExt};
18use schemars::JsonSchema;
19use serde::{Deserialize, Serialize};
20use serde_json::json;
21pub use state::SessionLivedBackendState;
22use std::{collections::BTreeMap, fmt::Debug};
23
24mod builder;
25pub mod event_stream;
26mod image_pull_policy;
27mod state;
28
/// API group of the `SessionLivedBackend` custom resource.
///
/// Also used as the prefix of the `reporting_component` on events built by
/// `SessionLivedBackend::log_event`.
pub const SPAWNER_GROUP: &str = "spawner.dev";

/// Application HTTP port used when a spec leaves `http_port` unset.
const DEFAULT_PORT: u16 = 8080;

/// Port the sidecar container is told to serve on; the backend's `Service`
/// exposes this same port.
const SIDECAR_PORT: u16 = 9090;

/// Label key whose value is the backend's name; it is placed on the pod and
/// used as the `Service` selector.
const LABEL_RUN: &str = "run";

/// Name of the sidecar container appended to every backend pod.
const SIDECAR: &str = "spawner-sidecar";

/// Name of the single port entry on the backend's `Service`.
const APPLICATION: &str = "spawner-app";
46
47#[derive(CustomResource, Clone, Debug, Deserialize, Serialize, JsonSchema, Default)]
53#[kube(
54 group = "spawner.dev",
55 version = "v1",
56 kind = "SessionLivedBackend",
57 shortname = "slab",
58 status = "SessionLivedBackendStatus",
59 namespaced
60)]
61#[serde(rename_all = "camelCase")]
62pub struct SessionLivedBackendSpec {
63 pub template: BackendPodSpec,
65
66 pub grace_period_seconds: Option<u32>,
71
72 pub http_port: Option<u16>,
74}
75
76#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, JsonSchema, Default)]
77#[serde(rename_all = "camelCase")]
78pub struct BackendPodSpec {
79 containers: Vec<Container>,
82
83 image_pull_secrets: Option<Vec<LocalObjectReference>>,
86
87 volumes: Option<Vec<Volume>>,
89}
90
91impl Into<PodSpec> for BackendPodSpec {
92 fn into(self) -> PodSpec {
93 PodSpec {
94 containers: self.containers,
95 image_pull_secrets: self.image_pull_secrets,
96 volumes: self.volumes,
97 restart_policy: Some("Never".to_string()),
98 ..Default::default()
99 }
100 }
101}
102
103#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, JsonSchema, Default)]
105#[serde(rename_all = "camelCase")]
106pub struct SessionLivedBackendStatus {
107 #[serde(skip_serializing_if = "Option::is_none")]
109 pub submitted: Option<bool>,
110
111 #[serde(skip_serializing_if = "Option::is_none")]
113 pub constructed: Option<bool>,
114
115 #[serde(skip_serializing_if = "Option::is_none")]
117 pub scheduled: Option<bool>,
118
119 #[serde(skip_serializing_if = "Option::is_none")]
121 pub running: Option<bool>,
122
123 #[serde(skip_serializing_if = "Option::is_none")]
126 pub ready: Option<bool>,
127
128 #[serde(skip_serializing_if = "Option::is_none")]
132 pub swept: Option<bool>,
133
134 #[serde(skip_serializing_if = "Option::is_none")]
136 pub failed: Option<bool>,
137
138 #[serde(skip_serializing_if = "Option::is_none")]
143 pub node_name: Option<String>,
144
145 #[serde(skip_serializing_if = "Option::is_none")]
147 pub ip: Option<String>,
148
149 #[serde(skip_serializing_if = "Option::is_none")]
151 pub port: Option<u16>,
152
153 #[serde(skip_serializing_if = "Option::is_none")]
155 pub url: Option<String>,
156}
157
158impl SessionLivedBackendStatus {
159 pub fn patch_state(
160 state: SessionLivedBackendState,
161 base_status: SessionLivedBackendStatus,
162 ) -> SessionLivedBackendStatus {
163 match state {
164 SessionLivedBackendState::Submitted => SessionLivedBackendStatus {
165 submitted: Some(true),
166 ..base_status
167 },
168 SessionLivedBackendState::Constructed => SessionLivedBackendStatus {
169 constructed: Some(true),
170 ..base_status
171 },
172 SessionLivedBackendState::Scheduled => SessionLivedBackendStatus {
173 scheduled: Some(true),
174 ..base_status
175 },
176 SessionLivedBackendState::Running => SessionLivedBackendStatus {
177 running: Some(true),
178 ..base_status
179 },
180 SessionLivedBackendState::Ready => SessionLivedBackendStatus {
181 ready: Some(true),
182 ..base_status
183 },
184 SessionLivedBackendState::Swept => SessionLivedBackendStatus {
185 swept: Some(true),
186 ..base_status
187 },
188 SessionLivedBackendState::Failed => SessionLivedBackendStatus {
189 failed: Some(true),
190 ..base_status
191 },
192 }
193 }
194}
195
196#[derive(thiserror::Error, Debug)]
197pub enum Error {
198 #[error("MissingObjectKey: {0}")]
199 MissingObjectKey(&'static str),
200
201 #[error("Failure from Kubernetes: {0}")]
202 KubernetesFailure(#[source] kube::Error),
203}
204
205impl SessionLivedBackend {
206 pub fn state(&self) -> SessionLivedBackendState {
207 if let Some(status) = &self.status {
208 if status.swept == Some(true) {
209 return SessionLivedBackendState::Swept;
210 } else if status.ready == Some(true) {
211 return SessionLivedBackendState::Ready;
212 } else if status.running == Some(true) {
213 return SessionLivedBackendState::Running;
214 } else if status.scheduled == Some(true) {
215 return SessionLivedBackendState::Scheduled;
216 } else if status.constructed == Some(true) {
217 return SessionLivedBackendState::Constructed;
218 }
219 }
220 SessionLivedBackendState::Submitted
221 }
222
223 fn metadata_reference(&self) -> Result<OwnerReference, Error> {
224 let meta = &self.metadata;
225
226 Ok(OwnerReference {
227 api_version: SessionLivedBackend::api_version(&()).to_string(),
228 kind: SessionLivedBackend::kind(&()).to_string(),
229 controller: Some(true),
230 name: meta
231 .name
232 .as_ref()
233 .ok_or(Error::MissingObjectKey("metadata.name"))?
234 .to_string(),
235 uid: meta
236 .uid
237 .as_ref()
238 .ok_or(Error::MissingObjectKey("metadata.uid"))?
239 .to_string(),
240 ..OwnerReference::default()
241 })
242 }
243
244 fn run_label(&self) -> BTreeMap<String, String> {
245 vec![(LABEL_RUN.to_string(), self.name())]
246 .into_iter()
247 .collect()
248 }
249
250 fn labels(&self) -> BTreeMap<String, String> {
251 let mut labels = self.metadata.labels.clone().unwrap_or_default();
252 labels.append(&mut self.run_label());
253 labels
254 }
255
256 fn sidecar_container<I>(sidecar_image: &str, http_port: u16, env: I) -> Container
257 where
258 I: Iterator<Item = (String, String)>,
259 {
260 let args = vec![
261 format!("--serve-port={}", SIDECAR_PORT),
262 format!("--upstream-port={}", http_port),
263 ];
264
265 let env: Vec<EnvVar> = env
266 .filter_map(|(key, val)| {
267 let name = key.strip_prefix("SPAWNER_SIDECAR_")?.to_string();
268 Some(EnvVar {
269 name,
270 value: Some(val.clone()),
271 ..EnvVar::default()
272 })
273 })
274 .collect();
275
276 let env = if env.is_empty() { None } else { Some(env) };
277
278 Container {
279 name: SIDECAR.to_string(),
280 image: Some(sidecar_image.to_string()),
281 args: Some(args),
282 env,
283 ..Container::default()
284 }
285 }
286
287 pub fn pod(
288 &self,
289 sidecar_image: &str,
290 image_pull_secret: &Option<String>,
291 ) -> Result<Pod, Error> {
292 let name = self.name();
293 let http_port = self.spec.http_port.unwrap_or(DEFAULT_PORT);
294
295 let mut template: PodSpec = self.spec.template.clone().into();
296 let port_env = EnvVar {
297 name: "PORT".to_string(),
298 value: Some(http_port.to_string()),
299 ..EnvVar::default()
300 };
301
302 let first_container = template
303 .containers
304 .first_mut()
305 .ok_or(Error::MissingObjectKey("template.containers[0]"))?;
306
307 match &mut first_container.env {
308 Some(env) => {
309 env.push(port_env);
310 }
311 None => first_container.env = Some(vec![port_env]),
312 };
313
314 template
315 .containers
316 .push(Self::sidecar_container(sidecar_image, http_port, std::env::vars()));
317
318 if let Some(image_pull_secret) = image_pull_secret {
319 let secret_ref = LocalObjectReference {
320 name: Some(image_pull_secret.to_string()),
321 };
322
323 template.image_pull_secrets = match template.image_pull_secrets {
324 Some(mut v) => {
325 v.push(secret_ref);
326 Some(v)
327 }
328 None => Some(vec![secret_ref]),
329 }
330 }
331
332 Ok(Pod {
333 metadata: ObjectMeta {
334 name: Some(name),
335 labels: Some(self.labels()),
336 owner_references: Some(vec![self.metadata_reference()?]),
337 ..ObjectMeta::default()
338 },
339 spec: Some(template),
340 ..Pod::default()
341 })
342 }
343
344 pub fn service(&self) -> Result<Service, Error> {
345 let name = self.name();
346
347 Ok(Service {
348 metadata: ObjectMeta {
349 name: Some(name.to_string()),
350 owner_references: Some(vec![self.metadata_reference()?]),
351 labels: Some(self.labels()),
352 ..ObjectMeta::default()
353 },
354 spec: Some(ServiceSpec {
355 selector: Some(self.run_label()),
356 ports: Some(vec![ServicePort {
357 name: Some(APPLICATION.to_string()),
358 protocol: Some("TCP".to_string()),
359 port: SIDECAR_PORT as i32,
360 ..ServicePort::default()
361 }]),
362 ..ServiceSpec::default()
363 }),
364 ..Service::default()
365 })
366 }
367
368 pub async fn set_spawner_group(
369 &self,
370 client: Client,
371 spawner_group: &str,
372 ) -> Result<(), Error> {
373 let namespace = self
374 .namespace()
375 .expect("SessionLivedBackend is a namespaced resource, but didn't have a namespace.");
376 let slab_api = Api::<SessionLivedBackend>::namespaced(client.clone(), &namespace);
377
378 slab_api
379 .patch(
380 &self.name(),
381 &PatchParams::default(),
382 &Patch::Merge(json!({
383 "metadata": {
384 "labels": {
385 "spawnerGroup": spawner_group
386 }
387 }
388 })),
389 )
390 .await
391 .map_err(Error::KubernetesFailure)?;
392
393 Ok(())
394 }
395
396 pub fn log_event(&self, state: SessionLivedBackendState) -> Event {
397 Event {
398 involved_object: self.object_ref(&()),
399 action: Some(state.to_string()),
400 message: Some(state.message()),
401 reason: Some(state.to_string()),
402 type_: Some("Normal".to_string()),
403 event_time: Some(MicroTime(Utc::now())),
404 reporting_component: Some(format!("{}/sessionlivedbackend", SPAWNER_GROUP)),
405 reporting_instance: Some(self.name()),
406 metadata: ObjectMeta {
407 namespace: self.namespace(),
408 generate_name: Some(format!("{}-", self.name())),
409 ..Default::default()
410 },
411
412 ..Default::default()
413 }
414 }
415
416 pub async fn update_state(
417 &self,
418 client: Client,
419 new_state: SessionLivedBackendState,
420 patch_status: SessionLivedBackendStatus,
421 ) -> Result<(), Error> {
422 let namespace = self
423 .namespace()
424 .expect("SessionLivedBackend is a namespaced resource, but didn't have a namespace.");
425 let slab_api = Api::<SessionLivedBackend>::namespaced(client.clone(), &namespace);
426 let event_api = Api::<Event>::namespaced(client.clone(), &namespace);
427
428 let new_status = SessionLivedBackendStatus::patch_state(new_state, patch_status);
429 tracing::info!(name=%self.name(), state=%new_state, "Updating SLAB state.");
430
431 slab_api
432 .patch_status(
433 &self.name(),
434 &PatchParams::default(),
435 &Patch::Merge(json!({ "status": new_status })),
436 )
437 .await
438 .map_err(Error::KubernetesFailure)?;
439
440 event_api
441 .create(&PostParams::default(), &self.log_event(new_state))
442 .await
443 .map_err(Error::KubernetesFailure)?;
444
445 Ok(())
446 }
447}
448
#[cfg(test)]
mod test {
    use super::*;

    /// Builds a backend with a default spec, no status and the given metadata.
    fn backend_with_metadata(metadata: ObjectMeta) -> SessionLivedBackend {
        SessionLivedBackend {
            spec: SessionLivedBackendSpec::default(),
            metadata,
            status: None,
        }
    }

    /// Builds an owned label map from borrowed key/value pairs.
    fn label_map(pairs: &[(&str, &str)]) -> BTreeMap<String, String> {
        pairs
            .iter()
            .map(|&(key, value)| (key.to_string(), value.to_string()))
            .collect()
    }

    /// Builds a plain `name=value` environment variable.
    fn env_var(name: &str, value: &str) -> EnvVar {
        EnvVar {
            name: name.to_string(),
            value: Some(value.to_string()),
            ..EnvVar::default()
        }
    }

    /// Walks the status flags forward and checks the state derived at each step.
    #[test]
    fn test_session_lived_backend_status() {
        let mut backend = backend_with_metadata(ObjectMeta::default());
        assert_eq!(SessionLivedBackendState::Submitted, backend.state());

        let mut status = SessionLivedBackendStatus::default();
        let mut state_with = |status: &SessionLivedBackendStatus| {
            backend.status = Some(status.clone());
            backend.state()
        };

        assert_eq!(SessionLivedBackendState::Submitted, state_with(&status));

        status.constructed = Some(true);
        assert_eq!(SessionLivedBackendState::Constructed, state_with(&status));

        status.scheduled = Some(true);
        assert_eq!(SessionLivedBackendState::Scheduled, state_with(&status));

        status.running = Some(true);
        assert_eq!(SessionLivedBackendState::Running, state_with(&status));

        status.ready = Some(true);
        assert_eq!(SessionLivedBackendState::Ready, state_with(&status));

        status.swept = Some(true);
        assert_eq!(SessionLivedBackendState::Swept, state_with(&status));
    }

    /// The `run` label is derived from the name and merged into existing labels.
    #[test]
    fn test_labels() {
        let backend = backend_with_metadata(ObjectMeta {
            name: Some("slab1".to_string()),
            namespace: Some("spawner".to_string()),
            labels: Some(label_map(&[("foo", "bar")])),
            ..ObjectMeta::default()
        });

        assert_eq!(label_map(&[("run", "slab1")]), backend.run_label());
        assert_eq!(
            label_map(&[("foo", "bar"), ("run", "slab1")]),
            backend.labels()
        );
    }

    /// The owner reference names the backend as controller.
    #[test]
    fn test_metadata_reference() {
        let backend = backend_with_metadata(ObjectMeta {
            name: Some("slab1".to_string()),
            uid: Some("blah".to_string()),
            ..ObjectMeta::default()
        });

        let expected = OwnerReference {
            api_version: "spawner.dev/v1".to_string(),
            block_owner_deletion: None,
            controller: Some(true),
            kind: "SessionLivedBackend".to_string(),
            name: "slab1".to_string(),
            uid: "blah".to_string(),
        };

        assert_eq!(expected, backend.metadata_reference().unwrap());
    }

    /// An event mirrors the state's name and message and targets the backend.
    #[test]
    fn test_event() {
        let backend = backend_with_metadata(ObjectMeta {
            namespace: Some("blah".to_string()),
            name: Some("slab1".to_string()),
            ..ObjectMeta::default()
        });

        let event = backend.log_event(SessionLivedBackendState::Running);

        // The timestamp is taken at call time, so copy it into the expectation.
        let expected = Event {
            action: Some("Running".to_string()),
            event_time: event.event_time.clone(),
            involved_object: backend.object_ref(&()),
            message: Some("Pod was observed running.".to_string()),
            metadata: ObjectMeta {
                generate_name: Some("slab1-".to_string()),
                namespace: Some("blah".to_string()),
                ..ObjectMeta::default()
            },
            reason: Some("Running".to_string()),
            reporting_component: Some("spawner.dev/sessionlivedbackend".to_string()),
            reporting_instance: Some("slab1".to_string()),
            type_: Some("Normal".to_string()),
            ..Event::default()
        };

        assert_eq!(expected, event);
    }

    /// Every state sets exactly its own flag and leaves other fields alone.
    #[test]
    fn test_patch_state() {
        let single_flag_cases = vec![
            (
                SessionLivedBackendState::Submitted,
                SessionLivedBackendStatus {
                    submitted: Some(true),
                    ..SessionLivedBackendStatus::default()
                },
            ),
            (
                SessionLivedBackendState::Constructed,
                SessionLivedBackendStatus {
                    constructed: Some(true),
                    ..SessionLivedBackendStatus::default()
                },
            ),
            (
                SessionLivedBackendState::Scheduled,
                SessionLivedBackendStatus {
                    scheduled: Some(true),
                    ..SessionLivedBackendStatus::default()
                },
            ),
            (
                SessionLivedBackendState::Running,
                SessionLivedBackendStatus {
                    running: Some(true),
                    ..SessionLivedBackendStatus::default()
                },
            ),
            (
                SessionLivedBackendState::Ready,
                SessionLivedBackendStatus {
                    ready: Some(true),
                    ..SessionLivedBackendStatus::default()
                },
            ),
            (
                SessionLivedBackendState::Swept,
                SessionLivedBackendStatus {
                    swept: Some(true),
                    ..SessionLivedBackendStatus::default()
                },
            ),
            (
                SessionLivedBackendState::Failed,
                SessionLivedBackendStatus {
                    failed: Some(true),
                    ..SessionLivedBackendStatus::default()
                },
            ),
        ];

        for (state, expected) in single_flag_cases {
            assert_eq!(
                expected,
                SessionLivedBackendStatus::patch_state(state, SessionLivedBackendStatus::default()),
                "patching a default status with {:?}",
                state
            );
        }

        // Non-flag fields of the base status must be carried through.
        let base = SessionLivedBackendStatus {
            ip: Some("1.1.1.1".to_string()),
            url: Some("url".to_string()),
            port: Some(8080),
            node_name: Some("node1".to_string()),
            ..SessionLivedBackendStatus::default()
        };
        assert_eq!(
            SessionLivedBackendStatus {
                scheduled: Some(true),
                ..base.clone()
            },
            SessionLivedBackendStatus::patch_state(SessionLivedBackendState::Scheduled, base)
        );
    }

    /// Prefixed variables are forwarded to the sidecar with the prefix removed.
    #[test]
    fn test_sidecar_env_var() {
        let env = vec![
            ("SPAWNER_SIDECAR_FOO".to_string(), "FOO_VAL".to_string()),
            ("SPAWNER_SIDECAR_BAR".to_string(), "BAR_VAL".to_string()),
        ];

        let expected = Container {
            name: "spawner-sidecar".to_string(),
            image: Some("sidecar-image".to_string()),
            args: Some(vec![
                "--serve-port=9090".to_string(),
                "--upstream-port=4040".to_string(),
            ]),
            env: Some(vec![env_var("FOO", "FOO_VAL"), env_var("BAR", "BAR_VAL")]),
            ..Container::default()
        };

        assert_eq!(
            expected,
            SessionLivedBackend::sidecar_container("sidecar-image", 4040, env.into_iter())
        );
    }

    /// The generated pod and service match the backend's spec and metadata.
    ///
    /// NOTE: `pod` forwards `SPAWNER_SIDECAR_*` variables from the process
    /// environment, so the pod expectation assumes none are set when the
    /// tests run.
    #[test]
    fn test_pod_and_service() {
        let backend = SessionLivedBackend {
            spec: SessionLivedBackendSpec {
                template: BackendPodSpec {
                    containers: vec![Container {
                        name: "foo".to_string(),
                        env: Some(vec![env_var("FOO", "BAR")]),
                        ..Default::default()
                    }],
                    ..BackendPodSpec::default()
                },
                grace_period_seconds: Some(400),
                http_port: Some(4040),
            },
            metadata: ObjectMeta {
                name: Some("slab1".to_string()),
                namespace: Some("ns1".to_string()),
                uid: Some("uid1".to_string()),
                ..ObjectMeta::default()
            },
            status: None,
        };

        let owner = backend.metadata_reference().unwrap();
        let run_label = label_map(&[("run", "slab1")]);

        let expected_pod = Pod {
            metadata: ObjectMeta {
                name: Some("slab1".to_string()),
                owner_references: Some(vec![owner.clone()]),
                labels: Some(run_label.clone()),
                ..ObjectMeta::default()
            },
            spec: Some(PodSpec {
                restart_policy: Some("Never".to_string()),
                containers: vec![
                    Container {
                        name: "foo".to_string(),
                        env: Some(vec![env_var("FOO", "BAR"), env_var("PORT", "4040")]),
                        ..Default::default()
                    },
                    Container {
                        name: "spawner-sidecar".to_string(),
                        image: Some("sidecar-image".to_string()),
                        args: Some(vec![
                            "--serve-port=9090".to_string(),
                            "--upstream-port=4040".to_string(),
                        ]),
                        ..Container::default()
                    },
                ],
                image_pull_secrets: Some(vec![LocalObjectReference {
                    name: Some("my-secret".to_string()),
                }]),
                ..PodSpec::default()
            }),
            ..Default::default()
        };
        assert_eq!(
            expected_pod,
            backend
                .pod("sidecar-image", &Some("my-secret".to_string()))
                .unwrap()
        );

        let expected_service = Service {
            metadata: ObjectMeta {
                name: Some("slab1".to_string()),
                owner_references: Some(vec![owner]),
                labels: Some(run_label.clone()),
                ..ObjectMeta::default()
            },
            spec: Some(ServiceSpec {
                ports: Some(vec![ServicePort {
                    name: Some("spawner-app".to_string()),
                    port: 9090,
                    protocol: Some("TCP".to_string()),
                    ..ServicePort::default()
                }]),
                selector: Some(run_label),
                ..ServiceSpec::default()
            }),
            ..Service::default()
        };
        assert_eq!(expected_service, backend.service().unwrap());
    }
}