1use crate::error::{ErrorData, Result};
2use crate::resource::{ResourceDefinition, ResourceOutputsDefinition, ResourceRef, ResourceType};
3use crate::LoadBalancerEndpoint;
4use alien_error::AlienError;
5use bon::Builder;
6use serde::{Deserialize, Serialize};
7use std::any::Any;
8use std::collections::HashMap;
9use std::fmt::Debug;
10
11#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
13#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
14#[serde(rename_all = "camelCase")]
15pub enum Ingress {
16 Public,
18 Private,
20}
21
22#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
25#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
26#[serde(rename_all = "camelCase", tag = "type")]
27pub enum WorkerCode {
28 #[serde(rename_all = "camelCase")]
30 Image {
31 image: String,
33 },
34 #[serde(rename_all = "camelCase")]
36 Source {
37 src: String,
39 toolchain: ToolchainConfig,
41 },
42}
43
44#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
47#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
48#[serde(rename_all = "lowercase", tag = "type")]
49pub enum ToolchainConfig {
50 #[serde(rename_all = "camelCase")]
52 Rust {
53 binary_name: String,
55 },
56 #[serde(rename_all = "camelCase")]
58 TypeScript {
59 #[serde(default, skip_serializing_if = "Option::is_none")]
61 binary_name: Option<String>,
62 },
63 #[serde(rename_all = "camelCase")]
65 Docker {
66 #[serde(skip_serializing_if = "Option::is_none")]
68 dockerfile: Option<String>,
69 #[serde(skip_serializing_if = "Option::is_none")]
71 build_args: Option<HashMap<String, String>>,
72 #[serde(skip_serializing_if = "Option::is_none")]
74 target: Option<String>,
75 },
76}
77
78#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
80#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
81#[serde(tag = "type", rename_all = "camelCase")]
82pub enum WorkerTrigger {
83 Queue {
85 queue: ResourceRef,
87 },
88 Storage {
90 storage: ResourceRef,
92 events: Vec<String>,
94 },
95 Schedule {
97 cron: String,
99 },
100}
101
102impl WorkerTrigger {
103 pub fn queue<R: ?Sized>(queue: &R) -> Self
107 where
108 for<'a> &'a R: Into<ResourceRef>,
109 {
110 let queue_ref: ResourceRef = queue.into();
111 WorkerTrigger::Queue { queue: queue_ref }
112 }
113
114 pub fn storage<R: ?Sized>(storage: &R, events: Vec<String>) -> Self
117 where
118 for<'a> &'a R: Into<ResourceRef>,
119 {
120 let storage_ref: ResourceRef = storage.into();
121 WorkerTrigger::Storage {
122 storage: storage_ref,
123 events,
124 }
125 }
126
127 pub fn schedule<S: Into<String>>(cron: S) -> Self {
130 WorkerTrigger::Schedule { cron: cron.into() }
131 }
132}
133
134#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Builder)]
137#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
138#[serde(rename_all = "camelCase", deny_unknown_fields)]
139#[builder(start_fn = new)]
140pub struct Worker {
141 #[builder(start_fn)]
144 pub id: String,
145
146 #[builder(field)]
149 pub links: Vec<ResourceRef>,
150
151 #[builder(field)]
155 pub triggers: Vec<WorkerTrigger>,
156
157 pub permissions: String,
160
161 pub code: WorkerCode,
163
164 #[builder(default = default_memory_mb())]
173 #[serde(default = "default_memory_mb")]
174 #[cfg_attr(feature = "openapi", schema(default = default_memory_mb))]
175 pub memory_mb: u32,
176
177 #[builder(default = default_timeout_seconds())]
181 #[serde(default = "default_timeout_seconds")]
182 #[cfg_attr(feature = "openapi", schema(default = default_timeout_seconds))]
183 pub timeout_seconds: u32,
184
185 #[builder(default)]
187 #[serde(default)]
188 pub environment: HashMap<String, String>,
189
190 #[builder(default = default_ingress())]
192 #[serde(default = "default_ingress")]
193 #[cfg_attr(feature = "openapi", schema(default = default_ingress))]
194 pub ingress: Ingress,
195
196 #[builder(default = default_commands_enabled())]
199 #[serde(default = "default_commands_enabled")]
200 #[cfg_attr(feature = "openapi", schema(default = default_commands_enabled))]
201 pub commands_enabled: bool,
202
203 pub concurrency_limit: Option<u32>,
206
207 pub readiness_probe: Option<ReadinessProbe>,
211}
212
213impl Worker {
214 pub const RESOURCE_TYPE: ResourceType = ResourceType::from_static("worker");
216
217 pub fn get_permissions(&self) -> &str {
219 &self.permissions
220 }
221}
222
/// Default for `Worker::memory_mb`, shared by the builder and serde.
fn default_memory_mb() -> u32 {
    const DEFAULT_MEMORY_MB: u32 = 256;
    DEFAULT_MEMORY_MB
}
226
/// Default for `Worker::timeout_seconds`, shared by the builder and serde.
fn default_timeout_seconds() -> u32 {
    const DEFAULT_TIMEOUT_SECONDS: u32 = 180;
    DEFAULT_TIMEOUT_SECONDS
}
230
231fn default_ingress() -> Ingress {
232 Ingress::Private
233}
234
/// Default for `Worker::commands_enabled`, shared by the builder and serde:
/// commands are off unless explicitly enabled.
fn default_commands_enabled() -> bool {
    const DEFAULT_COMMANDS_ENABLED: bool = false;
    DEFAULT_COMMANDS_ENABLED
}
238
239#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
241#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
242#[serde(rename_all = "UPPERCASE")]
243#[derive(Default)]
244pub enum HttpMethod {
245 #[default]
246 Get,
247 Post,
248 Put,
249 Delete,
250 Head,
251 Options,
252 Patch,
253}
254
255#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
259#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
260#[serde(rename_all = "camelCase")]
261pub struct ReadinessProbe {
262 #[serde(default)]
265 pub method: HttpMethod,
266
267 #[serde(default = "default_probe_path")]
270 pub path: String,
271}
272
/// Default for `ReadinessProbe::path`: the root path `"/"`.
fn default_probe_path() -> String {
    String::from("/")
}
276
277impl Default for ReadinessProbe {
278 fn default() -> Self {
279 Self {
280 method: HttpMethod::default(),
281 path: default_probe_path(),
282 }
283 }
284}
285
286use crate::resources::worker::worker_builder::State;
287
288impl<S: State> WorkerBuilder<S> {
289 pub fn link<R: ?Sized>(mut self, resource: &R) -> Self
292 where
293 for<'a> &'a R: Into<ResourceRef>, {
295 let resource_ref: ResourceRef = resource.into();
297 self.links.push(resource_ref);
298 self
299 }
300
301 pub fn trigger(mut self, trigger: WorkerTrigger) -> Self {
317 self.triggers.push(trigger);
318 self
319 }
320}
321
322impl ResourceDefinition for Worker {
324 fn get_resource_type(&self) -> ResourceType {
325 Self::RESOURCE_TYPE
326 }
327
328 fn id(&self) -> &str {
329 &self.id
330 }
331
332 fn get_dependencies(&self) -> Vec<ResourceRef> {
333 let mut dependencies = self.links.clone();
334
335 for trigger in &self.triggers {
337 match trigger {
338 WorkerTrigger::Queue { queue } => {
339 dependencies.push(queue.clone());
340 }
341 WorkerTrigger::Storage { storage, .. } => {
342 dependencies.push(storage.clone());
343 }
344 WorkerTrigger::Schedule { .. } => {
345 }
347 }
348 }
349
350 dependencies
351 }
352
353 fn get_permissions(&self) -> Option<&str> {
354 Some(&self.permissions)
355 }
356
357 fn validate_update(&self, new_config: &dyn ResourceDefinition) -> Result<()> {
358 let new_worker = new_config
360 .as_any()
361 .downcast_ref::<Worker>()
362 .ok_or_else(|| {
363 AlienError::new(ErrorData::UnexpectedResourceType {
364 resource_id: self.id.clone(),
365 expected: Self::RESOURCE_TYPE,
366 actual: new_config.get_resource_type(),
367 })
368 })?;
369
370 if self.id != new_worker.id {
371 return Err(AlienError::new(ErrorData::InvalidResourceUpdate {
372 resource_id: self.id.clone(),
373 reason: "the 'id' field is immutable".to_string(),
374 }));
375 }
376 Ok(())
377 }
378
379 fn as_any(&self) -> &dyn Any {
380 self
381 }
382
383 fn as_any_mut(&mut self) -> &mut dyn Any {
384 self
385 }
386
387 fn box_clone(&self) -> Box<dyn ResourceDefinition> {
388 Box::new(self.clone())
389 }
390
391 fn resource_eq(&self, other: &dyn ResourceDefinition) -> bool {
392 other.as_any().downcast_ref::<Worker>() == Some(self)
393 }
394
395 fn to_json_value(&self) -> serde_json::Result<serde_json::Value> {
396 serde_json::to_value(self)
397 }
398}
399
400#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
402#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
403#[serde(rename_all = "camelCase")]
404pub struct WorkerOutputs {
405 pub worker_name: String,
407 #[serde(skip_serializing_if = "Option::is_none")]
409 pub url: Option<String>,
410 #[serde(skip_serializing_if = "Option::is_none")]
412 pub identifier: Option<String>,
413 #[serde(skip_serializing_if = "Option::is_none")]
416 pub load_balancer_endpoint: Option<LoadBalancerEndpoint>,
417 #[serde(default, skip_serializing_if = "Option::is_none")]
422 pub commands_push_target: Option<String>,
423}
424
425impl ResourceOutputsDefinition for WorkerOutputs {
426 fn get_resource_type(&self) -> ResourceType {
427 Worker::RESOURCE_TYPE.clone()
428 }
429
430 fn as_any(&self) -> &dyn Any {
431 self
432 }
433
434 fn box_clone(&self) -> Box<dyn ResourceOutputsDefinition> {
435 Box::new(self.clone())
436 }
437
438 fn outputs_eq(&self, other: &dyn ResourceOutputsDefinition) -> bool {
439 other.as_any().downcast_ref::<WorkerOutputs>() == Some(self)
440 }
441
442 fn to_json_value(&self) -> serde_json::Result<serde_json::Value> {
443 serde_json::to_value(self)
444 }
445}
446
#[cfg(test)]
mod tests {
    //! Unit tests for the `Worker` builder, its defaults and its dependency
    //! reporting.

    use super::*;
    use crate::Storage;

    /// `link` accepts resources directly and records one reference per call.
    #[test]
    fn test_worker_builder_direct_refs() {
        let dummy_storage = Storage::new("test-storage".to_string()).build();
        let dummy_storage_2 = Storage::new("test-storage-2".to_string()).build();

        let worker = Worker::new("my-worker".to_string())
            .code(WorkerCode::Image {
                image: "test-image".to_string(),
            })
            .permissions("execution".to_string())
            .link(&dummy_storage)
            .link(&dummy_storage_2)
            .build();

        assert_eq!(worker.id, "my-worker");
        assert_eq!(
            worker.code,
            WorkerCode::Image {
                image: "test-image".to_string()
            }
        );

        assert_eq!(worker.permissions, "execution");

        assert!(worker
            .links
            .contains(&ResourceRef::new(Storage::RESOURCE_TYPE, "test-storage")));
        assert!(worker
            .links
            .contains(&ResourceRef::new(Storage::RESOURCE_TYPE, "test-storage-2")));
        assert_eq!(worker.links.len(), 2);
    }

    /// An explicitly configured readiness probe is stored unchanged.
    #[test]
    fn test_worker_with_readiness_probe() {
        let probe = ReadinessProbe {
            method: HttpMethod::Post,
            path: "/health".to_string(),
        };

        let worker = Worker::new("my-worker".to_string())
            .code(WorkerCode::Image {
                image: "test-image".to_string(),
            })
            .permissions("execution".to_string())
            .ingress(Ingress::Public)
            .readiness_probe(probe.clone())
            .build();

        assert_eq!(worker.id, "my-worker");
        assert_eq!(worker.ingress, Ingress::Public);
        assert_eq!(worker.readiness_probe, Some(probe));
    }

    /// The default probe is `GET /`.
    #[test]
    fn test_readiness_probe_defaults() {
        let probe = ReadinessProbe::default();
        assert_eq!(probe.method, HttpMethod::Get);
        assert_eq!(probe.path, "/");
    }

    /// Source code with the Rust toolchain round-trips through the builder.
    #[test]
    fn test_worker_with_rust_toolchain() {
        let worker = Worker::new("my-rust-worker".to_string())
            .code(WorkerCode::Source {
                src: "./".to_string(),
                toolchain: ToolchainConfig::Rust {
                    binary_name: "my-app".to_string(),
                },
            })
            .permissions("execution".to_string())
            .build();

        assert_eq!(worker.id, "my-rust-worker");

        match &worker.code {
            WorkerCode::Source { src, toolchain } => {
                assert_eq!(src, "./");
                assert_eq!(
                    toolchain,
                    &ToolchainConfig::Rust {
                        binary_name: "my-app".to_string(),
                    }
                );
            }
            _ => panic!("Expected Source code"),
        }
    }

    /// Source code with the TypeScript toolchain round-trips through the builder.
    #[test]
    fn test_worker_with_typescript_toolchain() {
        let worker = Worker::new("my-ts-worker".to_string())
            .code(WorkerCode::Source {
                src: "./".to_string(),
                toolchain: ToolchainConfig::TypeScript {
                    binary_name: Some("my-ts-worker".to_string()),
                },
            })
            .permissions("execution".to_string())
            .build();

        assert_eq!(worker.id, "my-ts-worker");

        match &worker.code {
            WorkerCode::Source { src, toolchain } => {
                assert_eq!(src, "./");
                assert_eq!(
                    toolchain,
                    &ToolchainConfig::TypeScript {
                        binary_name: Some("my-ts-worker".to_string())
                    }
                );
            }
            _ => panic!("Expected Source code"),
        }
    }

    /// A queue trigger stores a reference to the queue it was built from.
    #[test]
    fn test_worker_with_queue_trigger() {
        use crate::Queue;

        let queue = Queue::new("test-queue".to_string()).build();

        let worker = Worker::new("triggered-worker".to_string())
            .code(WorkerCode::Image {
                image: "test-image".to_string(),
            })
            .permissions("execution".to_string())
            .trigger(WorkerTrigger::queue(&queue))
            .build();

        assert_eq!(worker.triggers.len(), 1);
        if let WorkerTrigger::Queue { queue: queue_ref } = &worker.triggers[0] {
            assert_eq!(queue_ref.resource_type, Queue::RESOURCE_TYPE);
            assert_eq!(queue_ref.id, "test-queue");
        } else {
            panic!("Expected queue trigger");
        }
    }

    /// Dependencies contain both linked resources and trigger resources.
    #[test]
    fn test_worker_trigger_dependencies() {
        use crate::Queue;

        let queue = Queue::new("test-queue".to_string()).build();
        let storage = Storage::new("test-storage".to_string()).build();

        let worker = Worker::new("triggered-worker".to_string())
            .code(WorkerCode::Image {
                image: "test-image".to_string(),
            })
            .permissions("execution".to_string())
            .link(&storage)
            .trigger(WorkerTrigger::queue(&queue))
            .build();

        let dependencies = worker.get_dependencies();

        // One entry from the link, one from the queue trigger.
        assert_eq!(dependencies.len(), 2);
        assert!(dependencies.contains(&ResourceRef::new(Storage::RESOURCE_TYPE, "test-storage")));
        assert!(dependencies.contains(&ResourceRef::new(Queue::RESOURCE_TYPE, "test-queue")));
    }

    /// `WorkerTrigger::queue` converts a queue into the matching reference.
    #[test]
    fn test_worker_trigger_helper_methods() {
        use crate::Queue;

        let queue = Queue::new("my-queue".to_string()).build();

        let trigger = WorkerTrigger::queue(&queue);

        if let WorkerTrigger::Queue { queue: queue_ref } = trigger {
            assert_eq!(queue_ref.resource_type, Queue::RESOURCE_TYPE);
            assert_eq!(queue_ref.id, "my-queue");
        } else {
            panic!("Expected queue trigger");
        }
    }

    /// Triggers keep their insertion order and only resource-backed triggers
    /// show up as dependencies.
    #[test]
    fn test_worker_with_multiple_triggers() {
        use crate::Queue;

        let queue1 = Queue::new("queue-1".to_string()).build();
        let queue2 = Queue::new("queue-2".to_string()).build();

        let worker = Worker::new("multi-triggered-worker".to_string())
            .code(WorkerCode::Image {
                image: "test-image".to_string(),
            })
            .permissions("execution".to_string())
            .trigger(WorkerTrigger::queue(&queue1))
            .trigger(WorkerTrigger::queue(&queue2))
            .trigger(WorkerTrigger::schedule("0 * * * *"))
            .build();

        assert_eq!(worker.triggers.len(), 3);

        if let WorkerTrigger::Queue { queue: queue_ref } = &worker.triggers[0] {
            assert_eq!(queue_ref.id, "queue-1");
        } else {
            panic!("Expected first trigger to be queue-1");
        }

        if let WorkerTrigger::Queue { queue: queue_ref } = &worker.triggers[1] {
            assert_eq!(queue_ref.id, "queue-2");
        } else {
            panic!("Expected second trigger to be queue-2");
        }

        if let WorkerTrigger::Schedule { cron } = &worker.triggers[2] {
            assert_eq!(cron, "0 * * * *");
        } else {
            panic!("Expected third trigger to be schedule");
        }

        let dependencies = worker.get_dependencies();
        // The schedule trigger references no resource, so only the two queues count.
        assert_eq!(dependencies.len(), 2);
        assert!(dependencies.contains(&ResourceRef::new(Queue::RESOURCE_TYPE, "queue-1")));
        assert!(dependencies.contains(&ResourceRef::new(Queue::RESOURCE_TYPE, "queue-2")));
    }

    /// Commands can be enabled on a private worker.
    #[test]
    fn test_worker_with_commands_enabled() {
        let worker = Worker::new("cmd-worker".to_string())
            .code(WorkerCode::Image {
                image: "test-image".to_string(),
            })
            .permissions("execution".to_string())
            .ingress(Ingress::Private)
            .commands_enabled(true)
            .build();

        assert_eq!(worker.id, "cmd-worker");
        assert_eq!(worker.ingress, Ingress::Private);
        assert!(worker.commands_enabled);
    }

    /// Unset optional settings fall back to the documented defaults.
    #[test]
    fn test_worker_defaults() {
        let worker = Worker::new("default-worker".to_string())
            .code(WorkerCode::Image {
                image: "test-image".to_string(),
            })
            .permissions("execution".to_string())
            .build();

        assert_eq!(worker.ingress, Ingress::Private);
        assert!(!worker.commands_enabled);
        assert_eq!(worker.memory_mb, 256);
        assert_eq!(worker.timeout_seconds, 180);
    }

    /// Commands can be enabled on a public worker as well.
    #[test]
    fn test_worker_public_ingress_with_commands() {
        let worker = Worker::new("public-cmd-worker".to_string())
            .code(WorkerCode::Image {
                image: "test-image".to_string(),
            })
            .permissions("execution".to_string())
            .ingress(Ingress::Public)
            .commands_enabled(true)
            .build();

        assert_eq!(worker.ingress, Ingress::Public);
        assert!(worker.commands_enabled);
    }
}