use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// A request to deploy an artifact to a set of nodes.
///
/// Create one with [`DeploymentDirective::new`] or [`DeploymentDirective::generate`]
/// and refine it with the `with_*` builder methods.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeploymentDirective {
/// Identifier of this directive; `generate` fills it with a random UUID v4 string.
pub directive_id: String,
/// UTC timestamp of the directive; the constructors set it to the current time.
pub issued_at: DateTime<Utc>,
/// Node ID of the issuer; an empty string until `with_issuer` is called.
pub issuer_node_id: String,
/// Formation the issuer belongs to, if any; left out of serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub issuer_formation_id: Option<String>,
/// Which nodes the directive is addressed to.
pub scope: DeploymentScope,
/// Description of the artifact to deploy.
pub artifact: ArtifactSpec,
/// Capability names attached to the directive; empty when absent from the input.
/// NOTE(review): presumably the capabilities the artifact provides — confirm.
#[serde(default)]
pub capabilities: Vec<String>,
/// Free-form JSON configuration; `new` initialises it to `Value::Null`.
#[serde(default)]
pub config: serde_json::Value,
/// Behavioural options; falls back to `DeploymentOptions::default()` when absent from the input.
#[serde(default)]
pub options: DeploymentOptions,
}
impl DeploymentDirective {
    /// Creates a directive with the given ID and defaults for every other field.
    ///
    /// The result is stamped with the current UTC time, has an empty issuer node ID,
    /// no issuer formation, [`DeploymentScope::Broadcast`] scope, a default artifact,
    /// no capabilities, a `null` config and default options.
    pub fn new(directive_id: impl Into<String>) -> Self {
        let directive_id = directive_id.into();
        Self {
            directive_id,
            issued_at: Utc::now(),
            issuer_node_id: String::new(),
            issuer_formation_id: None,
            scope: DeploymentScope::Broadcast,
            artifact: ArtifactSpec::default(),
            capabilities: Vec::new(),
            config: serde_json::Value::Null,
            options: DeploymentOptions::default(),
        }
    }

    /// Creates a directive whose ID is a freshly generated UUID v4 string.
    pub fn generate() -> Self {
        let id = uuid::Uuid::new_v4();
        Self::new(id.to_string())
    }

    /// Sets the issuing node's ID.
    pub fn with_issuer(self, node_id: impl Into<String>) -> Self {
        Self {
            issuer_node_id: node_id.into(),
            ..self
        }
    }

    /// Sets the issuer's formation ID.
    pub fn with_formation(self, formation_id: impl Into<String>) -> Self {
        Self {
            issuer_formation_id: Some(formation_id.into()),
            ..self
        }
    }

    /// Replaces the targeting scope.
    pub fn with_scope(self, scope: DeploymentScope) -> Self {
        Self { scope, ..self }
    }

    /// Replaces the artifact description.
    pub fn with_artifact(self, artifact: ArtifactSpec) -> Self {
        Self { artifact, ..self }
    }

    /// Replaces the whole capability list.
    pub fn with_capabilities(self, capabilities: Vec<String>) -> Self {
        Self {
            capabilities,
            ..self
        }
    }

    /// Appends a single capability to the existing list.
    pub fn with_capability(mut self, capability: impl Into<String>) -> Self {
        let capability: String = capability.into();
        self.capabilities.push(capability);
        self
    }

    /// Replaces the JSON configuration.
    pub fn with_config(self, config: serde_json::Value) -> Self {
        Self { config, ..self }
    }

    /// Replaces all deployment options.
    pub fn with_options(self, options: DeploymentOptions) -> Self {
        Self { options, ..self }
    }

    /// Overrides only the priority inside the current options.
    pub fn with_priority(mut self, priority: DeploymentPriority) -> Self {
        self.options.priority = priority;
        self
    }

    /// Reports whether this directive applies to `node_id`, using only the data
    /// stored in the directive itself.
    ///
    /// * `Broadcast` — always `true`.
    /// * `Formation` — `true` when the scope's formation equals the *issuer's*
    ///   formation; `node_id` is not consulted.
    ///   NOTE(review): the node's own formation membership is not available here —
    ///   confirm callers perform that check separately.
    /// * `Nodes` — `true` when `node_id` appears in the list.
    /// * `Capability` — `true` only when the filter lists no required capabilities;
    ///   the filter's other fields are not examined by this method.
    pub fn targets_node(&self, node_id: &str) -> bool {
        match &self.scope {
            DeploymentScope::Broadcast => true,
            DeploymentScope::Formation(target_formation) => {
                self.issuer_formation_id.as_deref() == Some(target_formation.as_str())
            }
            DeploymentScope::Nodes(targets) => {
                targets.iter().any(|target| target.as_str() == node_id)
            }
            DeploymentScope::Capability(filter) => filter.required_capabilities.is_empty(),
        }
    }
}
/// Selects which nodes a [`DeploymentDirective`] is addressed to.
///
/// # Wire format
///
/// The enum is serialized *adjacently tagged*: `"type"` holds the snake_case variant
/// name and `"value"` holds the payload, for example
/// `{"type":"broadcast"}`, `{"type":"formation","value":"alpha"}` or
/// `{"type":"nodes","value":["n1","n2"]}`.
///
/// An internally tagged representation (`tag` without `content`) is not usable for
/// this enum: serde supports it only for unit variants, struct variants and newtype
/// variants that wrap a struct or map. `Formation(String)` and `Nodes(Vec<String>)`
/// wrap a string and a sequence, so with `tag` alone they fail at runtime both when
/// serializing and when deserializing.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "value", rename_all = "snake_case")]
pub enum DeploymentScope {
    /// Every node.
    Broadcast,
    /// The formation with the given ID.
    Formation(String),
    /// An explicit list of node IDs.
    Nodes(Vec<String>),
    /// Nodes described by a capability filter.
    Capability(CapabilityFilter),
}
impl DeploymentScope {
pub fn formation(formation_id: impl Into<String>) -> Self {
Self::Formation(formation_id.into())
}
pub fn nodes(node_ids: Vec<String>) -> Self {
Self::Nodes(node_ids)
}
pub fn with_capabilities(capabilities: Vec<String>) -> Self {
Self::Capability(CapabilityFilter {
required_capabilities: capabilities,
..Default::default()
})
}
}
/// Node requirements carried by [`DeploymentScope::Capability`].
///
/// The derived [`Default`] yields a filter with no constraints set.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CapabilityFilter {
/// Minimum GPU memory in megabytes (per the field name); left out of output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub min_gpu_memory_mb: Option<u64>,
/// Minimum memory in megabytes (per the field name); left out of output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub min_memory_mb: Option<u64>,
/// Minimum storage in megabytes (per the field name); left out of output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub min_storage_mb: Option<u64>,
/// Capability names that are required; empty when absent from the input.
#[serde(default)]
pub required_capabilities: Vec<String>,
/// Extra key/value constraints; left out of output when empty.
/// NOTE(review): how these pairs are interpreted is not shown here — confirm with the matching code.
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub custom: HashMap<String, String>,
}
/// Describes the artifact a directive deploys.
///
/// Use the `onnx_model`, `container` or `native_binary` constructors and the
/// `with_name` / `with_version` / `with_sha256` builders to fill it in.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ArtifactSpec {
/// Hash identifying the artifact blob.
/// NOTE(review): the tests pass `sha256:`-prefixed strings; no format is enforced here.
pub blob_hash: String,
/// Size of the artifact in bytes.
pub size_bytes: u64,
/// Kind of artifact together with its kind-specific settings.
pub artifact_type: ArtifactType,
/// Optional SHA-256 value; left out of output when `None`.
/// NOTE(review): presumably used for integrity verification — confirm.
#[serde(skip_serializing_if = "Option::is_none")]
pub sha256: Option<String>,
/// Optional artifact name; left out of output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
/// Optional version string; left out of output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub version: Option<String>,
}
impl ArtifactSpec {
pub fn onnx_model(
blob_hash: impl Into<String>,
size_bytes: u64,
execution_providers: Vec<String>,
) -> Self {
Self {
blob_hash: blob_hash.into(),
size_bytes,
artifact_type: ArtifactType::OnnxModel {
execution_providers,
},
sha256: None,
name: None,
version: None,
}
}
pub fn container(
blob_hash: impl Into<String>,
size_bytes: u64,
runtime: ContainerRuntime,
) -> Self {
Self {
blob_hash: blob_hash.into(),
size_bytes,
artifact_type: ArtifactType::Container {
runtime,
ports: Vec::new(),
env: HashMap::new(),
},
sha256: None,
name: None,
version: None,
}
}
pub fn native_binary(
blob_hash: impl Into<String>,
size_bytes: u64,
arch: impl Into<String>,
) -> Self {
Self {
blob_hash: blob_hash.into(),
size_bytes,
artifact_type: ArtifactType::NativeBinary {
arch: arch.into(),
args: Vec::new(),
},
sha256: None,
name: None,
version: None,
}
}
pub fn with_name(mut self, name: impl Into<String>) -> Self {
self.name = Some(name.into());
self
}
pub fn with_version(mut self, version: impl Into<String>) -> Self {
self.version = Some(version.into());
self
}
pub fn with_sha256(mut self, sha256: impl Into<String>) -> Self {
self.sha256 = Some(sha256.into());
self
}
}
/// Kind of artifact plus the settings specific to that kind.
///
/// Serialized internally tagged: a `type` field holds the snake_case variant name
/// next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ArtifactType {
/// An ONNX model.
OnnxModel {
/// Execution provider names (the tests use e.g. `CUDAExecutionProvider`); empty when absent.
#[serde(default)]
execution_providers: Vec<String>,
},
/// A container.
Container {
/// Container runtime for this artifact.
runtime: ContainerRuntime,
/// Port mappings; empty when absent.
#[serde(default)]
ports: Vec<PortMapping>,
/// Environment key/value pairs; empty when absent.
#[serde(default)]
env: HashMap<String, String>,
},
/// A native executable.
NativeBinary {
/// Architecture string. NOTE(review): expected values are not defined here — confirm.
arch: String,
/// Arguments for the binary; empty when absent.
#[serde(default)]
args: Vec<String>,
},
/// A configuration package.
ConfigPackage {
/// Target path for the package. NOTE(review): presumably where it is unpacked — confirm.
target_path: String,
},
/// A WebAssembly module.
WasmModule {
/// WASI capability names; empty when absent.
#[serde(default)]
wasi_capabilities: Vec<String>,
},
}
impl Default for ArtifactType {
fn default() -> Self {
Self::OnnxModel {
execution_providers: Vec::new(),
}
}
}
/// Container runtime for a container artifact.
///
/// Serialized as the lowercase variant name; `Docker` is the default.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ContainerRuntime {
/// Docker (the default).
#[default]
Docker,
/// Podman.
Podman,
/// containerd.
Containerd,
}
/// One container-port to host-port mapping.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PortMapping {
/// Port inside the container.
pub container_port: u16,
/// Port on the host.
pub host_port: u16,
/// Protocol name; `default_protocol` supplies `"tcp"` when the field is absent from the input.
#[serde(default = "default_protocol")]
pub protocol: String,
}
/// Serde default for a port mapping's protocol: `"tcp"`.
fn default_protocol() -> String {
    String::from("tcp")
}
/// Options controlling how a deployment is carried out.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeploymentOptions {
/// Priority of the deployment; `Normal` when absent from the input.
#[serde(default)]
pub priority: DeploymentPriority,
/// Timeout in seconds; `default_timeout` supplies 300 when absent from the input.
#[serde(default = "default_timeout")]
pub timeout_seconds: u32,
/// Whether an existing deployment may be replaced; `false` when absent from the input.
#[serde(default)]
pub replace_existing: bool,
/// Optional rollback threshold as a percentage; left out of output when `None`.
/// NOTE(review): what the percentage is measured against is not shown here — confirm.
#[serde(skip_serializing_if = "Option::is_none")]
pub rollback_threshold_percent: Option<u32>,
/// Whether to activate automatically; `true` when absent from the input.
#[serde(default = "default_true")]
pub auto_activate: bool,
}
/// Serde default for the deployment timeout: 300 seconds.
fn default_timeout() -> u32 {
    const DEFAULT_TIMEOUT_SECONDS: u32 = 300;
    DEFAULT_TIMEOUT_SECONDS
}
/// Serde default helper for boolean fields that are enabled unless stated otherwise.
fn default_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
impl Default for DeploymentOptions {
fn default() -> Self {
Self {
priority: DeploymentPriority::Normal,
timeout_seconds: default_timeout(),
replace_existing: false,
rollback_threshold_percent: None,
auto_activate: true,
}
}
}
/// Priority of a deployment.
///
/// Serialized as the lowercase variant name; `Normal` is the default.
/// No ordering traits are derived, so variants can only be compared for equality.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum DeploymentPriority {
/// Critical priority.
Critical,
/// High priority.
High,
/// Normal priority (the default).
#[default]
Normal,
/// Low priority.
Low,
}
/// Status report for one directive on one node.
///
/// Start with [`DeploymentStatus::new`] and move through states with the
/// `downloading`, `activating`, `active`, `failed` and `rolled_back` methods.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeploymentStatus {
/// ID of the directive this status refers to.
pub directive_id: String,
/// ID of the node the status is about.
pub node_id: String,
/// UTC timestamp set when the status is created with `new`.
pub reported_at: DateTime<Utc>,
/// Current state of the deployment.
pub state: DeploymentState,
/// Progress percentage; `downloading` caps it at 99, `activating` and `active` set 100.
pub progress_percent: u8,
/// Error description set by `failed`; left out of output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub error_message: Option<String>,
/// Instance identifier set by `active`; left out of output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub instance_id: Option<String>,
}
impl DeploymentStatus {
    /// Creates a `Pending` status at 0% for the given directive and node, stamped
    /// with the current UTC time.
    ///
    /// None of the transition methods below update `reported_at`.
    pub fn new(directive_id: impl Into<String>, node_id: impl Into<String>) -> Self {
        let directive_id = directive_id.into();
        let node_id = node_id.into();
        Self {
            directive_id,
            node_id,
            reported_at: Utc::now(),
            state: DeploymentState::Pending,
            progress_percent: 0,
            error_message: None,
            instance_id: None,
        }
    }

    /// Moves to `Downloading` with the given progress, capped at 99.
    pub fn downloading(self, progress: u8) -> Self {
        // 100 is only ever set by `activating` and `active`.
        const MAX_DOWNLOAD_PERCENT: u8 = 99;
        Self {
            state: DeploymentState::Downloading,
            progress_percent: std::cmp::min(progress, MAX_DOWNLOAD_PERCENT),
            ..self
        }
    }

    /// Moves to `Activating` and sets progress to 100.
    pub fn activating(self) -> Self {
        Self {
            state: DeploymentState::Activating,
            progress_percent: 100,
            ..self
        }
    }

    /// Moves to `Active`, sets progress to 100 and records the instance ID.
    pub fn active(self, instance_id: impl Into<String>) -> Self {
        Self {
            state: DeploymentState::Active,
            progress_percent: 100,
            instance_id: Some(instance_id.into()),
            ..self
        }
    }

    /// Moves to `Failed` and records the error message; progress is left unchanged.
    pub fn failed(self, error: impl Into<String>) -> Self {
        Self {
            state: DeploymentState::Failed,
            error_message: Some(error.into()),
            ..self
        }
    }

    /// Moves to `RolledBack`; every other field is left unchanged.
    pub fn rolled_back(self) -> Self {
        Self {
            state: DeploymentState::RolledBack,
            ..self
        }
    }
}
/// State of a deployment on a node.
///
/// Serialized as the snake_case variant name (for example `rolled_back`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum DeploymentState {
/// Initial state set by `DeploymentStatus::new`.
Pending,
/// Set by `DeploymentStatus::downloading`.
Downloading,
/// Set by `DeploymentStatus::activating`.
Activating,
/// Set by `DeploymentStatus::active`.
Active,
/// Set by `DeploymentStatus::failed`.
Failed,
/// Set by `DeploymentStatus::rolled_back`.
RolledBack,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The builder methods populate the fields they are named after.
    #[test]
    fn test_directive_creation() {
        let artifact = ArtifactSpec::onnx_model(
            "sha256:abc123",
            500_000_000,
            vec!["CUDAExecutionProvider".into()],
        );
        let directive = DeploymentDirective::generate()
            .with_issuer("c2-node-1")
            .with_formation("formation-alpha")
            .with_artifact(artifact)
            .with_capability("object_detection")
            .with_priority(DeploymentPriority::High);

        assert!(!directive.directive_id.is_empty());
        assert_eq!(directive.issuer_node_id, "c2-node-1");
        assert_eq!(directive.capabilities, vec!["object_detection"]);
        assert_eq!(directive.options.priority, DeploymentPriority::High);
    }

    /// Broadcast reaches every node; a node list reaches only its members.
    #[test]
    fn test_scope_targeting() {
        let broadcast = DeploymentDirective::generate();
        assert!(broadcast.targets_node("any-node"));

        let node_ids: Vec<String> = vec!["node-1".into(), "node-2".into()];
        let targeted =
            DeploymentDirective::generate().with_scope(DeploymentScope::nodes(node_ids));
        assert!(targeted.targets_node("node-1"));
        assert!(!targeted.targets_node("node-3"));
    }

    /// The ONNX constructor and the metadata builders set the expected fields.
    #[test]
    fn test_artifact_spec() {
        let spec = ArtifactSpec::onnx_model("sha256:abc", 1000, vec!["CUDA".into()])
            .with_name("YOLOv8n")
            .with_version("1.0.0");

        assert_eq!(spec.blob_hash, "sha256:abc");
        assert_eq!(spec.name, Some("YOLOv8n".to_string()));
        assert!(matches!(spec.artifact_type, ArtifactType::OnnxModel { .. }));
    }

    /// A status walks Pending -> Downloading -> Activating -> Active.
    #[test]
    fn test_deployment_status_transitions() {
        let pending = DeploymentStatus::new("directive-1", "node-1");
        assert_eq!(pending.state, DeploymentState::Pending);

        let downloading = pending.downloading(50);
        assert_eq!(downloading.state, DeploymentState::Downloading);
        assert_eq!(downloading.progress_percent, 50);

        let activating = downloading.activating();
        assert_eq!(activating.state, DeploymentState::Activating);

        let active = activating.active("instance-123");
        assert_eq!(active.state, DeploymentState::Active);
        assert_eq!(active.instance_id, Some("instance-123".to_string()));
    }

    /// A directive carrying a container artifact survives a JSON round trip.
    #[test]
    fn test_serialization() {
        let artifact =
            ArtifactSpec::container("sha256:def456", 100_000_000, ContainerRuntime::Docker);
        let directive = DeploymentDirective::generate().with_artifact(artifact);

        let json = serde_json::to_string_pretty(&directive).unwrap();
        let parsed: DeploymentDirective = serde_json::from_str(&json).unwrap();

        assert_eq!(parsed.directive_id, directive.directive_id);
        assert!(matches!(
            parsed.artifact.artifact_type,
            ArtifactType::Container { .. }
        ));
    }
}