Skip to main content

alien_core/resources/
container_cluster.rs

//! ContainerCluster resource for long-running container workloads.
//!
//! A ContainerCluster represents the compute infrastructure needed to run containers.
//! It provisions:
//! - Auto Scaling Groups (AWS), Managed Instance Groups (GCP), or VM Scale Sets (Azure)
//! - IAM roles/service accounts for machine authentication
//! - Security groups/firewall rules
//! - Launch templates/instance configurations
//!
//! The cluster integrates with Horizon for container scheduling and orchestration.
11
12use crate::error::{ErrorData, Result};
13use crate::resource::{ResourceDefinition, ResourceOutputsDefinition, ResourceRef};
14use crate::ResourceType;
15use alien_error::AlienError;
16use bon::Builder;
17use serde::{Deserialize, Serialize};
18use std::any::Any;
19use std::fmt::Debug;
20
21/// GPU specification for a capacity group.
22#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
23#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
24#[serde(rename_all = "camelCase")]
25pub struct GpuSpec {
26    /// GPU type identifier (e.g., "nvidia-a100", "nvidia-t4")
27    #[serde(rename = "type")]
28    pub gpu_type: String,
29    /// Number of GPUs per machine
30    pub count: u32,
31}
32
33/// Machine resource profile for a capacity group.
34///
35/// Represents the hardware specifications for machines in a capacity group.
36/// These are hardware totals (what the instance type advertises), not allocatable
37/// capacity. Horizon's scheduler internally subtracts system reserves for planning.
38#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
39#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
40#[serde(rename_all = "camelCase")]
41pub struct MachineProfile {
42    /// CPU cores per machine (hardware total) - stored as string to preserve precision
43    /// (e.g., "8.0", "4.5")
44    pub cpu: String,
45    /// Memory in bytes (hardware total)
46    pub memory_bytes: u64,
47    /// Ephemeral storage in bytes (hardware total)
48    pub ephemeral_storage_bytes: u64,
49    /// GPU specification (optional)
50    #[serde(skip_serializing_if = "Option::is_none")]
51    pub gpu: Option<GpuSpec>,
52}
53
54/// Capacity group definition.
55///
56/// A capacity group represents machines with identical hardware profiles.
57/// Each group becomes a separate Auto Scaling Group (AWS), Managed Instance Group (GCP),
58/// or VM Scale Set (Azure).
59#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
60#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
61#[serde(rename_all = "camelCase")]
62pub struct CapacityGroup {
63    /// Unique identifier for this capacity group (must be lowercase alphanumeric with hyphens)
64    pub group_id: String,
65    /// Instance type for machines in this group (e.g., "m7g.xlarge", "n2-standard-8")
66    /// Auto-selected if not specified, based on profile requirements.
67    #[serde(skip_serializing_if = "Option::is_none")]
68    pub instance_type: Option<String>,
69    /// Machine resource profile (auto-derived from instance_type if not specified)
70    #[serde(skip_serializing_if = "Option::is_none")]
71    pub profile: Option<MachineProfile>,
72    /// Minimum number of machines (can be 0 for scale-to-zero)
73    pub min_size: u32,
74    /// Maximum number of machines (must be ≤ 10)
75    pub max_size: u32,
76}
77
78/// Deployment-time values that affect VM instance templates.
79///
80/// These are stamped onto `ContainerCluster` by `stamp_template_inputs()` before
81/// each deployment. Storing them in the resource config means `resource_eq()`
82/// detects changes (e.g., new horizond binary URL, rotated monitoring credentials)
83/// and triggers the normal update flow without any executor changes.
84///
85/// The OTLP auth header is sensitive — only a SHA-256 hash is stored here for
86/// change detection. The actual header value is read from `DeploymentConfig` at
87/// provisioning time.
88#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
89#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
90#[serde(rename_all = "camelCase")]
91pub struct TemplateInputs {
92    /// Base URL for downloading the horizond binary (without arch suffix).
93    pub horizond_download_base_url: String,
94    /// Horizon API base URL (e.g., "https://horizon.alien.dev").
95    pub horizon_api_url: String,
96    /// ETag of the horizond binary from the releases server — change-detection signal.
97    /// Changes on every `cargo zigbuild` (nginx ETag = mtime+size), triggering a rolling update.
98    /// Absent when the releases server is unreachable; change detection falls back to URL-only.
99    #[serde(skip_serializing_if = "Option::is_none")]
100    pub horizond_binary_hash: Option<String>,
101    /// OTLP logs endpoint URL (non-sensitive, stored directly).
102    #[serde(skip_serializing_if = "Option::is_none")]
103    pub monitoring_logs_endpoint: Option<String>,
104    /// OTLP metrics endpoint URL (non-sensitive, stored directly).
105    /// When set, horizond will export its own VM/container orchestration metrics here.
106    /// The same auth header as logs is reused at boot time (stored in the cloud vault).
107    #[serde(skip_serializing_if = "Option::is_none")]
108    pub monitoring_metrics_endpoint: Option<String>,
109    /// SHA-256 hash of the OTLP logs auth header — for change detection only.
110    /// The actual header is read from DeploymentConfig at provisioning time.
111    #[serde(skip_serializing_if = "Option::is_none")]
112    pub monitoring_auth_hash: Option<String>,
113    /// SHA-256 hash of the OTLP metrics auth header — for change detection only.
114    /// Only set when metrics uses a separate auth header from logs (e.g. Axiom with distinct datasets).
115    #[serde(skip_serializing_if = "Option::is_none")]
116    pub monitoring_metrics_auth_hash: Option<String>,
117}
118
119/// ContainerCluster resource for running long-running container workloads.
120///
121/// A ContainerCluster provides compute infrastructure that integrates with Horizon
122/// for container orchestration. It manages auto scaling groups of machines that
123/// run the horizond agent for container scheduling.
124///
125/// ## Architecture
126///
127/// - **Alien** provisions infrastructure: ASGs/MIGs/VMSSs, IAM roles, security groups
128/// - **Horizon** manages containers: scheduling replicas to machines, autoscaling
129/// - **horizond** runs on each machine: starts/stops containers based on Horizon's assignments
130///
131/// ## Example
132///
133/// ```rust
134/// use alien_core::{ContainerCluster, CapacityGroup};
135///
136/// let cluster = ContainerCluster::new("compute".to_string())
137///     .capacity_group(CapacityGroup {
138///         group_id: "general".to_string(),
139///         instance_type: Some("m7g.xlarge".to_string()),
140///         profile: None,
141///         min_size: 1,
142///         max_size: 5,
143///     })
144///     .build();
145/// ```
146#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Builder)]
147#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
148#[serde(rename_all = "camelCase", deny_unknown_fields)]
149#[builder(start_fn = new)]
150pub struct ContainerCluster {
151    /// Unique identifier for the container cluster.
152    /// Must contain only alphanumeric characters, hyphens, and underscores.
153    #[builder(start_fn)]
154    pub id: String,
155
156    /// Capacity groups defining the machine pools for this cluster.
157    /// Each group becomes a separate ASG/MIG/VMSS.
158    #[builder(field)]
159    pub capacity_groups: Vec<CapacityGroup>,
160
161    /// Container CIDR block for internal container networking.
162    /// Auto-generated as "10.244.0.0/16" if not specified.
163    /// Each machine gets a /24 subnet from this range.
164    #[serde(skip_serializing_if = "Option::is_none")]
165    pub container_cidr: Option<String>,
166
167    /// Deployment-time values that affect instance templates (horizond URL, monitoring, etc.).
168    /// Populated by stamp_template_inputs() from DeploymentConfig — not user-provided.
169    #[builder(skip)]
170    #[serde(skip_serializing_if = "Option::is_none")]
171    pub template_inputs: Option<TemplateInputs>,
172}
173
174impl ContainerCluster {
175    /// The resource type identifier for ContainerCluster
176    pub const RESOURCE_TYPE: ResourceType = ResourceType::from_static("container-cluster");
177
178    /// Returns the cluster's unique identifier.
179    pub fn id(&self) -> &str {
180        &self.id
181    }
182
183    /// Returns the container CIDR, defaulting to "10.244.0.0/16" if not specified.
184    pub fn container_cidr(&self) -> &str {
185        self.container_cidr.as_deref().unwrap_or("10.244.0.0/16")
186    }
187}
188
189impl<S: container_cluster_builder::State> ContainerClusterBuilder<S> {
190    /// Adds a capacity group to the cluster.
191    pub fn capacity_group(mut self, group: CapacityGroup) -> Self {
192        self.capacity_groups.push(group);
193        self
194    }
195}
196
197/// Status of a single capacity group within a ContainerCluster.
198#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
199#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
200#[serde(rename_all = "camelCase")]
201pub struct CapacityGroupStatus {
202    /// Capacity group ID
203    pub group_id: String,
204    /// Current number of machines
205    pub current_machines: u32,
206    /// Desired number of machines (from Horizon's capacity plan)
207    pub desired_machines: u32,
208    /// Instance type being used
209    pub instance_type: String,
210}
211
212/// Outputs generated by a successfully provisioned ContainerCluster.
213#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
214#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
215#[serde(rename_all = "camelCase")]
216pub struct ContainerClusterOutputs {
217    /// Horizon cluster ID (workspace/project/agent/resourceid format)
218    pub cluster_id: String,
219    /// Whether the Horizon cluster is ready
220    pub horizon_ready: bool,
221    /// Status of each capacity group
222    pub capacity_group_statuses: Vec<CapacityGroupStatus>,
223    /// Total number of machines across all capacity groups
224    pub total_machines: u32,
225}
226
227#[typetag::serde(name = "container-cluster")]
228impl ResourceOutputsDefinition for ContainerClusterOutputs {
229    fn resource_type() -> ResourceType {
230        ContainerCluster::RESOURCE_TYPE.clone()
231    }
232
233    fn as_any(&self) -> &dyn Any {
234        self
235    }
236
237    fn box_clone(&self) -> Box<dyn ResourceOutputsDefinition> {
238        Box::new(self.clone())
239    }
240
241    fn outputs_eq(&self, other: &dyn ResourceOutputsDefinition) -> bool {
242        other.as_any().downcast_ref::<ContainerClusterOutputs>() == Some(self)
243    }
244}
245
246#[typetag::serde(name = "container-cluster")]
247impl ResourceDefinition for ContainerCluster {
248    fn resource_type() -> ResourceType {
249        Self::RESOURCE_TYPE.clone()
250    }
251
252    fn get_resource_type(&self) -> ResourceType {
253        Self::resource_type()
254    }
255
256    fn id(&self) -> &str {
257        &self.id
258    }
259
260    fn get_dependencies(&self) -> Vec<ResourceRef> {
261        // ContainerCluster has no static dependencies.
262        // Network dependency is platform-specific:
263        // - AWS/GCP/Azure: Added by ContainerClusterMutation
264        // - Local/Kubernetes: Not needed (Docker/K8s handles networking)
265        // Platform controllers use require_dependency() at runtime to access Network state.
266        Vec::new()
267    }
268
269    fn validate_update(&self, new_config: &dyn ResourceDefinition) -> Result<()> {
270        let new_cluster = new_config
271            .as_any()
272            .downcast_ref::<ContainerCluster>()
273            .ok_or_else(|| {
274                AlienError::new(ErrorData::UnexpectedResourceType {
275                    resource_id: self.id.clone(),
276                    expected: Self::RESOURCE_TYPE,
277                    actual: new_config.get_resource_type(),
278                })
279            })?;
280
281        if self.id != new_cluster.id {
282            return Err(AlienError::new(ErrorData::InvalidResourceUpdate {
283                resource_id: self.id.clone(),
284                reason: "the 'id' field is immutable".to_string(),
285            }));
286        }
287
288        // Container CIDR is immutable once set
289        if self.container_cidr.is_some()
290            && new_cluster.container_cidr.is_some()
291            && self.container_cidr != new_cluster.container_cidr
292        {
293            return Err(AlienError::new(ErrorData::InvalidResourceUpdate {
294                resource_id: self.id.clone(),
295                reason: "the 'containerCidr' field is immutable once set".to_string(),
296            }));
297        }
298
299        // Validate capacity groups
300        for new_group in &new_cluster.capacity_groups {
301            if let Some(existing_group) = self
302                .capacity_groups
303                .iter()
304                .find(|g| g.group_id == new_group.group_id)
305            {
306                // Instance type is immutable for existing groups
307                if existing_group.instance_type.is_some()
308                    && new_group.instance_type.is_some()
309                    && existing_group.instance_type != new_group.instance_type
310                {
311                    return Err(AlienError::new(ErrorData::InvalidResourceUpdate {
312                        resource_id: self.id.clone(),
313                        reason: format!(
314                            "instance type for capacity group '{}' is immutable",
315                            new_group.group_id
316                        ),
317                    }));
318                }
319            }
320        }
321
322        Ok(())
323    }
324
325    fn as_any(&self) -> &dyn Any {
326        self
327    }
328
329    fn as_any_mut(&mut self) -> &mut dyn Any {
330        self
331    }
332
333    fn box_clone(&self) -> Box<dyn ResourceDefinition> {
334        Box::new(self.clone())
335    }
336
337    fn resource_eq(&self, other: &dyn ResourceDefinition) -> bool {
338        other.as_any().downcast_ref::<ContainerCluster>() == Some(self)
339    }
340}
341
#[cfg(test)]
mod tests {
    use super::*;

    /// One gibibyte in bytes, used to spell out hardware sizes readably.
    const GIB: u64 = 1024 * 1024 * 1024;

    /// Builds a capacity group with no explicit machine profile.
    fn group(
        group_id: &str,
        instance_type: Option<&str>,
        min_size: u32,
        max_size: u32,
    ) -> CapacityGroup {
        CapacityGroup {
            group_id: group_id.to_string(),
            instance_type: instance_type.map(String::from),
            profile: None,
            min_size,
            max_size,
        }
    }

    #[test]
    fn test_container_cluster_creation() {
        let cluster = ContainerCluster::new("compute".to_string())
            .capacity_group(group("general", Some("m7g.xlarge"), 1, 5))
            .build();

        assert_eq!(cluster.id(), "compute");
        assert_eq!(cluster.capacity_groups.len(), 1);
        assert_eq!(cluster.capacity_groups[0].group_id, "general");
        // No CIDR was given, so the accessor reports the default.
        assert_eq!(cluster.container_cidr(), "10.244.0.0/16");
    }

    #[test]
    fn test_container_cluster_multiple_capacity_groups() {
        let gpu_group = CapacityGroup {
            profile: Some(MachineProfile {
                cpu: "4.0".to_string(),
                memory_bytes: 16 * GIB,
                ephemeral_storage_bytes: 200 * GIB,
                gpu: Some(GpuSpec {
                    gpu_type: "nvidia-a10g".to_string(),
                    count: 1,
                }),
            }),
            ..group("gpu", Some("g5.xlarge"), 0, 2)
        };

        let cluster = ContainerCluster::new("multi-pool".to_string())
            .capacity_group(group("general", Some("m7g.xlarge"), 1, 3))
            .capacity_group(gpu_group)
            .build();

        assert_eq!(cluster.capacity_groups.len(), 2);
        assert_eq!(cluster.capacity_groups[0].group_id, "general");
        assert_eq!(cluster.capacity_groups[1].group_id, "gpu");

        let gpu_profile = cluster.capacity_groups[1].profile.as_ref().unwrap();
        assert!(gpu_profile.gpu.is_some());
    }

    #[test]
    fn test_container_cluster_custom_cidr() {
        let cluster = ContainerCluster::new("custom-net".to_string())
            .container_cidr("172.30.0.0/16".to_string())
            .capacity_group(group("general", None, 1, 5))
            .build();

        assert_eq!(cluster.container_cidr(), "172.30.0.0/16");
    }

    #[test]
    fn test_container_cluster_validate_update_immutable_id() {
        let cluster1 = ContainerCluster::new("cluster-1".to_string())
            .capacity_group(group("general", None, 1, 5))
            .build();

        let cluster2 = ContainerCluster::new("cluster-2".to_string())
            .capacity_group(group("general", None, 1, 5))
            .build();

        // Changing the id must be rejected.
        let result = cluster1.validate_update(&cluster2);
        assert!(result.is_err());
    }

    #[test]
    fn test_container_cluster_validate_update_scale_change() {
        let cluster1 = ContainerCluster::new("compute".to_string())
            .capacity_group(group("general", Some("m7g.xlarge"), 1, 5))
            .build();

        let cluster2 = ContainerCluster::new("compute".to_string())
            .capacity_group(group("general", Some("m7g.xlarge"), 2, 10))
            .build();

        // Scale changes should be allowed
        let result = cluster1.validate_update(&cluster2);
        assert!(result.is_ok());
    }

    #[test]
    fn test_container_cluster_serialization() {
        let cluster = ContainerCluster::new("test-cluster".to_string())
            .capacity_group(group("general", Some("m7g.xlarge"), 1, 5))
            .build();

        // A JSON round trip must reproduce an equal value.
        let json = serde_json::to_string(&cluster).unwrap();
        let deserialized: ContainerCluster = serde_json::from_str(&json).unwrap();
        assert_eq!(cluster, deserialized);
    }
}