1use crate::error::{ErrorData, Result};
2use crate::resource::{ResourceDefinition, ResourceOutputsDefinition, ResourceRef, ResourceType};
3use crate::{PublicEndpointOutput, WorkerPublicEndpoint};
4use alien_error::AlienError;
5use bon::Builder;
6use serde::{Deserialize, Serialize};
7use std::any::Any;
8use std::collections::HashMap;
9use std::fmt::Debug;
10
11#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
14#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
15#[serde(rename_all = "camelCase", tag = "type")]
16pub enum WorkerCode {
17 #[serde(rename_all = "camelCase")]
19 Image {
20 image: String,
22 },
23 #[serde(rename_all = "camelCase")]
25 Source {
26 src: String,
28 toolchain: ToolchainConfig,
30 },
31}
32
33#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
36#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
37#[serde(rename_all = "lowercase", tag = "type")]
38pub enum ToolchainConfig {
39 #[serde(rename_all = "camelCase")]
41 Rust {
42 binary_name: String,
44 },
45 #[serde(rename_all = "camelCase")]
47 TypeScript {
48 #[serde(default, skip_serializing_if = "Option::is_none")]
50 binary_name: Option<String>,
51 },
52 #[serde(rename_all = "camelCase")]
54 Docker {
55 #[serde(skip_serializing_if = "Option::is_none")]
57 dockerfile: Option<String>,
58 #[serde(skip_serializing_if = "Option::is_none")]
60 build_args: Option<HashMap<String, String>>,
61 #[serde(skip_serializing_if = "Option::is_none")]
63 target: Option<String>,
64 },
65}
66
67#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
69#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
70#[serde(tag = "type", rename_all = "camelCase")]
71pub enum WorkerTrigger {
72 Queue {
74 queue: ResourceRef,
76 },
77 Storage {
79 storage: ResourceRef,
81 events: Vec<String>,
83 },
84 Schedule {
86 cron: String,
88 },
89}
90
91impl WorkerTrigger {
92 pub fn queue<R: ?Sized>(queue: &R) -> Self
96 where
97 for<'a> &'a R: Into<ResourceRef>,
98 {
99 let queue_ref: ResourceRef = queue.into();
100 WorkerTrigger::Queue { queue: queue_ref }
101 }
102
103 pub fn storage<R: ?Sized>(storage: &R, events: Vec<String>) -> Self
106 where
107 for<'a> &'a R: Into<ResourceRef>,
108 {
109 let storage_ref: ResourceRef = storage.into();
110 WorkerTrigger::Storage {
111 storage: storage_ref,
112 events,
113 }
114 }
115
116 pub fn schedule<S: Into<String>>(cron: S) -> Self {
119 WorkerTrigger::Schedule { cron: cron.into() }
120 }
121}
122
123#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Builder)]
126#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
127#[serde(rename_all = "camelCase", deny_unknown_fields)]
128#[builder(start_fn = new)]
129pub struct Worker {
130 #[builder(start_fn)]
133 pub id: String,
134
135 #[builder(field)]
138 pub links: Vec<ResourceRef>,
139
140 #[builder(field)]
144 pub triggers: Vec<WorkerTrigger>,
145
146 #[builder(field)]
148 #[serde(default, skip_serializing_if = "Vec::is_empty")]
149 pub public_endpoints: Vec<WorkerPublicEndpoint>,
150
151 pub permissions: String,
154
155 pub code: WorkerCode,
157
158 #[builder(default = default_memory_mb())]
167 #[serde(default = "default_memory_mb")]
168 #[cfg_attr(feature = "openapi", schema(default = default_memory_mb))]
169 pub memory_mb: u32,
170
171 #[builder(default = default_timeout_seconds())]
175 #[serde(default = "default_timeout_seconds")]
176 #[cfg_attr(feature = "openapi", schema(default = default_timeout_seconds))]
177 pub timeout_seconds: u32,
178
179 #[builder(default)]
181 #[serde(default)]
182 pub environment: HashMap<String, String>,
183
184 #[builder(default = default_commands_enabled())]
187 #[serde(default = "default_commands_enabled")]
188 #[cfg_attr(feature = "openapi", schema(default = default_commands_enabled))]
189 pub commands_enabled: bool,
190
191 pub concurrency_limit: Option<u32>,
194
195 pub readiness_probe: Option<ReadinessProbe>,
199}
200
201impl Worker {
202 pub const RESOURCE_TYPE: ResourceType = ResourceType::from_static("worker");
204
205 pub fn get_permissions(&self) -> &str {
207 &self.permissions
208 }
209
210 fn validate_public_endpoints(&self) -> Result<()> {
211 let mut endpoint_names = std::collections::HashSet::new();
212 for endpoint in &self.public_endpoints {
213 endpoint.validate_for_resource(&self.id)?;
214 if !endpoint_names.insert(endpoint.name.as_str()) {
215 return Err(AlienError::new(ErrorData::InvalidResourceUpdate {
216 resource_id: self.id.clone(),
217 reason: format!("duplicate public endpoint name '{}'", endpoint.name),
218 }));
219 }
220 }
221
222 Ok(())
223 }
224}
225
/// Default value for `Worker::memory_mb`, used by both the builder and serde.
fn default_memory_mb() -> u32 {
    const DEFAULT_MEMORY_MB: u32 = 256;
    DEFAULT_MEMORY_MB
}
229
/// Default value for `Worker::timeout_seconds`, used by both the builder and serde.
fn default_timeout_seconds() -> u32 {
    const DEFAULT_TIMEOUT_SECONDS: u32 = 180;
    DEFAULT_TIMEOUT_SECONDS
}
233
/// Default value for `Worker::commands_enabled`, used by both the builder and serde.
fn default_commands_enabled() -> bool {
    const DEFAULT_COMMANDS_ENABLED: bool = false;
    DEFAULT_COMMANDS_ENABLED
}
237
238#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
240#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
241#[serde(rename_all = "UPPERCASE")]
242#[derive(Default)]
243pub enum HttpMethod {
244 #[default]
245 Get,
246 Post,
247 Put,
248 Delete,
249 Head,
250 Options,
251 Patch,
252}
253
254#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
258#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
259#[serde(rename_all = "camelCase")]
260pub struct ReadinessProbe {
261 #[serde(default)]
264 pub method: HttpMethod,
265
266 #[serde(default = "default_probe_path")]
269 pub path: String,
270}
271
/// Default value for `ReadinessProbe::path`: the root path `"/"`.
fn default_probe_path() -> String {
    String::from("/")
}
275
276impl Default for ReadinessProbe {
277 fn default() -> Self {
278 Self {
279 method: HttpMethod::default(),
280 path: default_probe_path(),
281 }
282 }
283}
284
285use crate::resources::worker::worker_builder::State;
286
287impl<S: State> WorkerBuilder<S> {
288 pub fn link<R: ?Sized>(mut self, resource: &R) -> Self
291 where
292 for<'a> &'a R: Into<ResourceRef>, {
294 let resource_ref: ResourceRef = resource.into();
296 self.links.push(resource_ref);
297 self
298 }
299
300 pub fn trigger(mut self, trigger: WorkerTrigger) -> Self {
316 self.triggers.push(trigger);
317 self
318 }
319
320 pub fn public_endpoint(mut self, endpoint: WorkerPublicEndpoint) -> Self {
322 self.public_endpoints.push(endpoint);
323 self
324 }
325}
326
327impl ResourceDefinition for Worker {
329 fn get_resource_type(&self) -> ResourceType {
330 Self::RESOURCE_TYPE
331 }
332
333 fn id(&self) -> &str {
334 &self.id
335 }
336
337 fn get_dependencies(&self) -> Vec<ResourceRef> {
338 let mut dependencies = self.links.clone();
339
340 for trigger in &self.triggers {
342 match trigger {
343 WorkerTrigger::Queue { queue } => {
344 dependencies.push(queue.clone());
345 }
346 WorkerTrigger::Storage { storage, .. } => {
347 dependencies.push(storage.clone());
348 }
349 WorkerTrigger::Schedule { .. } => {
350 }
352 }
353 }
354
355 dependencies
356 }
357
358 fn get_permissions(&self) -> Option<&str> {
359 Some(&self.permissions)
360 }
361
362 fn validate_update(&self, new_config: &dyn ResourceDefinition) -> Result<()> {
363 let new_worker = new_config
365 .as_any()
366 .downcast_ref::<Worker>()
367 .ok_or_else(|| {
368 AlienError::new(ErrorData::UnexpectedResourceType {
369 resource_id: self.id.clone(),
370 expected: Self::RESOURCE_TYPE,
371 actual: new_config.get_resource_type(),
372 })
373 })?;
374
375 if self.id != new_worker.id {
376 return Err(AlienError::new(ErrorData::InvalidResourceUpdate {
377 resource_id: self.id.clone(),
378 reason: "the 'id' field is immutable".to_string(),
379 }));
380 }
381 self.validate_public_endpoints()?;
382 new_worker.validate_public_endpoints()?;
383 if self.public_endpoints != new_worker.public_endpoints {
384 return Err(AlienError::new(ErrorData::InvalidResourceUpdate {
385 resource_id: self.id.clone(),
386 reason: "the 'publicEndpoints' field is immutable".to_string(),
387 }));
388 }
389 Ok(())
390 }
391
392 fn as_any(&self) -> &dyn Any {
393 self
394 }
395
396 fn as_any_mut(&mut self) -> &mut dyn Any {
397 self
398 }
399
400 fn box_clone(&self) -> Box<dyn ResourceDefinition> {
401 Box::new(self.clone())
402 }
403
404 fn resource_eq(&self, other: &dyn ResourceDefinition) -> bool {
405 other.as_any().downcast_ref::<Worker>() == Some(self)
406 }
407
408 fn to_json_value(&self) -> serde_json::Result<serde_json::Value> {
409 serde_json::to_value(self)
410 }
411}
412
413#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
415#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
416#[serde(rename_all = "camelCase")]
417pub struct WorkerOutputs {
418 pub worker_name: String,
420 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
422 pub public_endpoints: HashMap<String, PublicEndpointOutput>,
423 #[serde(skip_serializing_if = "Option::is_none")]
425 pub identifier: Option<String>,
426 #[serde(default, skip_serializing_if = "Option::is_none")]
431 pub commands_push_target: Option<String>,
432}
433
434impl ResourceOutputsDefinition for WorkerOutputs {
435 fn get_resource_type(&self) -> ResourceType {
436 Worker::RESOURCE_TYPE.clone()
437 }
438
439 fn as_any(&self) -> &dyn Any {
440 self
441 }
442
443 fn box_clone(&self) -> Box<dyn ResourceOutputsDefinition> {
444 Box::new(self.clone())
445 }
446
447 fn outputs_eq(&self, other: &dyn ResourceOutputsDefinition) -> bool {
448 other.as_any().downcast_ref::<WorkerOutputs>() == Some(self)
449 }
450
451 fn to_json_value(&self) -> serde_json::Result<serde_json::Value> {
452 serde_json::to_value(self)
453 }
454}
455
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Storage;

    /// Each `link` call on the builder records one `ResourceRef`.
    #[test]
    fn test_worker_builder_direct_refs() {
        let dummy_storage = Storage::new("test-storage".to_string()).build();
        let dummy_storage_2 = Storage::new("test-storage-2".to_string()).build();

        let worker = Worker::new("my-worker".to_string())
            .code(WorkerCode::Image {
                image: "test-image".to_string(),
            })
            .permissions("execution".to_string())
            .link(&dummy_storage)
            .link(&dummy_storage_2)
            .build();

        assert_eq!(worker.id, "my-worker");
        assert_eq!(
            worker.code,
            WorkerCode::Image {
                image: "test-image".to_string()
            }
        );

        assert_eq!(worker.permissions, "execution");

        assert!(worker
            .links
            .contains(&ResourceRef::new(Storage::RESOURCE_TYPE, "test-storage")));
        assert!(worker
            .links
            .contains(&ResourceRef::new(Storage::RESOURCE_TYPE, "test-storage-2")));
        assert_eq!(worker.links.len(), 2);
    }

    /// A readiness probe and a public endpoint set on the builder end up on the worker.
    #[test]
    fn test_worker_with_readiness_probe() {
        let probe = ReadinessProbe {
            method: HttpMethod::Post,
            path: "/health".to_string(),
        };

        let worker = Worker::new("my-worker".to_string())
            .code(WorkerCode::Image {
                image: "test-image".to_string(),
            })
            .permissions("execution".to_string())
            .public_endpoint(WorkerPublicEndpoint {
                name: "api".to_string(),
                host_label: None,
                wildcard_subdomains: false,
            })
            .readiness_probe(probe.clone())
            .build();

        assert_eq!(worker.id, "my-worker");
        assert_eq!(worker.public_endpoints[0].name, "api");
        assert_eq!(worker.readiness_probe, Some(probe));
    }

    /// The default probe is `GET /`.
    #[test]
    fn test_readiness_probe_defaults() {
        let probe = ReadinessProbe::default();
        assert_eq!(probe.method, HttpMethod::Get);
        assert_eq!(probe.path, "/");
    }

    /// Source code with a Rust toolchain is stored unchanged.
    #[test]
    fn test_worker_with_rust_toolchain() {
        let worker = Worker::new("my-rust-worker".to_string())
            .code(WorkerCode::Source {
                src: "./".to_string(),
                toolchain: ToolchainConfig::Rust {
                    binary_name: "my-app".to_string(),
                },
            })
            .permissions("execution".to_string())
            .build();

        assert_eq!(worker.id, "my-rust-worker");

        match &worker.code {
            WorkerCode::Source { src, toolchain } => {
                assert_eq!(src, "./");
                assert_eq!(
                    toolchain,
                    &ToolchainConfig::Rust {
                        binary_name: "my-app".to_string(),
                    }
                );
            }
            _ => panic!("Expected Source code"),
        }
    }

    /// Source code with a TypeScript toolchain is stored unchanged.
    #[test]
    fn test_worker_with_typescript_toolchain() {
        let worker = Worker::new("my-ts-worker".to_string())
            .code(WorkerCode::Source {
                src: "./".to_string(),
                toolchain: ToolchainConfig::TypeScript {
                    binary_name: Some("my-ts-worker".to_string()),
                },
            })
            .permissions("execution".to_string())
            .build();

        assert_eq!(worker.id, "my-ts-worker");

        match &worker.code {
            WorkerCode::Source { src, toolchain } => {
                assert_eq!(src, "./");
                assert_eq!(
                    toolchain,
                    &ToolchainConfig::TypeScript {
                        binary_name: Some("my-ts-worker".to_string())
                    }
                );
            }
            _ => panic!("Expected Source code"),
        }
    }

    /// A queue trigger carries a reference to the queue it was built from.
    #[test]
    fn test_worker_with_queue_trigger() {
        use crate::Queue;

        let queue = Queue::new("test-queue".to_string()).build();

        let worker = Worker::new("triggered-worker".to_string())
            .code(WorkerCode::Image {
                image: "test-image".to_string(),
            })
            .permissions("execution".to_string())
            .trigger(WorkerTrigger::queue(&queue))
            .build();

        assert_eq!(worker.triggers.len(), 1);
        if let WorkerTrigger::Queue { queue: queue_ref } = &worker.triggers[0] {
            assert_eq!(queue_ref.resource_type, Queue::RESOURCE_TYPE);
            assert_eq!(queue_ref.id, "test-queue");
        } else {
            panic!("Expected queue trigger");
        }
    }

    /// Dependencies include both linked resources and trigger resources.
    #[test]
    fn test_worker_trigger_dependencies() {
        use crate::Queue;

        let queue = Queue::new("test-queue".to_string()).build();
        let storage = Storage::new("test-storage".to_string()).build();

        let worker = Worker::new("triggered-worker".to_string())
            .code(WorkerCode::Image {
                image: "test-image".to_string(),
            })
            .permissions("execution".to_string())
            .link(&storage)
            .trigger(WorkerTrigger::queue(&queue))
            .build();

        let dependencies = worker.get_dependencies();

        assert_eq!(dependencies.len(), 2);
        assert!(dependencies.contains(&ResourceRef::new(Storage::RESOURCE_TYPE, "test-storage")));
        assert!(dependencies.contains(&ResourceRef::new(Queue::RESOURCE_TYPE, "test-queue")));
    }

    /// `WorkerTrigger::queue` produces a `Queue` variant referencing the queue.
    #[test]
    fn test_worker_trigger_helper_methods() {
        use crate::Queue;

        let queue = Queue::new("my-queue".to_string()).build();

        let trigger = WorkerTrigger::queue(&queue);

        if let WorkerTrigger::Queue { queue: queue_ref } = trigger {
            assert_eq!(queue_ref.resource_type, Queue::RESOURCE_TYPE);
            assert_eq!(queue_ref.id, "my-queue");
        } else {
            panic!("Expected queue trigger");
        }
    }

    /// Triggers keep their insertion order; only queue triggers add dependencies here.
    #[test]
    fn test_worker_with_multiple_triggers() {
        use crate::Queue;

        let queue1 = Queue::new("queue-1".to_string()).build();
        let queue2 = Queue::new("queue-2".to_string()).build();

        let worker = Worker::new("multi-triggered-worker".to_string())
            .code(WorkerCode::Image {
                image: "test-image".to_string(),
            })
            .permissions("execution".to_string())
            .trigger(WorkerTrigger::queue(&queue1))
            .trigger(WorkerTrigger::queue(&queue2))
            .trigger(WorkerTrigger::schedule("0 * * * *".to_string()))
            .build();

        assert_eq!(worker.triggers.len(), 3);

        if let WorkerTrigger::Queue { queue: queue_ref } = &worker.triggers[0] {
            assert_eq!(queue_ref.id, "queue-1");
        } else {
            panic!("Expected first trigger to be queue-1");
        }

        if let WorkerTrigger::Queue { queue: queue_ref } = &worker.triggers[1] {
            assert_eq!(queue_ref.id, "queue-2");
        } else {
            panic!("Expected second trigger to be queue-2");
        }

        if let WorkerTrigger::Schedule { cron } = &worker.triggers[2] {
            assert_eq!(cron, "0 * * * *");
        } else {
            panic!("Expected third trigger to be schedule");
        }

        // The schedule trigger references no resource, so only the two queues count.
        let dependencies = worker.get_dependencies();
        assert_eq!(dependencies.len(), 2);
        assert!(dependencies.contains(&ResourceRef::new(Queue::RESOURCE_TYPE, "queue-1")));
        assert!(dependencies.contains(&ResourceRef::new(Queue::RESOURCE_TYPE, "queue-2")));
    }

    /// `commands_enabled(true)` is honoured without any public endpoint.
    #[test]
    fn test_worker_with_commands_enabled() {
        let worker = Worker::new("cmd-worker".to_string())
            .code(WorkerCode::Image {
                image: "test-image".to_string(),
            })
            .permissions("execution".to_string())
            .commands_enabled(true)
            .build();

        assert_eq!(worker.id, "cmd-worker");
        assert!(worker.public_endpoints.is_empty());
        assert!(worker.commands_enabled);
    }

    /// Builder defaults: no endpoints, commands off, 256 MB, 180 s.
    #[test]
    fn test_worker_defaults() {
        let worker = Worker::new("default-worker".to_string())
            .code(WorkerCode::Image {
                image: "test-image".to_string(),
            })
            .permissions("execution".to_string())
            .build();

        assert!(worker.public_endpoints.is_empty());
        assert!(!worker.commands_enabled);
        assert_eq!(worker.memory_mb, 256);
        assert_eq!(worker.timeout_seconds, 180);
    }

    /// A public endpoint and `commands_enabled(true)` can be combined.
    #[test]
    fn test_worker_public_ingress_with_commands() {
        let worker = Worker::new("public-cmd-worker".to_string())
            .code(WorkerCode::Image {
                image: "test-image".to_string(),
            })
            .permissions("execution".to_string())
            .public_endpoint(WorkerPublicEndpoint {
                name: "api".to_string(),
                host_label: None,
                wildcard_subdomains: false,
            })
            .commands_enabled(true)
            .build();

        assert_eq!(worker.public_endpoints[0].name, "api");
        assert!(worker.commands_enabled);
    }
}