// stormchaser-runner-k8s 1.3.2
//
// A robust, distributed workflow engine for event-driven and human-triggered workflows.
// Documentation
//! State machine for Kubernetes jobs.
//!
//! This module defines the state machine, metadata, metrics, and specifications
//! for running and managing jobs on a Kubernetes cluster.

use serde_json::Value;
use std::collections::HashMap;
use stormchaser_model::dsl::Step;

/// Cryptographic utilities for job state.
pub mod crypto;
/// Kubernetes utility functions for job management.
pub mod k8s_utils;
/// State transitions for the job machine.
pub mod transitions;

use chrono::{DateTime, Utc};
use k8s_openapi::api::core::v1::ResourceRequirements as K8sResources;
use kube::Client;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use uuid::Uuid;

use stormchaser_model::dsl;

/// Metadata associated with a Kubernetes job execution.
///
/// This is the state-independent payload held in the `metadata` field of
/// [`K8sJobMachine`]. It derives `Serialize`/`Deserialize`, so it can
/// round-trip through serde.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobMetadata {
    /// Unique identifier for the workflow run.
    pub run_id: Uuid,
    /// Unique identifier for the specific step.
    pub step_id: Uuid,
    /// DSL definition of the step to execute.
    pub step_dsl: Step,
    /// Kubernetes namespace where the job will run.
    pub namespace: String,
    /// Timestamp (UTC) when the job was received by the runner.
    pub received_at: DateTime<Utc>,
    /// The version of the Kubernetes cluster.
    ///
    /// NOTE(review): stored as a free-form string; the expected format is not
    /// visible in this module — confirm against the producer.
    pub cluster_version: String,
    /// Optional encryption key for secure state storage.
    // NOTE(review): `Debug` is derived on this struct, so a present key is
    // printed verbatim by `{:?}` — confirm that is acceptable for logging.
    pub encryption_key: Option<String>,
    /// Optional storage configuration and URLs, keyed by name with arbitrary
    /// JSON values.
    pub storage: Option<HashMap<String, Value>>,
    /// Optional URLs for uploading test reports, keyed by name with arbitrary
    /// JSON values.
    pub test_report_urls: Option<HashMap<String, Value>>,
}

/// Metrics collected after a job execution completes.
///
/// Derives `Default`, which yields `0` for the numeric fields and `None` for
/// every optional field.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct JobMetrics {
    /// Exit code of the primary job container, if available.
    pub exit_code: Option<i32>,
    /// Number of attempts made to run the job.
    pub attempts: i32,
    /// Total duration of the job execution in milliseconds.
    pub duration_ms: u64,
    /// Latency before the job started executing in milliseconds.
    pub latency_ms: u64,
    /// Hashes of storage volumes used by the job, as string-to-string pairs.
    pub storage_hashes: Option<HashMap<String, String>>,
    /// Artifacts produced by the job, keyed by name with arbitrary JSON values.
    pub artifacts: Option<HashMap<String, Value>>,
    /// Test reports generated by the job, keyed by name with arbitrary JSON
    /// values.
    pub test_reports: Option<HashMap<String, Value>>,
}

/// The final outcome state of a job.
///
/// Only terminal outcomes are represented here; this value is what
/// `state::Finished` stores in its `result` field. Both variants carry the
/// [`JobMetrics`] gathered for the execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum JobState {
    /// The job completed successfully.
    Succeeded(JobMetrics),
    /// The job failed; the `String` is the failure reason, followed by the
    /// metrics.
    Failed(String, JobMetrics),
}

/// Result of attempting to start a job.
///
/// Each variant hands back the machine already typed with the state it ended
/// up in, so the caller continues with either a `Running` or a `Finished`
/// machine.
pub enum StartResult {
    /// The job was successfully started and is now running.
    Running(K8sJobMachine<state::Running>),
    /// The job failed to start; the machine is already in its finished state.
    Failed(K8sJobMachine<state::Finished>),
}

/// States for the job machine.
///
/// Each type is used as the `S` parameter of `K8sJobMachine<S>` and holds the
/// data that exists only in that phase. All of them implement `Debug` so a
/// state can be logged or shown in test failures. Only `Initialized` is
/// `Clone`; the in-flight and finished states intentionally are not.
pub mod state {
    /// Initialized state before the job has started.
    #[derive(Debug, Clone)]
    pub struct Initialized;

    /// Running state while the job is executing.
    #[derive(Debug)]
    pub struct Running {
        /// Name of the Kubernetes job.
        pub job_name: String,
        /// Time (UTC) when the job was dispatched to the cluster.
        pub dispatched_at: chrono::DateTime<chrono::Utc>,
    }

    /// Finished state after the job has completed.
    #[derive(Debug)]
    pub struct Finished {
        /// Final state and metrics of the completed job.
        pub result: super::JobState,
    }
}

/// State machine for managing a Kubernetes job's lifecycle.
///
/// The type parameter `S` is the current state (see the `state` module), so
/// the phase a machine is in is visible in its type.
///
/// `Clone` is derived, which makes the machine cloneable only when `S: Clone`.
/// Of the state types declared in `state`, only `Initialized` derives `Clone`.
#[derive(Clone)]
pub struct K8sJobMachine<S> {
    /// Kubernetes API client.
    pub client: Client,
    /// Metadata for the job; independent of the current state.
    pub metadata: JobMetadata,
    /// Current state of the machine.
    pub state: S,
}

/// Specification for a Kubernetes job derived from DSL.
///
/// Only `Deserialize` is derived (no `Serialize`). `image` is the single
/// mandatory field; every other field is an `Option`.
#[derive(Debug, Deserialize)]
pub struct K8sJobSpec {
    /// Container image to run.
    pub image: String,
    /// Command to execute in the container.
    pub command: Option<Vec<String>>,
    /// Arguments to pass to the command.
    pub args: Option<Vec<String>>,
    /// Environment variables for the container.
    pub env: Option<Vec<dsl::EnvVar>>,
    /// Resource requirements (CPU/Memory), expressed with the Kubernetes
    /// `ResourceRequirements` type.
    pub resources: Option<K8sResources>,
    /// Maximum duration the job can be active, in seconds.
    pub active_deadline_seconds: Option<i64>,
    /// Number of retries before marking job as failed.
    pub backoff_limit: Option<i32>,
    /// Number of successfully finished pods required.
    pub completions: Option<i32>,
    /// Maximum number of pods to run in parallel.
    pub parallelism: Option<i32>,
    /// Time to keep the job after it finishes, in seconds.
    pub ttl_seconds_after_finished: Option<i32>,
    /// Whether to run the container in privileged mode.
    pub privileged: Option<bool>,
    /// Node selector labels for scheduling.
    pub node_selector: Option<BTreeMap<String, String>>,
    /// Service account to use for the job.
    pub service_account_name: Option<String>,
    /// Restart policy for the pod.
    ///
    /// NOTE(review): free-form string; accepted values are not validated in
    /// this module — confirm where validation happens.
    pub restart_policy: Option<String>,
    /// Additional labels for the job.
    pub labels: Option<BTreeMap<String, String>>,
    /// Additional annotations for the job.
    pub annotations: Option<BTreeMap<String, String>>,
    /// Storage volumes to mount.
    pub storage_mounts: Option<Vec<dsl::StorageMount>>,
    /// Secrets to mount as volumes.
    pub secret_mounts: Option<Vec<dsl::SecretMount>>,
    /// ConfigMaps to mount as volumes.
    pub config_map_mounts: Option<Vec<dsl::ConfigMapMount>>,
    /// Minimum required Kubernetes cluster version.
    // NOTE(review): the lint is silenced without a stated reason; presumably
    // this field is parsed but not yet read by the runner — confirm, and
    // either use it or document why it is kept.
    #[allow(dead_code)]
    pub minimum_version: Option<String>,
}

impl K8sJobMachine<state::Initialized> {
    /// Creates a new job machine in the Initialized state.
    pub fn new(client: Client, metadata: JobMetadata) -> Self {
        Self {
            client,
            metadata,
            state: state::Initialized,
        }
    }
}