Skip to main content

alien_core/resources/
container_cluster.rs

1//! ContainerCluster resource for long-running container workloads.
2//!
3//! A ContainerCluster represents the compute infrastructure needed to run containers.
4//! It provisions:
5//! - Auto Scaling Groups (AWS), Managed Instance Groups (GCP), or VM Scale Sets (Azure)
6//! - IAM roles/service accounts for machine authentication
7//! - Security groups/firewall rules
8//! - Launch templates/instance configurations
9//!
10//! The cluster integrates with Horizon for container scheduling and orchestration.
11
12use crate::error::{ErrorData, Result};
13use crate::resource::{ResourceDefinition, ResourceOutputsDefinition, ResourceRef};
14use crate::ResourceType;
15use alien_error::AlienError;
16use bon::Builder;
17use serde::{Deserialize, Serialize};
18use std::any::Any;
19use std::fmt::Debug;
20
21/// GPU specification for a capacity group.
22#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
23#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
24#[serde(rename_all = "camelCase")]
25pub struct GpuSpec {
26    /// GPU type identifier (e.g., "nvidia-a100", "nvidia-t4")
27    #[serde(rename = "type")]
28    pub gpu_type: String,
29    /// Number of GPUs per machine
30    pub count: u32,
31}
32
33/// Machine resource profile for a capacity group.
34///
35/// Represents the hardware specifications for machines in a capacity group.
36/// These are hardware totals (what the instance type advertises), not allocatable
37/// capacity. Horizon's scheduler internally subtracts system reserves for planning.
38#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
39#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
40#[serde(rename_all = "camelCase")]
41pub struct MachineProfile {
42    /// CPU cores per machine (hardware total) - stored as string to preserve precision
43    /// (e.g., "8.0", "4.5")
44    pub cpu: String,
45    /// Memory in bytes (hardware total)
46    pub memory_bytes: u64,
47    /// Ephemeral storage in bytes (hardware total)
48    pub ephemeral_storage_bytes: u64,
49    /// GPU specification (optional)
50    #[serde(skip_serializing_if = "Option::is_none")]
51    pub gpu: Option<GpuSpec>,
52}
53
54/// Capacity group definition.
55///
56/// A capacity group represents machines with identical hardware profiles.
57/// Each group becomes a separate Auto Scaling Group (AWS), Managed Instance Group (GCP),
58/// or VM Scale Set (Azure).
59#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
60#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
61#[serde(rename_all = "camelCase")]
62pub struct CapacityGroup {
63    /// Unique identifier for this capacity group (must be lowercase alphanumeric with hyphens)
64    pub group_id: String,
65    /// Instance type for machines in this group (e.g., "m7g.xlarge", "n2-standard-8")
66    /// Auto-selected if not specified, based on profile requirements.
67    #[serde(skip_serializing_if = "Option::is_none")]
68    pub instance_type: Option<String>,
69    /// Machine resource profile (auto-derived from instance_type if not specified)
70    #[serde(skip_serializing_if = "Option::is_none")]
71    pub profile: Option<MachineProfile>,
72    /// Minimum number of machines (can be 0 for scale-to-zero)
73    pub min_size: u32,
74    /// Maximum number of machines (must be ≤ 10)
75    pub max_size: u32,
76}
77
78/// Deployment-time values that affect VM instance templates.
79///
80/// These are stamped onto `ContainerCluster` by `stamp_template_inputs()` before
81/// each deployment. Storing them in the resource config means `resource_eq()`
82/// detects changes (e.g., new horizond binary URL, rotated monitoring credentials)
83/// and triggers the normal update flow without any executor changes.
84///
85/// The OTLP auth header is sensitive — only a SHA-256 hash is stored here for
86/// change detection. The actual header value is read from `DeploymentConfig` at
87/// provisioning time.
88#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
89#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
90#[serde(rename_all = "camelCase")]
91pub struct TemplateInputs {
92    /// Base URL for downloading the horizond binary (without arch suffix).
93    pub horizond_download_base_url: String,
94    /// Horizon API base URL (e.g., "https://horizon.alien.dev").
95    pub horizon_api_url: String,
96    /// ETag of the horizond binary from the releases server — change-detection signal.
97    /// Changes on every `cargo zigbuild` (nginx ETag = mtime+size), triggering a rolling update.
98    /// Absent when the releases server is unreachable; change detection falls back to URL-only.
99    #[serde(skip_serializing_if = "Option::is_none")]
100    pub horizond_binary_hash: Option<String>,
101    /// OTLP logs endpoint URL (non-sensitive, stored directly).
102    #[serde(skip_serializing_if = "Option::is_none")]
103    pub monitoring_logs_endpoint: Option<String>,
104    /// OTLP metrics endpoint URL (non-sensitive, stored directly).
105    /// When set, horizond will export its own VM/container orchestration metrics here.
106    /// The same auth header as logs is reused at boot time (stored in the cloud vault).
107    #[serde(skip_serializing_if = "Option::is_none")]
108    pub monitoring_metrics_endpoint: Option<String>,
109    /// SHA-256 hash of the OTLP logs auth header — for change detection only.
110    /// The actual header is read from DeploymentConfig at provisioning time.
111    #[serde(skip_serializing_if = "Option::is_none")]
112    pub monitoring_auth_hash: Option<String>,
113    /// SHA-256 hash of the OTLP metrics auth header — for change detection only.
114    /// Only set when metrics uses a separate auth header from logs (e.g. Axiom with distinct datasets).
115    #[serde(skip_serializing_if = "Option::is_none")]
116    pub monitoring_metrics_auth_hash: Option<String>,
117}
118
119/// ContainerCluster resource for running long-running container workloads.
120///
121/// A ContainerCluster provides compute infrastructure that integrates with Horizon
122/// for container orchestration. It manages auto scaling groups of machines that
123/// run the horizond agent for container scheduling.
124///
125/// ## Architecture
126///
127/// - **Alien** provisions infrastructure: ASGs/MIGs/VMSSs, IAM roles, security groups
128/// - **Horizon** manages containers: scheduling replicas to machines, autoscaling
129/// - **horizond** runs on each machine: starts/stops containers based on Horizon's assignments
130///
131/// ## Example
132///
133/// ```rust
134/// use alien_core::{ContainerCluster, CapacityGroup};
135///
136/// let cluster = ContainerCluster::new("compute".to_string())
137///     .capacity_group(CapacityGroup {
138///         group_id: "general".to_string(),
139///         instance_type: Some("m7g.xlarge".to_string()),
140///         profile: None,
141///         min_size: 1,
142///         max_size: 5,
143///     })
144///     .build();
145/// ```
146#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Builder)]
147#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
148#[serde(rename_all = "camelCase", deny_unknown_fields)]
149#[builder(start_fn = new)]
150pub struct ContainerCluster {
151    /// Unique identifier for the container cluster.
152    /// Must contain only alphanumeric characters, hyphens, and underscores.
153    #[builder(start_fn)]
154    pub id: String,
155
156    /// Capacity groups defining the machine pools for this cluster.
157    /// Each group becomes a separate ASG/MIG/VMSS.
158    #[builder(field)]
159    pub capacity_groups: Vec<CapacityGroup>,
160
161    /// Container CIDR block for internal container networking.
162    /// Auto-generated as "10.244.0.0/16" if not specified.
163    /// Each machine gets a /24 subnet from this range.
164    #[serde(skip_serializing_if = "Option::is_none")]
165    pub container_cidr: Option<String>,
166
167    /// Deployment-time values that affect instance templates (horizond URL, monitoring, etc.).
168    /// Populated by stamp_template_inputs() from DeploymentConfig — not user-provided.
169    #[builder(skip)]
170    #[serde(skip_serializing_if = "Option::is_none")]
171    pub template_inputs: Option<TemplateInputs>,
172}
173
174impl ContainerCluster {
175    /// The resource type identifier for ContainerCluster
176    pub const RESOURCE_TYPE: ResourceType = ResourceType::from_static("container-cluster");
177
178    /// Returns the cluster's unique identifier.
179    pub fn id(&self) -> &str {
180        &self.id
181    }
182
183    /// Returns the container CIDR, defaulting to "10.244.0.0/16" if not specified.
184    pub fn container_cidr(&self) -> &str {
185        self.container_cidr.as_deref().unwrap_or("10.244.0.0/16")
186    }
187}
188
189impl<S: container_cluster_builder::State> ContainerClusterBuilder<S> {
190    /// Adds a capacity group to the cluster.
191    pub fn capacity_group(mut self, group: CapacityGroup) -> Self {
192        self.capacity_groups.push(group);
193        self
194    }
195}
196
197/// Status of a single capacity group within a ContainerCluster.
198#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
199#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
200#[serde(rename_all = "camelCase")]
201pub struct CapacityGroupStatus {
202    /// Capacity group ID
203    pub group_id: String,
204    /// Current number of machines
205    pub current_machines: u32,
206    /// Desired number of machines (from Horizon's capacity plan)
207    pub desired_machines: u32,
208    /// Instance type being used
209    pub instance_type: String,
210}
211
212/// Outputs generated by a successfully provisioned ContainerCluster.
213#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
214#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
215#[serde(rename_all = "camelCase")]
216pub struct ContainerClusterOutputs {
217    /// Horizon cluster ID (workspace/project/agent/resourceid format)
218    pub cluster_id: String,
219    /// Whether the Horizon cluster is ready
220    pub horizon_ready: bool,
221    /// Status of each capacity group
222    pub capacity_group_statuses: Vec<CapacityGroupStatus>,
223    /// Total number of machines across all capacity groups
224    pub total_machines: u32,
225}
226
227impl ResourceOutputsDefinition for ContainerClusterOutputs {
228    fn get_resource_type(&self) -> ResourceType {
229        ContainerCluster::RESOURCE_TYPE.clone()
230    }
231
232    fn as_any(&self) -> &dyn Any {
233        self
234    }
235
236    fn box_clone(&self) -> Box<dyn ResourceOutputsDefinition> {
237        Box::new(self.clone())
238    }
239
240    fn outputs_eq(&self, other: &dyn ResourceOutputsDefinition) -> bool {
241        other.as_any().downcast_ref::<ContainerClusterOutputs>() == Some(self)
242    }
243
244    fn to_json_value(&self) -> serde_json::Result<serde_json::Value> {
245        serde_json::to_value(self)
246    }
247}
248
249impl ResourceDefinition for ContainerCluster {
250    fn get_resource_type(&self) -> ResourceType {
251        Self::RESOURCE_TYPE
252    }
253
254    fn id(&self) -> &str {
255        &self.id
256    }
257
258    fn get_dependencies(&self) -> Vec<ResourceRef> {
259        // ContainerCluster has no static dependencies.
260        // Network dependency is platform-specific:
261        // - AWS/GCP/Azure: Added by ContainerClusterMutation
262        // - Local/Kubernetes: Not needed (Docker/K8s handles networking)
263        // Platform controllers use require_dependency() at runtime to access Network state.
264        Vec::new()
265    }
266
267    fn validate_update(&self, new_config: &dyn ResourceDefinition) -> Result<()> {
268        let new_cluster = new_config
269            .as_any()
270            .downcast_ref::<ContainerCluster>()
271            .ok_or_else(|| {
272                AlienError::new(ErrorData::UnexpectedResourceType {
273                    resource_id: self.id.clone(),
274                    expected: Self::RESOURCE_TYPE,
275                    actual: new_config.get_resource_type(),
276                })
277            })?;
278
279        if self.id != new_cluster.id {
280            return Err(AlienError::new(ErrorData::InvalidResourceUpdate {
281                resource_id: self.id.clone(),
282                reason: "the 'id' field is immutable".to_string(),
283            }));
284        }
285
286        // Container CIDR is immutable once set
287        if self.container_cidr.is_some()
288            && new_cluster.container_cidr.is_some()
289            && self.container_cidr != new_cluster.container_cidr
290        {
291            return Err(AlienError::new(ErrorData::InvalidResourceUpdate {
292                resource_id: self.id.clone(),
293                reason: "the 'containerCidr' field is immutable once set".to_string(),
294            }));
295        }
296
297        // Validate capacity groups
298        for new_group in &new_cluster.capacity_groups {
299            if let Some(existing_group) = self
300                .capacity_groups
301                .iter()
302                .find(|g| g.group_id == new_group.group_id)
303            {
304                // Instance type is immutable for existing groups
305                if existing_group.instance_type.is_some()
306                    && new_group.instance_type.is_some()
307                    && existing_group.instance_type != new_group.instance_type
308                {
309                    return Err(AlienError::new(ErrorData::InvalidResourceUpdate {
310                        resource_id: self.id.clone(),
311                        reason: format!(
312                            "instance type for capacity group '{}' is immutable",
313                            new_group.group_id
314                        ),
315                    }));
316                }
317            }
318        }
319
320        Ok(())
321    }
322
323    fn as_any(&self) -> &dyn Any {
324        self
325    }
326
327    fn as_any_mut(&mut self) -> &mut dyn Any {
328        self
329    }
330
331    fn box_clone(&self) -> Box<dyn ResourceDefinition> {
332        Box::new(self.clone())
333    }
334
335    fn resource_eq(&self, other: &dyn ResourceDefinition) -> bool {
336        other.as_any().downcast_ref::<ContainerCluster>() == Some(self)
337    }
338
339    fn to_json_value(&self) -> serde_json::Result<serde_json::Value> {
340        serde_json::to_value(self)
341    }
342}
343
#[cfg(test)]
mod tests {
    use super::*;

    /// Bytes in one GiB, used to spell out hardware sizes readably.
    const GIB: u64 = 1024 * 1024 * 1024;

    /// Builds a capacity group without a machine profile.
    fn group(id: &str, instance_type: Option<&str>, min_size: u32, max_size: u32) -> CapacityGroup {
        CapacityGroup {
            group_id: id.to_string(),
            instance_type: instance_type.map(String::from),
            profile: None,
            min_size,
            max_size,
        }
    }

    #[test]
    fn test_container_cluster_creation() {
        let cluster = ContainerCluster::new("compute".to_string())
            .capacity_group(group("general", Some("m7g.xlarge"), 1, 5))
            .build();

        assert_eq!(cluster.id(), "compute");
        assert_eq!(cluster.capacity_groups.len(), 1);
        assert_eq!(cluster.capacity_groups[0].group_id, "general");
        // No CIDR was specified, so the default applies.
        assert_eq!(cluster.container_cidr(), "10.244.0.0/16");
    }

    #[test]
    fn test_container_cluster_multiple_capacity_groups() {
        let gpu_group = CapacityGroup {
            profile: Some(MachineProfile {
                cpu: "4.0".to_string(),
                memory_bytes: 16 * GIB,
                ephemeral_storage_bytes: 200 * GIB,
                gpu: Some(GpuSpec {
                    gpu_type: "nvidia-a10g".to_string(),
                    count: 1,
                }),
            }),
            ..group("gpu", Some("g5.xlarge"), 0, 2)
        };

        let cluster = ContainerCluster::new("multi-pool".to_string())
            .capacity_group(group("general", Some("m7g.xlarge"), 1, 3))
            .capacity_group(gpu_group)
            .build();

        assert_eq!(cluster.capacity_groups.len(), 2);
        assert_eq!(cluster.capacity_groups[0].group_id, "general");
        assert_eq!(cluster.capacity_groups[1].group_id, "gpu");

        let gpu_profile = cluster.capacity_groups[1].profile.as_ref().unwrap();
        assert!(gpu_profile.gpu.is_some());
    }

    #[test]
    fn test_container_cluster_custom_cidr() {
        let cluster = ContainerCluster::new("custom-net".to_string())
            .container_cidr("172.30.0.0/16".to_string())
            .capacity_group(group("general", None, 1, 5))
            .build();

        assert_eq!(cluster.container_cidr(), "172.30.0.0/16");
    }

    #[test]
    fn test_container_cluster_validate_update_immutable_id() {
        let original = ContainerCluster::new("cluster-1".to_string())
            .capacity_group(group("general", None, 1, 5))
            .build();

        let renamed = ContainerCluster::new("cluster-2".to_string())
            .capacity_group(group("general", None, 1, 5))
            .build();

        // Changing the id must be rejected.
        let outcome = original.validate_update(&renamed);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_container_cluster_validate_update_scale_change() {
        let original = ContainerCluster::new("compute".to_string())
            .capacity_group(group("general", Some("m7g.xlarge"), 1, 5))
            .build();

        let rescaled = ContainerCluster::new("compute".to_string())
            .capacity_group(group("general", Some("m7g.xlarge"), 2, 10))
            .build();

        // Changing only min/max sizes is a permitted update.
        let outcome = original.validate_update(&rescaled);
        assert!(outcome.is_ok());
    }

    #[test]
    fn test_container_cluster_serialization() {
        let cluster = ContainerCluster::new("test-cluster".to_string())
            .capacity_group(group("general", Some("m7g.xlarge"), 1, 5))
            .build();

        // A JSON round trip must reproduce an equal cluster.
        let encoded = serde_json::to_string(&cluster).unwrap();
        let decoded: ContainerCluster = serde_json::from_str(&encoded).unwrap();
        assert_eq!(cluster, decoded);
    }
}