Skip to main content

peat_protocol/distribution/
directive.rs

//! Deployment Directives - ADR-012 / ADR-026
//!
//! This module provides the `DeploymentDirective` type for pushing software
//! artifacts (models, containers, binaries) from C2 to edge nodes.
//!
//! ## Flow
//!
//! ```text
//! ┌─────────────┐                    ┌─────────────┐
//! │     C2      │                    │  Edge Node  │
//! │             │─DeploymentDirective│             │
//! │             │───────────────────▶│             │
//! │             │                    │ fetch blob  │
//! │             │                    │ activate    │
//! │             │◀───────────────────│             │
//! │             │  DeploymentStatus  │ advertise   │
//! └─────────────┘                    └─────────────┘
//! ```
//!
//! ## Usage
//!
//! ```rust
//! use peat_protocol::distribution::{
//!     DeploymentDirective, DeploymentScope, ArtifactSpec, DeploymentPriority,
//! };
//!
//! // Create directive for ONNX model deployment
//! let directive = DeploymentDirective::new("yolov8n-deploy-001")
//!     .with_artifact(ArtifactSpec::onnx_model(
//!         "sha256:abc123...",
//!         500_000_000,
//!         vec!["CUDAExecutionProvider".into()],
//!     ))
//!     .with_scope(DeploymentScope::formation("formation-alpha"))
//!     .with_capabilities(vec!["object_detection".into()])
//!     .with_priority(DeploymentPriority::High);
//! ```
38
39use chrono::{DateTime, Utc};
40use serde::{Deserialize, Serialize};
41use std::collections::HashMap;
42
43/// Deployment directive - command to deploy software to nodes
44///
45/// This is the primary message type for C2 → Edge software deployment.
46/// Nodes matching the scope will fetch the artifact and activate it.
47#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct DeploymentDirective {
49    /// Unique directive identifier
50    pub directive_id: String,
51    /// When the directive was issued
52    pub issued_at: DateTime<Utc>,
53    /// Node ID of the issuer
54    pub issuer_node_id: String,
55    /// Formation ID of the issuer (for hierarchy routing)
56    #[serde(skip_serializing_if = "Option::is_none")]
57    pub issuer_formation_id: Option<String>,
58    /// Target scope for this deployment
59    pub scope: DeploymentScope,
60    /// Artifact specification
61    pub artifact: ArtifactSpec,
62    /// Capabilities this deployment provides
63    #[serde(default)]
64    pub capabilities: Vec<String>,
65    /// Runtime-specific configuration
66    #[serde(default)]
67    pub config: serde_json::Value,
68    /// Deployment options
69    #[serde(default)]
70    pub options: DeploymentOptions,
71}
72
73impl DeploymentDirective {
74    /// Create a new deployment directive
75    pub fn new(directive_id: impl Into<String>) -> Self {
76        Self {
77            directive_id: directive_id.into(),
78            issued_at: Utc::now(),
79            issuer_node_id: String::new(),
80            issuer_formation_id: None,
81            scope: DeploymentScope::Broadcast,
82            artifact: ArtifactSpec::default(),
83            capabilities: Vec::new(),
84            config: serde_json::Value::Null,
85            options: DeploymentOptions::default(),
86        }
87    }
88
89    /// Generate a unique directive ID
90    pub fn generate() -> Self {
91        Self::new(uuid::Uuid::new_v4().to_string())
92    }
93
94    /// Set the issuer node
95    pub fn with_issuer(mut self, node_id: impl Into<String>) -> Self {
96        self.issuer_node_id = node_id.into();
97        self
98    }
99
100    /// Set the issuer formation
101    pub fn with_formation(mut self, formation_id: impl Into<String>) -> Self {
102        self.issuer_formation_id = Some(formation_id.into());
103        self
104    }
105
106    /// Set the deployment scope
107    pub fn with_scope(mut self, scope: DeploymentScope) -> Self {
108        self.scope = scope;
109        self
110    }
111
112    /// Set the artifact
113    pub fn with_artifact(mut self, artifact: ArtifactSpec) -> Self {
114        self.artifact = artifact;
115        self
116    }
117
118    /// Add capabilities
119    pub fn with_capabilities(mut self, capabilities: Vec<String>) -> Self {
120        self.capabilities = capabilities;
121        self
122    }
123
124    /// Add a capability
125    pub fn with_capability(mut self, capability: impl Into<String>) -> Self {
126        self.capabilities.push(capability.into());
127        self
128    }
129
130    /// Set runtime config
131    pub fn with_config(mut self, config: serde_json::Value) -> Self {
132        self.config = config;
133        self
134    }
135
136    /// Set deployment options
137    pub fn with_options(mut self, options: DeploymentOptions) -> Self {
138        self.options = options;
139        self
140    }
141
142    /// Set priority
143    pub fn with_priority(mut self, priority: DeploymentPriority) -> Self {
144        self.options.priority = priority;
145        self
146    }
147
148    /// Check if this directive targets a specific node
149    pub fn targets_node(&self, node_id: &str) -> bool {
150        match &self.scope {
151            DeploymentScope::Broadcast => true,
152            DeploymentScope::Formation(fid) => {
153                // Would need formation membership lookup
154                // For now, return true if same formation
155                self.issuer_formation_id.as_deref() == Some(fid)
156            }
157            DeploymentScope::Nodes(node_ids) => node_ids.iter().any(|n| n == node_id),
158            DeploymentScope::Capability(filter) => {
159                // Would need capability matching
160                // For now, assume match if no specific requirements
161                filter.required_capabilities.is_empty()
162            }
163        }
164    }
165}
166
167/// Scope for deployment targeting
168#[derive(Debug, Clone, Serialize, Deserialize)]
169#[serde(tag = "type", rename_all = "snake_case")]
170pub enum DeploymentScope {
171    /// Broadcast to all capable nodes
172    Broadcast,
173    /// Target a specific formation
174    Formation(String),
175    /// Target specific nodes by ID
176    Nodes(Vec<String>),
177    /// Target nodes matching capability filter
178    Capability(CapabilityFilter),
179}
180
181impl DeploymentScope {
182    /// Create scope for a specific formation
183    pub fn formation(formation_id: impl Into<String>) -> Self {
184        Self::Formation(formation_id.into())
185    }
186
187    /// Create scope for specific nodes
188    pub fn nodes(node_ids: Vec<String>) -> Self {
189        Self::Nodes(node_ids)
190    }
191
192    /// Create scope for capability-based targeting
193    pub fn with_capabilities(capabilities: Vec<String>) -> Self {
194        Self::Capability(CapabilityFilter {
195            required_capabilities: capabilities,
196            ..Default::default()
197        })
198    }
199}
200
201/// Filter for capability-based deployment targeting
202#[derive(Debug, Clone, Default, Serialize, Deserialize)]
203pub struct CapabilityFilter {
204    /// Minimum GPU memory in MB
205    #[serde(skip_serializing_if = "Option::is_none")]
206    pub min_gpu_memory_mb: Option<u64>,
207    /// Minimum system memory in MB
208    #[serde(skip_serializing_if = "Option::is_none")]
209    pub min_memory_mb: Option<u64>,
210    /// Minimum storage in MB
211    #[serde(skip_serializing_if = "Option::is_none")]
212    pub min_storage_mb: Option<u64>,
213    /// Required capabilities (e.g., ["cuda", "tensorrt"])
214    #[serde(default)]
215    pub required_capabilities: Vec<String>,
216    /// Custom filters
217    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
218    pub custom: HashMap<String, String>,
219}
220
221/// Artifact specification for deployment
222#[derive(Debug, Clone, Default, Serialize, Deserialize)]
223pub struct ArtifactSpec {
224    /// Blob hash (content-addressed)
225    pub blob_hash: String,
226    /// Size in bytes
227    pub size_bytes: u64,
228    /// Artifact type
229    pub artifact_type: ArtifactType,
230    /// SHA256 hash for verification (if different from blob hash)
231    #[serde(skip_serializing_if = "Option::is_none")]
232    pub sha256: Option<String>,
233    /// Human-readable name
234    #[serde(skip_serializing_if = "Option::is_none")]
235    pub name: Option<String>,
236    /// Version string
237    #[serde(skip_serializing_if = "Option::is_none")]
238    pub version: Option<String>,
239}
240
241impl ArtifactSpec {
242    /// Create ONNX model artifact spec
243    pub fn onnx_model(
244        blob_hash: impl Into<String>,
245        size_bytes: u64,
246        execution_providers: Vec<String>,
247    ) -> Self {
248        Self {
249            blob_hash: blob_hash.into(),
250            size_bytes,
251            artifact_type: ArtifactType::OnnxModel {
252                execution_providers,
253            },
254            sha256: None,
255            name: None,
256            version: None,
257        }
258    }
259
260    /// Create container artifact spec
261    pub fn container(
262        blob_hash: impl Into<String>,
263        size_bytes: u64,
264        runtime: ContainerRuntime,
265    ) -> Self {
266        Self {
267            blob_hash: blob_hash.into(),
268            size_bytes,
269            artifact_type: ArtifactType::Container {
270                runtime,
271                ports: Vec::new(),
272                env: HashMap::new(),
273            },
274            sha256: None,
275            name: None,
276            version: None,
277        }
278    }
279
280    /// Create native binary artifact spec
281    pub fn native_binary(
282        blob_hash: impl Into<String>,
283        size_bytes: u64,
284        arch: impl Into<String>,
285    ) -> Self {
286        Self {
287            blob_hash: blob_hash.into(),
288            size_bytes,
289            artifact_type: ArtifactType::NativeBinary {
290                arch: arch.into(),
291                args: Vec::new(),
292            },
293            sha256: None,
294            name: None,
295            version: None,
296        }
297    }
298
299    /// Set name
300    pub fn with_name(mut self, name: impl Into<String>) -> Self {
301        self.name = Some(name.into());
302        self
303    }
304
305    /// Set version
306    pub fn with_version(mut self, version: impl Into<String>) -> Self {
307        self.version = Some(version.into());
308        self
309    }
310
311    /// Set SHA256 hash
312    pub fn with_sha256(mut self, sha256: impl Into<String>) -> Self {
313        self.sha256 = Some(sha256.into());
314        self
315    }
316}
317
318/// Artifact type (mirrors peat-inference ArtifactType for protocol layer)
319#[derive(Debug, Clone, Serialize, Deserialize)]
320#[serde(tag = "type", rename_all = "snake_case")]
321pub enum ArtifactType {
322    /// ONNX model for inference
323    OnnxModel {
324        /// Execution providers in preference order
325        #[serde(default)]
326        execution_providers: Vec<String>,
327    },
328    /// Container image
329    Container {
330        /// Container runtime
331        runtime: ContainerRuntime,
332        /// Port mappings
333        #[serde(default)]
334        ports: Vec<PortMapping>,
335        /// Environment variables
336        #[serde(default)]
337        env: HashMap<String, String>,
338    },
339    /// Native executable
340    NativeBinary {
341        /// Target architecture
342        arch: String,
343        /// Command-line arguments
344        #[serde(default)]
345        args: Vec<String>,
346    },
347    /// Configuration package
348    ConfigPackage {
349        /// Target extraction path
350        target_path: String,
351    },
352    /// WebAssembly module
353    WasmModule {
354        /// WASI capabilities
355        #[serde(default)]
356        wasi_capabilities: Vec<String>,
357    },
358}
359
360impl Default for ArtifactType {
361    fn default() -> Self {
362        Self::OnnxModel {
363            execution_providers: Vec::new(),
364        }
365    }
366}
367
368/// Container runtime
369#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
370#[serde(rename_all = "lowercase")]
371pub enum ContainerRuntime {
372    #[default]
373    Docker,
374    Podman,
375    Containerd,
376}
377
378/// Port mapping for containers
379#[derive(Debug, Clone, Serialize, Deserialize)]
380pub struct PortMapping {
381    /// Container port
382    pub container_port: u16,
383    /// Host port
384    pub host_port: u16,
385    /// Protocol (tcp/udp)
386    #[serde(default = "default_protocol")]
387    pub protocol: String,
388}
389
/// Serde default for a port mapping's protocol: `"tcp"`.
fn default_protocol() -> String {
    String::from("tcp")
}
393
394/// Deployment options
395#[derive(Debug, Clone, Serialize, Deserialize)]
396pub struct DeploymentOptions {
397    /// Priority level
398    #[serde(default)]
399    pub priority: DeploymentPriority,
400    /// Timeout in seconds (0 = no timeout)
401    #[serde(default = "default_timeout")]
402    pub timeout_seconds: u32,
403    /// Replace existing deployment with same capabilities
404    #[serde(default)]
405    pub replace_existing: bool,
406    /// Rollback threshold (percentage of nodes that must succeed)
407    #[serde(skip_serializing_if = "Option::is_none")]
408    pub rollback_threshold_percent: Option<u32>,
409    /// Auto-activate after download
410    #[serde(default = "default_true")]
411    pub auto_activate: bool,
412}
413
/// Serde default for the deployment timeout: five minutes, in seconds.
fn default_timeout() -> u32 {
    const SECONDS_PER_MINUTE: u32 = 60;
    5 * SECONDS_PER_MINUTE
}

/// Serde default for boolean fields that are enabled unless stated otherwise.
fn default_true() -> bool {
    true
}
421
422impl Default for DeploymentOptions {
423    fn default() -> Self {
424        Self {
425            priority: DeploymentPriority::Normal,
426            timeout_seconds: default_timeout(),
427            replace_existing: false,
428            rollback_threshold_percent: None,
429            auto_activate: true,
430        }
431    }
432}
433
434/// Deployment priority
435#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
436#[serde(rename_all = "lowercase")]
437pub enum DeploymentPriority {
438    /// Critical - interrupt other operations
439    Critical,
440    /// High - process soon
441    High,
442    /// Normal - standard processing
443    #[default]
444    Normal,
445    /// Low - process when idle
446    Low,
447}
448
449/// Deployment status report from a node
450#[derive(Debug, Clone, Serialize, Deserialize)]
451pub struct DeploymentStatus {
452    /// Directive ID this status is for
453    pub directive_id: String,
454    /// Reporting node ID
455    pub node_id: String,
456    /// When this status was reported
457    pub reported_at: DateTime<Utc>,
458    /// Current state
459    pub state: DeploymentState,
460    /// Progress percentage (0-100)
461    pub progress_percent: u8,
462    /// Error message (if state is Failed)
463    #[serde(skip_serializing_if = "Option::is_none")]
464    pub error_message: Option<String>,
465    /// Instance ID (if state is Active)
466    #[serde(skip_serializing_if = "Option::is_none")]
467    pub instance_id: Option<String>,
468}
469
470impl DeploymentStatus {
471    /// Create a new status report
472    pub fn new(directive_id: impl Into<String>, node_id: impl Into<String>) -> Self {
473        Self {
474            directive_id: directive_id.into(),
475            node_id: node_id.into(),
476            reported_at: Utc::now(),
477            state: DeploymentState::Pending,
478            progress_percent: 0,
479            error_message: None,
480            instance_id: None,
481        }
482    }
483
484    /// Set state to downloading
485    pub fn downloading(mut self, progress: u8) -> Self {
486        self.state = DeploymentState::Downloading;
487        self.progress_percent = progress.min(99);
488        self
489    }
490
491    /// Set state to activating
492    pub fn activating(mut self) -> Self {
493        self.state = DeploymentState::Activating;
494        self.progress_percent = 100;
495        self
496    }
497
498    /// Set state to active
499    pub fn active(mut self, instance_id: impl Into<String>) -> Self {
500        self.state = DeploymentState::Active;
501        self.progress_percent = 100;
502        self.instance_id = Some(instance_id.into());
503        self
504    }
505
506    /// Set state to failed
507    pub fn failed(mut self, error: impl Into<String>) -> Self {
508        self.state = DeploymentState::Failed;
509        self.error_message = Some(error.into());
510        self
511    }
512
513    /// Set state to rolled back
514    pub fn rolled_back(mut self) -> Self {
515        self.state = DeploymentState::RolledBack;
516        self
517    }
518}
519
520/// Deployment state
521#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
522#[serde(rename_all = "snake_case")]
523pub enum DeploymentState {
524    /// Directive received, waiting to process
525    Pending,
526    /// Downloading artifact from blob store
527    Downloading,
528    /// Activating artifact via runtime adapter
529    Activating,
530    /// Artifact is active and running
531    Active,
532    /// Deployment failed
533    Failed,
534    /// Deployment was rolled back
535    RolledBack,
536}
537
#[cfg(test)]
mod tests {
    use super::*;

    /// The builder chain should carry every supplied value into the directive.
    #[test]
    fn test_directive_creation() {
        let artifact = ArtifactSpec::onnx_model(
            "sha256:abc123",
            500_000_000,
            vec!["CUDAExecutionProvider".into()],
        );

        let directive = DeploymentDirective::generate()
            .with_issuer("c2-node-1")
            .with_formation("formation-alpha")
            .with_artifact(artifact)
            .with_capability("object_detection")
            .with_priority(DeploymentPriority::High);

        assert!(!directive.directive_id.is_empty());
        assert_eq!(directive.issuer_node_id, "c2-node-1");
        assert_eq!(directive.capabilities, vec!["object_detection"]);
        assert_eq!(directive.options.priority, DeploymentPriority::High);
    }

    /// Broadcast reaches every node; a node list reaches only its members.
    #[test]
    fn test_scope_targeting() {
        let broadcast = DeploymentDirective::generate();
        assert!(broadcast.targets_node("any-node"));

        let listed_nodes = vec!["node-1".into(), "node-2".into()];
        let targeted =
            DeploymentDirective::generate().with_scope(DeploymentScope::nodes(listed_nodes));
        assert!(targeted.targets_node("node-1"));
        assert!(!targeted.targets_node("node-3"));
    }

    /// Constructor and `with_*` helpers populate the expected fields.
    #[test]
    fn test_artifact_spec() {
        let base = ArtifactSpec::onnx_model("sha256:abc", 1000, vec!["CUDA".into()]);
        let spec = base.with_name("YOLOv8n").with_version("1.0.0");

        assert_eq!(spec.blob_hash, "sha256:abc");
        assert_eq!(spec.name, Some("YOLOv8n".to_string()));
        assert!(matches!(spec.artifact_type, ArtifactType::OnnxModel { .. }));
    }

    /// Walks a status report through pending → downloading → activating → active.
    #[test]
    fn test_deployment_status_transitions() {
        let pending = DeploymentStatus::new("directive-1", "node-1");
        assert_eq!(pending.state, DeploymentState::Pending);

        let downloading = pending.downloading(50);
        assert_eq!(downloading.state, DeploymentState::Downloading);
        assert_eq!(downloading.progress_percent, 50);

        let activating = downloading.activating();
        assert_eq!(activating.state, DeploymentState::Activating);

        let active = activating.active("instance-123");
        assert_eq!(active.state, DeploymentState::Active);
        assert_eq!(active.instance_id, Some("instance-123".to_string()));
    }

    /// A container directive survives a JSON round trip.
    #[test]
    fn test_serialization() {
        let artifact =
            ArtifactSpec::container("sha256:def456", 100_000_000, ContainerRuntime::Docker);
        let directive = DeploymentDirective::generate().with_artifact(artifact);

        let json = serde_json::to_string_pretty(&directive).unwrap();
        let parsed: DeploymentDirective = serde_json::from_str(&json).unwrap();

        assert_eq!(parsed.directive_id, directive.directive_id);
        assert!(matches!(
            parsed.artifact.artifact_type,
            ArtifactType::Container { .. }
        ));
    }
}