use crate::error::{ErrorData, Result};
use crate::resource::{ResourceDefinition, ResourceOutputsDefinition, ResourceRef, ResourceType};
use crate::{PublicEndpointOutput, WorkerPublicEndpoint};
use alien_error::AlienError;
use bon::Builder;
use serde::{Deserialize, Serialize};
use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum WorkerCode {
#[serde(rename_all = "camelCase")]
Image {
image: String,
},
#[serde(rename_all = "camelCase")]
Source {
src: String,
toolchain: ToolchainConfig,
},
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[serde(rename_all = "lowercase", tag = "type")]
pub enum ToolchainConfig {
#[serde(rename_all = "camelCase")]
Rust {
binary_name: String,
},
#[serde(rename_all = "camelCase")]
TypeScript {
#[serde(default, skip_serializing_if = "Option::is_none")]
binary_name: Option<String>,
},
#[serde(rename_all = "camelCase")]
Docker {
#[serde(skip_serializing_if = "Option::is_none")]
dockerfile: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
build_args: Option<HashMap<String, String>>,
#[serde(skip_serializing_if = "Option::is_none")]
target: Option<String>,
},
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum WorkerTrigger {
Queue {
queue: ResourceRef,
},
Storage {
storage: ResourceRef,
events: Vec<String>,
},
Schedule {
cron: String,
},
}
impl WorkerTrigger {
pub fn queue<R: ?Sized>(queue: &R) -> Self
where
for<'a> &'a R: Into<ResourceRef>,
{
let queue_ref: ResourceRef = queue.into();
WorkerTrigger::Queue { queue: queue_ref }
}
pub fn storage<R: ?Sized>(storage: &R, events: Vec<String>) -> Self
where
for<'a> &'a R: Into<ResourceRef>,
{
let storage_ref: ResourceRef = storage.into();
WorkerTrigger::Storage {
storage: storage_ref,
events,
}
}
pub fn schedule<S: Into<String>>(cron: S) -> Self {
WorkerTrigger::Schedule { cron: cron.into() }
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Builder)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
#[builder(start_fn = new)]
pub struct Worker {
#[builder(start_fn)]
pub id: String,
#[builder(field)]
pub links: Vec<ResourceRef>,
#[builder(field)]
pub triggers: Vec<WorkerTrigger>,
#[builder(field)]
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub public_endpoints: Vec<WorkerPublicEndpoint>,
pub permissions: String,
pub code: WorkerCode,
#[builder(default = default_memory_mb())]
#[serde(default = "default_memory_mb")]
#[cfg_attr(feature = "openapi", schema(default = default_memory_mb))]
pub memory_mb: u32,
#[builder(default = default_timeout_seconds())]
#[serde(default = "default_timeout_seconds")]
#[cfg_attr(feature = "openapi", schema(default = default_timeout_seconds))]
pub timeout_seconds: u32,
#[builder(default)]
#[serde(default)]
pub environment: HashMap<String, String>,
#[builder(default = default_commands_enabled())]
#[serde(default = "default_commands_enabled")]
#[cfg_attr(feature = "openapi", schema(default = default_commands_enabled))]
pub commands_enabled: bool,
pub concurrency_limit: Option<u32>,
pub readiness_probe: Option<ReadinessProbe>,
}
impl Worker {
pub const RESOURCE_TYPE: ResourceType = ResourceType::from_static("worker");
pub fn get_permissions(&self) -> &str {
&self.permissions
}
fn validate_public_endpoints(&self) -> Result<()> {
let mut endpoint_names = std::collections::HashSet::new();
for endpoint in &self.public_endpoints {
endpoint.validate_for_resource(&self.id)?;
if !endpoint_names.insert(endpoint.name.as_str()) {
return Err(AlienError::new(ErrorData::InvalidResourceUpdate {
resource_id: self.id.clone(),
reason: format!("duplicate public endpoint name '{}'", endpoint.name),
}));
}
}
Ok(())
}
}
fn default_memory_mb() -> u32 {
256
}
fn default_timeout_seconds() -> u32 {
180
}
fn default_commands_enabled() -> bool {
false
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[serde(rename_all = "UPPERCASE")]
#[derive(Default)]
pub enum HttpMethod {
#[default]
Get,
Post,
Put,
Delete,
Head,
Options,
Patch,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[serde(rename_all = "camelCase")]
pub struct ReadinessProbe {
#[serde(default)]
pub method: HttpMethod,
#[serde(default = "default_probe_path")]
pub path: String,
}
fn default_probe_path() -> String {
"/".to_string()
}
impl Default for ReadinessProbe {
fn default() -> Self {
Self {
method: HttpMethod::default(),
path: default_probe_path(),
}
}
}
use crate::resources::worker::worker_builder::State;
impl<S: State> WorkerBuilder<S> {
pub fn link<R: ?Sized>(mut self, resource: &R) -> Self
where
for<'a> &'a R: Into<ResourceRef>, {
let resource_ref: ResourceRef = resource.into();
self.links.push(resource_ref);
self
}
pub fn trigger(mut self, trigger: WorkerTrigger) -> Self {
self.triggers.push(trigger);
self
}
pub fn public_endpoint(mut self, endpoint: WorkerPublicEndpoint) -> Self {
self.public_endpoints.push(endpoint);
self
}
}
impl ResourceDefinition for Worker {
fn get_resource_type(&self) -> ResourceType {
Self::RESOURCE_TYPE
}
fn id(&self) -> &str {
&self.id
}
fn get_dependencies(&self) -> Vec<ResourceRef> {
let mut dependencies = self.links.clone();
for trigger in &self.triggers {
match trigger {
WorkerTrigger::Queue { queue } => {
dependencies.push(queue.clone());
}
WorkerTrigger::Storage { storage, .. } => {
dependencies.push(storage.clone());
}
WorkerTrigger::Schedule { .. } => {
}
}
}
dependencies
}
fn get_permissions(&self) -> Option<&str> {
Some(&self.permissions)
}
fn validate_update(&self, new_config: &dyn ResourceDefinition) -> Result<()> {
let new_worker = new_config
.as_any()
.downcast_ref::<Worker>()
.ok_or_else(|| {
AlienError::new(ErrorData::UnexpectedResourceType {
resource_id: self.id.clone(),
expected: Self::RESOURCE_TYPE,
actual: new_config.get_resource_type(),
})
})?;
if self.id != new_worker.id {
return Err(AlienError::new(ErrorData::InvalidResourceUpdate {
resource_id: self.id.clone(),
reason: "the 'id' field is immutable".to_string(),
}));
}
self.validate_public_endpoints()?;
new_worker.validate_public_endpoints()?;
if self.public_endpoints != new_worker.public_endpoints {
return Err(AlienError::new(ErrorData::InvalidResourceUpdate {
resource_id: self.id.clone(),
reason: "the 'publicEndpoints' field is immutable".to_string(),
}));
}
Ok(())
}
fn as_any(&self) -> &dyn Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
fn box_clone(&self) -> Box<dyn ResourceDefinition> {
Box::new(self.clone())
}
fn resource_eq(&self, other: &dyn ResourceDefinition) -> bool {
other.as_any().downcast_ref::<Worker>() == Some(self)
}
fn to_json_value(&self) -> serde_json::Result<serde_json::Value> {
serde_json::to_value(self)
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[serde(rename_all = "camelCase")]
pub struct WorkerOutputs {
pub worker_name: String,
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub public_endpoints: HashMap<String, PublicEndpointOutput>,
#[serde(skip_serializing_if = "Option::is_none")]
pub identifier: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub commands_push_target: Option<String>,
}
impl ResourceOutputsDefinition for WorkerOutputs {
fn get_resource_type(&self) -> ResourceType {
Worker::RESOURCE_TYPE.clone()
}
fn as_any(&self) -> &dyn Any {
self
}
fn box_clone(&self) -> Box<dyn ResourceOutputsDefinition> {
Box::new(self.clone())
}
fn outputs_eq(&self, other: &dyn ResourceOutputsDefinition) -> bool {
other.as_any().downcast_ref::<WorkerOutputs>() == Some(self)
}
fn to_json_value(&self) -> serde_json::Result<serde_json::Value> {
serde_json::to_value(self)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::Storage;
#[test]
fn test_worker_builder_direct_refs() {
let dummy_storage = Storage::new("test-storage".to_string()).build();
let dummy_storage_2 = Storage::new("test-storage-2".to_string()).build();
let worker = Worker::new("my-worker".to_string())
.code(WorkerCode::Image {
image: "test-image".to_string(),
})
.permissions("execution".to_string())
.link(&dummy_storage) .link(&dummy_storage_2) .build();
assert_eq!(worker.id, "my-worker");
assert_eq!(
worker.code,
WorkerCode::Image {
image: "test-image".to_string()
}
);
assert_eq!(worker.permissions, "execution");
assert!(worker
.links
.contains(&ResourceRef::new(Storage::RESOURCE_TYPE, "test-storage")));
assert!(worker
.links
.contains(&ResourceRef::new(Storage::RESOURCE_TYPE, "test-storage-2")));
assert_eq!(worker.links.len(), 2); }
#[test]
fn test_worker_with_readiness_probe() {
let probe = ReadinessProbe {
method: HttpMethod::Post,
path: "/health".to_string(),
};
let worker = Worker::new("my-worker".to_string())
.code(WorkerCode::Image {
image: "test-image".to_string(),
})
.permissions("execution".to_string())
.public_endpoint(WorkerPublicEndpoint {
name: "api".to_string(),
host_label: None,
wildcard_subdomains: false,
})
.readiness_probe(probe.clone())
.build();
assert_eq!(worker.id, "my-worker");
assert_eq!(worker.public_endpoints[0].name, "api");
assert_eq!(worker.readiness_probe, Some(probe));
}
#[test]
fn test_readiness_probe_defaults() {
let probe = ReadinessProbe::default();
assert_eq!(probe.method, HttpMethod::Get);
assert_eq!(probe.path, "/");
}
#[test]
fn test_worker_with_rust_toolchain() {
let worker = Worker::new("my-rust-worker".to_string())
.code(WorkerCode::Source {
src: "./".to_string(),
toolchain: ToolchainConfig::Rust {
binary_name: "my-app".to_string(),
},
})
.permissions("execution".to_string())
.build();
assert_eq!(worker.id, "my-rust-worker");
match &worker.code {
WorkerCode::Source { src, toolchain } => {
assert_eq!(src, "./");
assert_eq!(
toolchain,
&ToolchainConfig::Rust {
binary_name: "my-app".to_string(),
}
);
}
_ => panic!("Expected Source code"),
}
}
#[test]
fn test_worker_with_typescript_toolchain() {
let worker = Worker::new("my-ts-worker".to_string())
.code(WorkerCode::Source {
src: "./".to_string(),
toolchain: ToolchainConfig::TypeScript {
binary_name: Some("my-ts-worker".to_string()),
},
})
.permissions("execution".to_string())
.build();
assert_eq!(worker.id, "my-ts-worker");
match &worker.code {
WorkerCode::Source { src, toolchain } => {
assert_eq!(src, "./");
assert_eq!(
toolchain,
&ToolchainConfig::TypeScript {
binary_name: Some("my-ts-worker".to_string())
}
);
}
_ => panic!("Expected Source code"),
}
}
#[test]
fn test_worker_with_queue_trigger() {
use crate::Queue;
let queue = Queue::new("test-queue".to_string()).build();
let worker = Worker::new("triggered-worker".to_string())
.code(WorkerCode::Image {
image: "test-image".to_string(),
})
.permissions("execution".to_string())
.trigger(WorkerTrigger::queue(&queue))
.build();
assert_eq!(worker.triggers.len(), 1);
if let WorkerTrigger::Queue { queue: queue_ref } = &worker.triggers[0] {
assert_eq!(queue_ref.resource_type, Queue::RESOURCE_TYPE);
assert_eq!(queue_ref.id, "test-queue");
} else {
panic!("Expected queue trigger");
}
}
#[test]
fn test_worker_trigger_dependencies() {
use crate::Queue;
let queue = Queue::new("test-queue".to_string()).build();
let storage = Storage::new("test-storage".to_string()).build();
let worker = Worker::new("triggered-worker".to_string())
.code(WorkerCode::Image {
image: "test-image".to_string(),
})
.permissions("execution".to_string())
.link(&storage) .trigger(WorkerTrigger::queue(&queue)) .build();
let dependencies = worker.get_dependencies();
assert_eq!(dependencies.len(), 2);
assert!(dependencies.contains(&ResourceRef::new(Storage::RESOURCE_TYPE, "test-storage")));
assert!(dependencies.contains(&ResourceRef::new(Queue::RESOURCE_TYPE, "test-queue")));
}
#[test]
fn test_worker_trigger_helper_methods() {
use crate::Queue;
let queue = Queue::new("my-queue".to_string()).build();
let trigger = WorkerTrigger::queue(&queue);
if let WorkerTrigger::Queue { queue: queue_ref } = trigger {
assert_eq!(queue_ref.resource_type, Queue::RESOURCE_TYPE);
assert_eq!(queue_ref.id, "my-queue");
} else {
panic!("Expected queue trigger");
}
}
#[test]
fn test_worker_with_multiple_triggers() {
use crate::Queue;
let queue1 = Queue::new("queue-1".to_string()).build();
let queue2 = Queue::new("queue-2".to_string()).build();
let worker = Worker::new("multi-triggered-worker".to_string())
.code(WorkerCode::Image {
image: "test-image".to_string(),
})
.permissions("execution".to_string())
.trigger(WorkerTrigger::queue(&queue1))
.trigger(WorkerTrigger::queue(&queue2))
.trigger(WorkerTrigger::schedule("0 * * * *".to_string()))
.build();
assert_eq!(worker.triggers.len(), 3);
if let WorkerTrigger::Queue { queue: queue_ref } = &worker.triggers[0] {
assert_eq!(queue_ref.id, "queue-1");
} else {
panic!("Expected first trigger to be queue-1");
}
if let WorkerTrigger::Queue { queue: queue_ref } = &worker.triggers[1] {
assert_eq!(queue_ref.id, "queue-2");
} else {
panic!("Expected second trigger to be queue-2");
}
if let WorkerTrigger::Schedule { cron } = &worker.triggers[2] {
assert_eq!(cron, "0 * * * *");
} else {
panic!("Expected third trigger to be schedule");
}
let dependencies = worker.get_dependencies();
assert_eq!(dependencies.len(), 2); assert!(dependencies.contains(&ResourceRef::new(Queue::RESOURCE_TYPE, "queue-1")));
assert!(dependencies.contains(&ResourceRef::new(Queue::RESOURCE_TYPE, "queue-2")));
}
#[test]
fn test_worker_with_commands_enabled() {
let worker = Worker::new("cmd-worker".to_string())
.code(WorkerCode::Image {
image: "test-image".to_string(),
})
.permissions("execution".to_string())
.commands_enabled(true)
.build();
assert_eq!(worker.id, "cmd-worker");
assert!(worker.public_endpoints.is_empty());
assert_eq!(worker.commands_enabled, true);
}
#[test]
fn test_worker_defaults() {
let worker = Worker::new("default-worker".to_string())
.code(WorkerCode::Image {
image: "test-image".to_string(),
})
.permissions("execution".to_string())
.build();
assert!(worker.public_endpoints.is_empty());
assert_eq!(worker.commands_enabled, false);
assert_eq!(worker.memory_mb, 256);
assert_eq!(worker.timeout_seconds, 180);
}
#[test]
fn test_worker_public_ingress_with_commands() {
let worker = Worker::new("public-cmd-worker".to_string())
.code(WorkerCode::Image {
image: "test-image".to_string(),
})
.permissions("execution".to_string())
.public_endpoint(WorkerPublicEndpoint {
name: "api".to_string(),
host_label: None,
wildcard_subdomains: false,
})
.commands_enabled(true)
.build();
assert_eq!(worker.public_endpoints[0].name, "api");
assert_eq!(worker.commands_enabled, true);
}
}