Skip to main content

mvm_core/
agent.rs

1use serde::{Deserialize, Serialize};
2
3use crate::instance::InstanceState;
4use crate::node::{NodeInfo, NodeStats};
5use crate::pool::{
6    DesiredCounts, InstanceResources, RegistryArtifact, Role, RuntimePolicy, SecretScope,
7    SleepPolicyConfig, UpdateStrategy,
8};
9use crate::routing::RoutingTable;
10use crate::signing::SignedPayload;
11use crate::tenant::TenantQuota;
12
13// ============================================================================
14// Desired state schema (pushed by coordinator)
15// ============================================================================
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
18#[serde(deny_unknown_fields)]
19pub struct DesiredState {
20    pub schema_version: u32,
21    pub node_id: String,
22    pub tenants: Vec<DesiredTenant>,
23    #[serde(default)]
24    pub prune_unknown_tenants: bool,
25    #[serde(default)]
26    pub prune_unknown_pools: bool,
27    /// Monotonic sequence number from the coordinator's event log.
28    /// Agents use this to track which state version they last applied,
29    /// enabling incremental sync via `SyncEvents { since }`.
30    #[serde(default)]
31    pub sequence: u64,
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
35#[serde(deny_unknown_fields)]
36pub struct DesiredTenant {
37    pub tenant_id: String,
38    pub network: DesiredTenantNetwork,
39    pub quotas: TenantQuota,
40    #[serde(default)]
41    pub secrets_hash: Option<String>,
42    pub pools: Vec<DesiredPool>,
43    /// Preferred regions for scheduling this tenant's instances.
44    /// The scheduler scores nodes in these regions higher during placement.
45    #[serde(default)]
46    pub preferred_regions: Vec<String>,
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
50#[serde(deny_unknown_fields)]
51pub struct DesiredTenantNetwork {
52    pub tenant_net_id: u16,
53    pub ipv4_subnet: String,
54}
55
/// Upper bound on desired instances per pool for each lifecycle state.
///
/// NOTE(review): nothing in this module enforces the limit; confirm where it
/// is checked against `DesiredCounts` / `PoolActionType::ScaleTo`.
pub const MAX_DESIRED_PER_STATE: u32 = 100;
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
60#[serde(deny_unknown_fields)]
61pub struct DesiredPool {
62    pub pool_id: String,
63    pub flake_ref: String,
64    pub profile: String,
65    #[serde(default)]
66    pub role: Role,
67    pub instance_resources: InstanceResources,
68    pub desired_counts: DesiredCounts,
69    #[serde(default)]
70    pub runtime_policy: RuntimePolicy,
71    #[serde(default = "default_seccomp")]
72    pub seccomp_policy: String,
73    #[serde(default = "default_compression")]
74    pub snapshot_compression: String,
75    #[serde(default)]
76    pub routing_table: Option<RoutingTable>,
77    #[serde(default)]
78    pub secret_scopes: Vec<SecretScope>,
79    #[serde(default)]
80    pub sleep_policy: Option<SleepPolicyConfig>,
81    /// Default update strategy for rollouts (rolling or canary).
82    /// When set, the agent uses this instead of the deploy config default.
83    #[serde(default)]
84    pub default_update_strategy: Option<UpdateStrategy>,
85    /// Pre-built artifacts to pull from the template registry.
86    /// When set, the agent downloads artifacts from S3 instead of running
87    /// a local Nix build. Falls back to local build if the pull fails.
88    #[serde(default)]
89    pub registry_artifact: Option<RegistryArtifact>,
90}
91
/// Serde default for `DesiredPool::seccomp_policy`: the `"baseline"` policy.
fn default_seccomp() -> String {
    String::from("baseline")
}
95
/// Serde default for `DesiredPool::snapshot_compression`: `"none"`.
fn default_compression() -> String {
    "none".to_owned()
}
99
100// ============================================================================
101// Deployment control types
102// ============================================================================
103
104/// Deployment phase for rollout state tracking.
105#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
106#[serde(rename_all = "snake_case")]
107pub enum DeploymentPhase {
108    NotStarted,
109    CanaryEvaluation,
110    RollingUpdate,
111    Paused,
112    Complete,
113    RolledBack,
114    Failed,
115}
116
117// ============================================================================
118// Batch operation types
119// ============================================================================
120
121/// Single item in a batch operation.
122#[derive(Debug, Clone, Serialize, Deserialize)]
123pub struct BatchActionItem {
124    pub tenant_id: String,
125    pub pool_id: String,
126    pub instance_id: String,
127    pub action: InstanceAction,
128}
129
130/// Pool-level action types.
131#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
132#[serde(rename_all = "snake_case")]
133pub enum PoolActionType {
134    StartAll,
135    StopAll,
136    WarmAll,
137    DestroyAll {
138        wipe_volumes: bool,
139    },
140    ScaleTo {
141        running: u32,
142        warm: u32,
143        sleeping: u32,
144    },
145}
146
147/// Result for a single item in a batch operation.
148#[derive(Debug, Clone, Serialize, Deserialize)]
149pub struct BatchActionItemResult {
150    pub tenant_id: String,
151    pub pool_id: String,
152    pub instance_id: String,
153    pub success: bool,
154    #[serde(default, skip_serializing_if = "Option::is_none")]
155    pub new_status: Option<String>,
156    #[serde(default, skip_serializing_if = "Option::is_none")]
157    pub error: Option<String>,
158}
159
160// ============================================================================
161// Monitoring and observability types
162// ============================================================================
163
164/// Health status for a single instance.
165#[derive(Debug, Clone, Serialize, Deserialize)]
166pub struct InstanceHealthReport {
167    pub tenant_id: String,
168    pub pool_id: String,
169    pub instance_id: String,
170    pub status: InstanceState,
171    pub healthy: bool,
172    pub integration_health: Vec<IntegrationHealthSummary>,
173    pub probe_results: Vec<ProbeResultSummary>,
174    pub idle_metrics: crate::idle_metrics::IdleMetrics,
175    #[serde(default, skip_serializing_if = "Option::is_none")]
176    pub last_health_check_at: Option<String>,
177}
178
179/// Integration health summary (from guest integrations).
180#[derive(Debug, Clone, Serialize, Deserialize)]
181pub struct IntegrationHealthSummary {
182    pub name: String,
183    pub healthy: bool,
184    #[serde(default, skip_serializing_if = "Option::is_none")]
185    pub detail: Option<String>,
186}
187
188/// Probe result summary (from guest probes).
189#[derive(Debug, Clone, Serialize, Deserialize)]
190pub struct ProbeResultSummary {
191    pub name: String,
192    pub healthy: bool,
193    #[serde(default, skip_serializing_if = "Option::is_none")]
194    pub detail: Option<String>,
195}
196
197/// Single reconciliation history entry.
198#[derive(Debug, Clone, Serialize, Deserialize)]
199pub struct ReconcileHistoryEntry {
200    pub timestamp: String,
201    pub duration_ms: u64,
202    pub report: ReconcileReport,
203}
204
205/// Tenant state in state dump.
206#[derive(Debug, Clone, Serialize, Deserialize)]
207pub struct TenantStateDump {
208    pub tenant_id: String,
209    pub pools: Vec<PoolStateDump>,
210}
211
212/// Pool state in state dump.
213#[derive(Debug, Clone, Serialize, Deserialize)]
214pub struct PoolStateDump {
215    pub pool_id: String,
216    pub instances: Vec<InstanceState>,
217    pub desired_counts: DesiredCounts,
218}
219
220/// Content for StateDump response (boxed to reduce enum size).
221#[derive(Debug, Clone, Serialize, Deserialize)]
222pub struct StateDumpContent {
223    pub node_info: NodeInfo,
224    pub node_stats: NodeStats,
225    #[serde(default)]
226    pub metrics: Option<crate::observability::metrics::MetricsSnapshot>,
227    #[serde(default)]
228    pub audit_log: Option<Vec<crate::audit::AuditEntry>>,
229    pub tenants: Vec<TenantStateDump>,
230}
231
232// ============================================================================
233// Reconcile report
234// ============================================================================
235
236#[derive(Debug, Clone, Default, Serialize, Deserialize)]
237pub struct ReconcileReport {
238    pub tenants_created: Vec<String>,
239    pub tenants_pruned: Vec<String>,
240    pub pools_created: Vec<String>,
241    pub instances_created: u32,
242    pub instances_started: u32,
243    pub instances_warmed: u32,
244    pub instances_slept: u32,
245    pub instances_stopped: u32,
246    #[serde(default)]
247    pub instances_deferred: u32,
248    pub errors: Vec<String>,
249}
250
251// ============================================================================
252// Typed message protocol (QUIC API)
253// ============================================================================
254
255/// Strongly typed request sent over QUIC streams.
256#[derive(Debug, Clone, Serialize, Deserialize)]
257pub enum AgentRequest {
258    /// Push a new desired state for reconciliation (unsigned, dev mode only).
259    Reconcile(DesiredState),
260    /// Push a signed desired state for reconciliation (production mode).
261    ReconcileSigned(SignedPayload),
262    /// Query node capabilities and identity.
263    NodeInfo,
264    /// Query aggregate node statistics.
265    NodeStats,
266    /// List all tenants on this node.
267    TenantList,
268    /// List instances for a specific tenant (optionally filtered by pool).
269    InstanceList {
270        tenant_id: String,
271        pool_id: Option<String>,
272    },
273    /// Urgently wake a sleeping instance.
274    WakeInstance {
275        tenant_id: String,
276        pool_id: String,
277        instance_id: String,
278    },
279    /// Perform an imperative lifecycle action on a specific instance.
280    InstanceAction {
281        tenant_id: String,
282        pool_id: String,
283        instance_id: String,
284        action: InstanceAction,
285    },
286    /// Forward a sandbox operation (filesystem, exec, logs) to the guest agent.
287    SandboxAction {
288        tenant_id: String,
289        pool_id: String,
290        instance_id: String,
291        request: serde_json::Value,
292    },
293    /// Query the status of an ongoing deployment/rollout for a pool.
294    DeploymentStatus { tenant_id: String, pool_id: String },
295    /// Pause an ongoing deployment/rollout.
296    PauseDeployment { tenant_id: String, pool_id: String },
297    /// Resume a paused deployment/rollout.
298    ResumeDeployment { tenant_id: String, pool_id: String },
299    /// Rollback a deployment to the previous revision.
300    RollbackDeployment {
301        tenant_id: String,
302        pool_id: String,
303        #[serde(default)]
304        target_revision: Option<String>,
305    },
306    /// Perform the same action on multiple instances at once.
307    BatchInstanceAction { actions: Vec<BatchActionItem> },
308    /// Perform pool-level operations (affect all instances in pool).
309    PoolAction {
310        tenant_id: String,
311        pool_id: String,
312        action: PoolActionType,
313    },
314    /// Query current metrics snapshot.
315    GetMetrics,
316    /// Retrieve recent audit log entries for a tenant.
317    GetAuditLog {
318        tenant_id: String,
319        #[serde(default)]
320        last_n: Option<u32>,
321        #[serde(default)]
322        since: Option<String>,
323    },
324    /// Get detailed health status for instances.
325    GetHealthStatus {
326        #[serde(default)]
327        tenant_id: Option<String>,
328        #[serde(default)]
329        pool_id: Option<String>,
330    },
331    /// Retrieve reconciliation history.
332    GetReconcileHistory {
333        #[serde(default)]
334        last_n: Option<u32>,
335    },
336    /// Force an immediate reconciliation pass (debug/troubleshooting).
337    ForceReconcile { dry_run: bool },
338    /// Export complete node state for debugging.
339    DumpState {
340        include_metrics: bool,
341        include_audit_log: bool,
342    },
343    /// Hot reload secrets without restarting instances.
344    UpdateSecrets {
345        tenant_id: String,
346        secrets_hash: String,
347        force_reload: bool,
348    },
349    /// Update config drive for instances in a pool.
350    UpdateConfig {
351        tenant_id: String,
352        pool_id: String,
353        config_version: u64,
354    },
355    /// Request incremental state events since a given sequence number.
356    ///
357    /// Returns events with sequence > `since`. If the event log has been
358    /// truncated past the requested sequence, the coordinator responds with
359    /// a full `DesiredState` instead.
360    SyncEvents { since: u64 },
361}
362
363/// Imperative lifecycle action for a single instance.
364#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
365pub enum InstanceAction {
366    Start,
367    Stop,
368    Sleep,
369    Wake,
370    Warm,
371    Destroy,
372}
373
374/// Strongly typed response returned over QUIC streams.
375#[derive(Debug, Clone, Serialize, Deserialize)]
376pub enum AgentResponse {
377    /// Result of a reconcile pass.
378    ReconcileResult(ReconcileReport),
379    /// Node info.
380    NodeInfo(NodeInfo),
381    /// Aggregate node stats.
382    NodeStats(NodeStats),
383    /// List of tenant IDs.
384    TenantList(Vec<String>),
385    /// List of instance states.
386    InstanceList(Vec<InstanceState>),
387    /// Result of a wake operation.
388    WakeResult { success: bool },
389    /// Result of an imperative instance action.
390    InstanceActionResult {
391        success: bool,
392        new_status: String,
393        #[serde(default, skip_serializing_if = "Option::is_none")]
394        error: Option<String>,
395    },
396    /// Result of a sandbox operation (filesystem, exec, logs).
397    SandboxResult {
398        success: bool,
399        response: serde_json::Value,
400        #[serde(default, skip_serializing_if = "Option::is_none")]
401        error: Option<String>,
402    },
403    /// Error response.
404    Error { code: u16, message: String },
405    /// Deployment status with rollout progress.
406    DeploymentStatus {
407        pool_id: String,
408        current_revision: String,
409        #[serde(default)]
410        target_revision: Option<String>,
411        strategy: UpdateStrategy,
412        phase: DeploymentPhase,
413        instances_updated: u32,
414        instances_pending: u32,
415        #[serde(default)]
416        canary_health: Option<f64>,
417        paused: bool,
418        errors: Vec<String>,
419    },
420    /// Result of pause/resume/rollback operations.
421    DeploymentControlResult {
422        success: bool,
423        pool_id: String,
424        new_phase: String,
425        message: String,
426    },
427    /// Result of batch instance operations.
428    BatchActionResult {
429        results: Vec<BatchActionItemResult>,
430        total: u32,
431        succeeded: u32,
432        failed: u32,
433    },
434    /// Result of pool-level action.
435    PoolActionResult {
436        success: bool,
437        pool_id: String,
438        instances_affected: u32,
439        errors: Vec<String>,
440    },
441    /// Metrics snapshot.
442    Metrics(crate::observability::metrics::MetricsSnapshot),
443    /// Audit log entries.
444    AuditLog {
445        entries: Vec<crate::audit::AuditEntry>,
446        total_count: u32,
447    },
448    /// Health status report for instances.
449    HealthStatus {
450        instances: Vec<InstanceHealthReport>,
451        unhealthy_count: u32,
452        degraded_count: u32,
453    },
454    /// Reconciliation history.
455    ReconcileHistory { runs: Vec<ReconcileHistoryEntry> },
456    /// Complete node state dump (boxed due to size).
457    StateDump(Box<StateDumpContent>),
458    /// Result of secrets update.
459    SecretsUpdateResult {
460        success: bool,
461        tenant_id: String,
462        instances_reloaded: u32,
463        errors: Vec<String>,
464    },
465    /// Result of config update.
466    ConfigUpdateResult {
467        success: bool,
468        pool_id: String,
469        instances_updated: u32,
470        errors: Vec<String>,
471    },
472    /// Incremental state events in response to `SyncEvents`.
473    SyncEventsResult {
474        /// Events with sequence > the requested `since` value.
475        events: Vec<serde_json::Value>,
476        /// Current sequence number at the coordinator.
477        current_sequence: u64,
478    },
479}
480
481#[cfg(test)]
482mod tests {
483    use super::*;
484
485    #[test]
486    fn test_agent_request_serde() {
487        let req = AgentRequest::NodeInfo;
488        let json = serde_json::to_string(&req).unwrap();
489        let parsed: AgentRequest = serde_json::from_str(&json).unwrap();
490        assert!(matches!(parsed, AgentRequest::NodeInfo));
491    }
492
493    #[test]
494    fn test_agent_response_error() {
495        let resp = AgentResponse::Error {
496            code: 404,
497            message: "not found".to_string(),
498        };
499        let json = serde_json::to_string(&resp).unwrap();
500        let parsed: AgentResponse = serde_json::from_str(&json).unwrap();
501        match parsed {
502            AgentResponse::Error { code, message } => {
503                assert_eq!(code, 404);
504                assert_eq!(message, "not found");
505            }
506            _ => panic!("Expected Error variant"),
507        }
508    }
509
510    #[test]
511    fn test_desired_state_serde() {
512        let ds = DesiredState {
513            schema_version: 1,
514            node_id: "node-1".to_string(),
515            tenants: vec![],
516            prune_unknown_tenants: false,
517            prune_unknown_pools: false,
518            sequence: 0,
519        };
520        let json = serde_json::to_string(&ds).unwrap();
521        let parsed: DesiredState = serde_json::from_str(&json).unwrap();
522        assert_eq!(parsed.schema_version, 1);
523        assert_eq!(parsed.node_id, "node-1");
524    }
525
526    #[test]
527    fn test_reconcile_report_default() {
528        let report = ReconcileReport::default();
529        assert!(report.tenants_created.is_empty());
530        assert!(report.errors.is_empty());
531        assert_eq!(report.instances_created, 0);
532    }
533
534    #[test]
535    fn test_instance_action_serde_all_variants() {
536        let actions = vec![
537            InstanceAction::Start,
538            InstanceAction::Stop,
539            InstanceAction::Sleep,
540            InstanceAction::Wake,
541            InstanceAction::Warm,
542            InstanceAction::Destroy,
543        ];
544        for action in actions {
545            let json = serde_json::to_string(&action).unwrap();
546            let parsed: InstanceAction = serde_json::from_str(&json).unwrap();
547            assert_eq!(parsed, action);
548        }
549    }
550
551    #[test]
552    fn test_instance_action_request_serde() {
553        let req = AgentRequest::InstanceAction {
554            tenant_id: "t1".to_string(),
555            pool_id: "p1".to_string(),
556            instance_id: "i1".to_string(),
557            action: InstanceAction::Wake,
558        };
559        let json = serde_json::to_string(&req).unwrap();
560        let parsed: AgentRequest = serde_json::from_str(&json).unwrap();
561        match parsed {
562            AgentRequest::InstanceAction {
563                tenant_id,
564                pool_id,
565                instance_id,
566                action,
567            } => {
568                assert_eq!(tenant_id, "t1");
569                assert_eq!(pool_id, "p1");
570                assert_eq!(instance_id, "i1");
571                assert_eq!(action, InstanceAction::Wake);
572            }
573            _ => panic!("Expected InstanceAction variant"),
574        }
575    }
576
577    #[test]
578    fn test_instance_action_result_success() {
579        let resp = AgentResponse::InstanceActionResult {
580            success: true,
581            new_status: "running".to_string(),
582            error: None,
583        };
584        let json = serde_json::to_string(&resp).unwrap();
585        assert!(!json.contains("error"));
586        let parsed: AgentResponse = serde_json::from_str(&json).unwrap();
587        match parsed {
588            AgentResponse::InstanceActionResult {
589                success,
590                new_status,
591                error,
592            } => {
593                assert!(success);
594                assert_eq!(new_status, "running");
595                assert!(error.is_none());
596            }
597            _ => panic!("Expected InstanceActionResult variant"),
598        }
599    }
600
601    #[test]
602    fn test_sandbox_action_serde_roundtrip() {
603        let req = AgentRequest::SandboxAction {
604            tenant_id: "t1".to_string(),
605            pool_id: "p1".to_string(),
606            instance_id: "i1".to_string(),
607            request: serde_json::json!({"type": "Ping"}),
608        };
609        let json = serde_json::to_string(&req).unwrap();
610        let parsed: AgentRequest = serde_json::from_str(&json).unwrap();
611        match parsed {
612            AgentRequest::SandboxAction {
613                tenant_id,
614                pool_id,
615                instance_id,
616                request,
617            } => {
618                assert_eq!(tenant_id, "t1");
619                assert_eq!(pool_id, "p1");
620                assert_eq!(instance_id, "i1");
621                assert_eq!(request.get("type").and_then(|t| t.as_str()), Some("Ping"));
622            }
623            _ => panic!("Expected SandboxAction variant"),
624        }
625    }
626
627    #[test]
628    fn test_sandbox_result_success_roundtrip() {
629        let resp = AgentResponse::SandboxResult {
630            success: true,
631            response: serde_json::json!({"type": "Pong"}),
632            error: None,
633        };
634        let json = serde_json::to_string(&resp).unwrap();
635        assert!(!json.contains("error"));
636        let parsed: AgentResponse = serde_json::from_str(&json).unwrap();
637        match parsed {
638            AgentResponse::SandboxResult {
639                success,
640                response,
641                error,
642            } => {
643                assert!(success);
644                assert_eq!(response.get("type").and_then(|t| t.as_str()), Some("Pong"));
645                assert!(error.is_none());
646            }
647            _ => panic!("Expected SandboxResult variant"),
648        }
649    }
650
651    #[test]
652    fn test_sandbox_result_failure_roundtrip() {
653        let resp = AgentResponse::SandboxResult {
654            success: false,
655            response: serde_json::Value::Null,
656            error: Some("proxy_error: socket not found".to_string()),
657        };
658        let json = serde_json::to_string(&resp).unwrap();
659        let parsed: AgentResponse = serde_json::from_str(&json).unwrap();
660        match parsed {
661            AgentResponse::SandboxResult { success, error, .. } => {
662                assert!(!success);
663                assert_eq!(error.as_deref(), Some("proxy_error: socket not found"));
664            }
665            _ => panic!("Expected SandboxResult variant"),
666        }
667    }
668
669    #[test]
670    fn test_instance_action_result_failure() {
671        let resp = AgentResponse::InstanceActionResult {
672            success: false,
673            new_status: "stopped".to_string(),
674            error: Some("Instance not found".to_string()),
675        };
676        let json = serde_json::to_string(&resp).unwrap();
677        let parsed: AgentResponse = serde_json::from_str(&json).unwrap();
678        match parsed {
679            AgentResponse::InstanceActionResult {
680                success,
681                new_status,
682                error,
683            } => {
684                assert!(!success);
685                assert_eq!(new_status, "stopped");
686                assert_eq!(error.as_deref(), Some("Instance not found"));
687            }
688            _ => panic!("Expected InstanceActionResult variant"),
689        }
690    }
691
692    #[test]
693    fn test_desired_pool_backward_compat_no_new_fields() {
694        // Old JSON without default_update_strategy should still parse
695        let json = r#"{
696            "pool_id": "gateways",
697            "flake_ref": "github:org/repo",
698            "profile": "minimal",
699            "instance_resources": {"vcpus": 2, "mem_mib": 1024},
700            "desired_counts": {"running": 3, "warm": 1, "sleeping": 0}
701        }"#;
702        let parsed: DesiredPool = serde_json::from_str(json).unwrap();
703        assert_eq!(parsed.pool_id, "gateways");
704        assert!(parsed.default_update_strategy.is_none());
705        assert!(parsed.sleep_policy.is_none());
706    }
707
708    #[test]
709    fn test_desired_pool_with_update_strategy() {
710        use crate::pool::UpdateStrategy;
711
712        let json = r#"{
713            "pool_id": "gateways",
714            "flake_ref": ".",
715            "profile": "minimal",
716            "instance_resources": {"vcpus": 1, "mem_mib": 512},
717            "desired_counts": {"running": 1, "warm": 0, "sleeping": 0},
718            "default_update_strategy": {"type": "canary", "canary_count": 2, "canary_duration_secs": 600, "success_threshold": 0.99}
719        }"#;
720        let parsed: DesiredPool = serde_json::from_str(json).unwrap();
721        let strategy = parsed.default_update_strategy.unwrap();
722        match strategy {
723            UpdateStrategy::Canary(c) => {
724                assert_eq!(c.canary_count, 2);
725                assert_eq!(c.canary_duration_secs, 600);
726                assert!((c.success_threshold - 0.99).abs() < 0.001);
727            }
728            _ => panic!("Expected Canary strategy"),
729        }
730    }
731
732    #[test]
733    fn test_desired_pool_update_strategy_roundtrip() {
734        use crate::pool::{RollingUpdateStrategy, UpdateStrategy};
735
736        let pool = DesiredPool {
737            pool_id: "workers".to_string(),
738            flake_ref: ".".to_string(),
739            profile: "minimal".to_string(),
740            role: Role::Worker,
741            instance_resources: InstanceResources {
742                vcpus: 1,
743                mem_mib: 512,
744                data_disk_mib: 0,
745            },
746            desired_counts: DesiredCounts {
747                running: 1,
748                warm: 0,
749                sleeping: 0,
750            },
751            runtime_policy: RuntimePolicy::default(),
752            seccomp_policy: "baseline".to_string(),
753            snapshot_compression: "none".to_string(),
754            routing_table: None,
755            secret_scopes: vec![],
756            sleep_policy: None,
757            default_update_strategy: Some(UpdateStrategy::Rolling(RollingUpdateStrategy {
758                max_unavailable: 3,
759                max_surge: 2,
760                health_check_timeout_secs: 90,
761            })),
762            registry_artifact: None,
763        };
764        let json = serde_json::to_string(&pool).unwrap();
765        let parsed: DesiredPool = serde_json::from_str(&json).unwrap();
766        let strategy = parsed.default_update_strategy.unwrap();
767        match strategy {
768            UpdateStrategy::Rolling(r) => {
769                assert_eq!(r.max_unavailable, 3);
770                assert_eq!(r.max_surge, 2);
771                assert_eq!(r.health_check_timeout_secs, 90);
772            }
773            _ => panic!("Expected Rolling strategy"),
774        }
775    }
776
777    #[test]
778    fn test_desired_tenant_backward_compat_no_preferred_regions() {
779        // Old JSON without preferred_regions should still parse
780        let json = r#"{
781            "tenant_id": "acme",
782            "network": {"tenant_net_id": 1, "ipv4_subnet": "10.240.1.0/24"},
783            "quotas": {"max_vcpus": 16, "max_mem_mib": 32768, "max_running": 8, "max_warm": 4, "max_pools": 4, "max_instances_per_pool": 16, "max_disk_gib": 100},
784            "pools": []
785        }"#;
786        let parsed: DesiredTenant = serde_json::from_str(json).unwrap();
787        assert_eq!(parsed.tenant_id, "acme");
788        assert!(parsed.preferred_regions.is_empty());
789    }
790
791    #[test]
792    fn test_desired_tenant_with_preferred_regions() {
793        let json = r#"{
794            "tenant_id": "acme",
795            "network": {"tenant_net_id": 1, "ipv4_subnet": "10.240.1.0/24"},
796            "quotas": {"max_vcpus": 16, "max_mem_mib": 32768, "max_running": 8, "max_warm": 4, "max_pools": 4, "max_instances_per_pool": 16, "max_disk_gib": 100},
797            "pools": [],
798            "preferred_regions": ["us-east-1", "eu-west-1"]
799        }"#;
800        let parsed: DesiredTenant = serde_json::from_str(json).unwrap();
801        assert_eq!(parsed.preferred_regions, vec!["us-east-1", "eu-west-1"]);
802    }
803
804    #[test]
805    fn test_desired_tenant_preferred_regions_roundtrip() {
806        let tenant = DesiredTenant {
807            tenant_id: "acme".to_string(),
808            network: DesiredTenantNetwork {
809                tenant_net_id: 5,
810                ipv4_subnet: "10.240.5.0/24".to_string(),
811            },
812            quotas: TenantQuota::default(),
813            secrets_hash: None,
814            pools: vec![],
815            preferred_regions: vec!["us-west-2".to_string(), "ap-southeast-1".to_string()],
816        };
817        let json = serde_json::to_string(&tenant).unwrap();
818        let parsed: DesiredTenant = serde_json::from_str(&json).unwrap();
819        assert_eq!(parsed.preferred_regions.len(), 2);
820        assert_eq!(parsed.preferred_regions[0], "us-west-2");
821        assert_eq!(parsed.preferred_regions[1], "ap-southeast-1");
822    }
823
824    #[test]
825    fn test_desired_pool_backward_compat_no_registry_artifact() {
826        // Old JSON without registry_artifact should still parse
827        let json = r#"{
828            "pool_id": "gateways",
829            "flake_ref": "github:org/repo",
830            "profile": "minimal",
831            "instance_resources": {"vcpus": 2, "mem_mib": 1024},
832            "desired_counts": {"running": 3, "warm": 1, "sleeping": 0}
833        }"#;
834        let parsed: DesiredPool = serde_json::from_str(json).unwrap();
835        assert_eq!(parsed.pool_id, "gateways");
836        assert!(parsed.registry_artifact.is_none());
837    }
838
839    #[test]
840    fn test_desired_pool_with_registry_artifact() {
841        let json = r#"{
842            "pool_id": "gateways",
843            "flake_ref": ".",
844            "profile": "minimal",
845            "instance_resources": {"vcpus": 1, "mem_mib": 512},
846            "desired_counts": {"running": 1, "warm": 0, "sleeping": 0},
847            "registry_artifact": {"template_id": "hello", "revision": "abc123"}
848        }"#;
849        let parsed: DesiredPool = serde_json::from_str(json).unwrap();
850        let ra = parsed.registry_artifact.unwrap();
851        assert_eq!(ra.template_id, "hello");
852        assert_eq!(ra.revision.as_deref(), Some("abc123"));
853    }
854
855    #[test]
856    fn test_desired_pool_registry_artifact_no_revision() {
857        let json = r#"{
858            "pool_id": "gateways",
859            "flake_ref": ".",
860            "profile": "minimal",
861            "instance_resources": {"vcpus": 1, "mem_mib": 512},
862            "desired_counts": {"running": 1, "warm": 0, "sleeping": 0},
863            "registry_artifact": {"template_id": "openclaw"}
864        }"#;
865        let parsed: DesiredPool = serde_json::from_str(json).unwrap();
866        let ra = parsed.registry_artifact.unwrap();
867        assert_eq!(ra.template_id, "openclaw");
868        assert!(ra.revision.is_none());
869    }
870
871    #[test]
872    fn test_desired_pool_registry_artifact_roundtrip() {
873        use crate::pool::{RegistryArtifact, RollingUpdateStrategy, UpdateStrategy};
874
875        let pool = DesiredPool {
876            pool_id: "workers".to_string(),
877            flake_ref: ".".to_string(),
878            profile: "minimal".to_string(),
879            role: Role::Worker,
880            instance_resources: InstanceResources {
881                vcpus: 1,
882                mem_mib: 512,
883                data_disk_mib: 0,
884            },
885            desired_counts: DesiredCounts {
886                running: 1,
887                warm: 0,
888                sleeping: 0,
889            },
890            runtime_policy: RuntimePolicy::default(),
891            seccomp_policy: "baseline".to_string(),
892            snapshot_compression: "none".to_string(),
893            routing_table: None,
894            secret_scopes: vec![],
895            sleep_policy: None,
896            default_update_strategy: Some(
897                UpdateStrategy::Rolling(RollingUpdateStrategy::default()),
898            ),
899            registry_artifact: Some(RegistryArtifact {
900                template_id: "hello".to_string(),
901                revision: Some("rev-abc123".to_string()),
902            }),
903        };
904        let json = serde_json::to_string(&pool).unwrap();
905        let parsed: DesiredPool = serde_json::from_str(&json).unwrap();
906        let ra = parsed.registry_artifact.unwrap();
907        assert_eq!(ra.template_id, "hello");
908        assert_eq!(ra.revision.as_deref(), Some("rev-abc123"));
909    }
910
911    // ========================================================================
    // Tests for protocol extensions: deployment control, batch/pool actions,
    // observability queries, and secret/config updates
913    // ========================================================================
914
915    #[test]
916    fn test_deployment_phase_serde_all_variants() {
917        let phases = vec![
918            DeploymentPhase::NotStarted,
919            DeploymentPhase::CanaryEvaluation,
920            DeploymentPhase::RollingUpdate,
921            DeploymentPhase::Paused,
922            DeploymentPhase::Complete,
923            DeploymentPhase::RolledBack,
924            DeploymentPhase::Failed,
925        ];
926        for phase in phases {
927            let json = serde_json::to_string(&phase).unwrap();
928            let parsed: DeploymentPhase = serde_json::from_str(&json).unwrap();
929            assert_eq!(parsed, phase);
930        }
931    }
932
933    #[test]
934    fn test_batch_action_item_serde() {
935        let item = BatchActionItem {
936            tenant_id: "t1".to_string(),
937            pool_id: "p1".to_string(),
938            instance_id: "i1".to_string(),
939            action: InstanceAction::Start,
940        };
941        let json = serde_json::to_string(&item).unwrap();
942        let parsed: BatchActionItem = serde_json::from_str(&json).unwrap();
943        assert_eq!(parsed.tenant_id, "t1");
944        assert_eq!(parsed.pool_id, "p1");
945        assert_eq!(parsed.instance_id, "i1");
946        assert_eq!(parsed.action, InstanceAction::Start);
947    }
948
949    #[test]
950    fn test_pool_action_type_serde_all_variants() {
951        let actions = vec![
952            PoolActionType::StartAll,
953            PoolActionType::StopAll,
954            PoolActionType::WarmAll,
955            PoolActionType::DestroyAll { wipe_volumes: true },
956            PoolActionType::ScaleTo {
957                running: 3,
958                warm: 1,
959                sleeping: 0,
960            },
961        ];
962        for action in actions {
963            let json = serde_json::to_string(&action).unwrap();
964            let parsed: PoolActionType = serde_json::from_str(&json).unwrap();
965            assert_eq!(parsed, action);
966        }
967    }
968
969    #[test]
970    fn test_agent_request_deployment_status() {
971        let req = AgentRequest::DeploymentStatus {
972            tenant_id: "acme".to_string(),
973            pool_id: "gateways".to_string(),
974        };
975        let json = serde_json::to_string(&req).unwrap();
976        let parsed: AgentRequest = serde_json::from_str(&json).unwrap();
977        match parsed {
978            AgentRequest::DeploymentStatus { tenant_id, pool_id } => {
979                assert_eq!(tenant_id, "acme");
980                assert_eq!(pool_id, "gateways");
981            }
982            _ => panic!("Expected DeploymentStatus variant"),
983        }
984    }
985
986    #[test]
987    fn test_agent_request_pause_deployment() {
988        let req = AgentRequest::PauseDeployment {
989            tenant_id: "acme".to_string(),
990            pool_id: "workers".to_string(),
991        };
992        let json = serde_json::to_string(&req).unwrap();
993        let parsed: AgentRequest = serde_json::from_str(&json).unwrap();
994        assert!(matches!(parsed, AgentRequest::PauseDeployment { .. }));
995    }
996
997    #[test]
998    fn test_agent_request_resume_deployment() {
999        let req = AgentRequest::ResumeDeployment {
1000            tenant_id: "acme".to_string(),
1001            pool_id: "workers".to_string(),
1002        };
1003        let json = serde_json::to_string(&req).unwrap();
1004        let parsed: AgentRequest = serde_json::from_str(&json).unwrap();
1005        assert!(matches!(parsed, AgentRequest::ResumeDeployment { .. }));
1006    }
1007
1008    #[test]
1009    fn test_agent_request_rollback_deployment() {
1010        let req = AgentRequest::RollbackDeployment {
1011            tenant_id: "acme".to_string(),
1012            pool_id: "workers".to_string(),
1013            target_revision: Some("rev-abc123".to_string()),
1014        };
1015        let json = serde_json::to_string(&req).unwrap();
1016        let parsed: AgentRequest = serde_json::from_str(&json).unwrap();
1017        match parsed {
1018            AgentRequest::RollbackDeployment {
1019                target_revision, ..
1020            } => {
1021                assert_eq!(target_revision.as_deref(), Some("rev-abc123"));
1022            }
1023            _ => panic!("Expected RollbackDeployment variant"),
1024        }
1025    }
1026
1027    #[test]
1028    fn test_agent_request_batch_instance_action() {
1029        let req = AgentRequest::BatchInstanceAction {
1030            actions: vec![
1031                BatchActionItem {
1032                    tenant_id: "t1".to_string(),
1033                    pool_id: "p1".to_string(),
1034                    instance_id: "i1".to_string(),
1035                    action: InstanceAction::Start,
1036                },
1037                BatchActionItem {
1038                    tenant_id: "t1".to_string(),
1039                    pool_id: "p1".to_string(),
1040                    instance_id: "i2".to_string(),
1041                    action: InstanceAction::Stop,
1042                },
1043            ],
1044        };
1045        let json = serde_json::to_string(&req).unwrap();
1046        let parsed: AgentRequest = serde_json::from_str(&json).unwrap();
1047        match parsed {
1048            AgentRequest::BatchInstanceAction { actions } => {
1049                assert_eq!(actions.len(), 2);
1050                assert_eq!(actions[0].instance_id, "i1");
1051                assert_eq!(actions[1].instance_id, "i2");
1052            }
1053            _ => panic!("Expected BatchInstanceAction variant"),
1054        }
1055    }
1056
1057    #[test]
1058    fn test_agent_request_pool_action() {
1059        let req = AgentRequest::PoolAction {
1060            tenant_id: "acme".to_string(),
1061            pool_id: "workers".to_string(),
1062            action: PoolActionType::StartAll,
1063        };
1064        let json = serde_json::to_string(&req).unwrap();
1065        let parsed: AgentRequest = serde_json::from_str(&json).unwrap();
1066        match parsed {
1067            AgentRequest::PoolAction { action, .. } => {
1068                assert_eq!(action, PoolActionType::StartAll);
1069            }
1070            _ => panic!("Expected PoolAction variant"),
1071        }
1072    }
1073
1074    #[test]
1075    fn test_agent_request_get_metrics() {
1076        let req = AgentRequest::GetMetrics;
1077        let json = serde_json::to_string(&req).unwrap();
1078        let parsed: AgentRequest = serde_json::from_str(&json).unwrap();
1079        assert!(matches!(parsed, AgentRequest::GetMetrics));
1080    }
1081
1082    #[test]
1083    fn test_agent_request_get_audit_log() {
1084        let req = AgentRequest::GetAuditLog {
1085            tenant_id: "acme".to_string(),
1086            last_n: Some(10),
1087            since: Some("2025-01-01T00:00:00Z".to_string()),
1088        };
1089        let json = serde_json::to_string(&req).unwrap();
1090        let parsed: AgentRequest = serde_json::from_str(&json).unwrap();
1091        match parsed {
1092            AgentRequest::GetAuditLog {
1093                tenant_id,
1094                last_n,
1095                since,
1096            } => {
1097                assert_eq!(tenant_id, "acme");
1098                assert_eq!(last_n, Some(10));
1099                assert_eq!(since.as_deref(), Some("2025-01-01T00:00:00Z"));
1100            }
1101            _ => panic!("Expected GetAuditLog variant"),
1102        }
1103    }
1104
1105    #[test]
1106    fn test_agent_request_get_health_status() {
1107        let req = AgentRequest::GetHealthStatus {
1108            tenant_id: Some("acme".to_string()),
1109            pool_id: None,
1110        };
1111        let json = serde_json::to_string(&req).unwrap();
1112        let parsed: AgentRequest = serde_json::from_str(&json).unwrap();
1113        match parsed {
1114            AgentRequest::GetHealthStatus { tenant_id, pool_id } => {
1115                assert_eq!(tenant_id.as_deref(), Some("acme"));
1116                assert!(pool_id.is_none());
1117            }
1118            _ => panic!("Expected GetHealthStatus variant"),
1119        }
1120    }
1121
1122    #[test]
1123    fn test_agent_request_get_reconcile_history() {
1124        let req = AgentRequest::GetReconcileHistory { last_n: Some(5) };
1125        let json = serde_json::to_string(&req).unwrap();
1126        let parsed: AgentRequest = serde_json::from_str(&json).unwrap();
1127        match parsed {
1128            AgentRequest::GetReconcileHistory { last_n } => {
1129                assert_eq!(last_n, Some(5));
1130            }
1131            _ => panic!("Expected GetReconcileHistory variant"),
1132        }
1133    }
1134
1135    #[test]
1136    fn test_agent_request_force_reconcile() {
1137        let req = AgentRequest::ForceReconcile { dry_run: true };
1138        let json = serde_json::to_string(&req).unwrap();
1139        let parsed: AgentRequest = serde_json::from_str(&json).unwrap();
1140        match parsed {
1141            AgentRequest::ForceReconcile { dry_run } => {
1142                assert!(dry_run);
1143            }
1144            _ => panic!("Expected ForceReconcile variant"),
1145        }
1146    }
1147
1148    #[test]
1149    fn test_agent_request_dump_state() {
1150        let req = AgentRequest::DumpState {
1151            include_metrics: true,
1152            include_audit_log: false,
1153        };
1154        let json = serde_json::to_string(&req).unwrap();
1155        let parsed: AgentRequest = serde_json::from_str(&json).unwrap();
1156        match parsed {
1157            AgentRequest::DumpState {
1158                include_metrics,
1159                include_audit_log,
1160            } => {
1161                assert!(include_metrics);
1162                assert!(!include_audit_log);
1163            }
1164            _ => panic!("Expected DumpState variant"),
1165        }
1166    }
1167
1168    #[test]
1169    fn test_agent_request_update_secrets() {
1170        let req = AgentRequest::UpdateSecrets {
1171            tenant_id: "acme".to_string(),
1172            secrets_hash: "sha256:abc123".to_string(),
1173            force_reload: false,
1174        };
1175        let json = serde_json::to_string(&req).unwrap();
1176        let parsed: AgentRequest = serde_json::from_str(&json).unwrap();
1177        match parsed {
1178            AgentRequest::UpdateSecrets {
1179                tenant_id,
1180                secrets_hash,
1181                force_reload,
1182            } => {
1183                assert_eq!(tenant_id, "acme");
1184                assert_eq!(secrets_hash, "sha256:abc123");
1185                assert!(!force_reload);
1186            }
1187            _ => panic!("Expected UpdateSecrets variant"),
1188        }
1189    }
1190
1191    #[test]
1192    fn test_agent_request_update_config() {
1193        let req = AgentRequest::UpdateConfig {
1194            tenant_id: "acme".to_string(),
1195            pool_id: "workers".to_string(),
1196            config_version: 42,
1197        };
1198        let json = serde_json::to_string(&req).unwrap();
1199        let parsed: AgentRequest = serde_json::from_str(&json).unwrap();
1200        match parsed {
1201            AgentRequest::UpdateConfig {
1202                tenant_id,
1203                pool_id,
1204                config_version,
1205            } => {
1206                assert_eq!(tenant_id, "acme");
1207                assert_eq!(pool_id, "workers");
1208                assert_eq!(config_version, 42);
1209            }
1210            _ => panic!("Expected UpdateConfig variant"),
1211        }
1212    }
1213
1214    #[test]
1215    fn test_agent_response_deployment_status() {
1216        use crate::pool::{RollingUpdateStrategy, UpdateStrategy};
1217
1218        let resp = AgentResponse::DeploymentStatus {
1219            pool_id: "workers".to_string(),
1220            current_revision: "rev-old".to_string(),
1221            target_revision: Some("rev-new".to_string()),
1222            strategy: UpdateStrategy::Rolling(RollingUpdateStrategy::default()),
1223            phase: DeploymentPhase::RollingUpdate,
1224            instances_updated: 5,
1225            instances_pending: 3,
1226            canary_health: None,
1227            paused: false,
1228            errors: vec![],
1229        };
1230        let json = serde_json::to_string(&resp).unwrap();
1231        let parsed: AgentResponse = serde_json::from_str(&json).unwrap();
1232        match parsed {
1233            AgentResponse::DeploymentStatus {
1234                pool_id,
1235                current_revision,
1236                phase,
1237                ..
1238            } => {
1239                assert_eq!(pool_id, "workers");
1240                assert_eq!(current_revision, "rev-old");
1241                assert_eq!(phase, DeploymentPhase::RollingUpdate);
1242            }
1243            _ => panic!("Expected DeploymentStatus variant"),
1244        }
1245    }
1246
1247    #[test]
1248    fn test_agent_response_deployment_control_result() {
1249        let resp = AgentResponse::DeploymentControlResult {
1250            success: true,
1251            pool_id: "workers".to_string(),
1252            new_phase: "paused".to_string(),
1253            message: "Deployment paused successfully".to_string(),
1254        };
1255        let json = serde_json::to_string(&resp).unwrap();
1256        let parsed: AgentResponse = serde_json::from_str(&json).unwrap();
1257        match parsed {
1258            AgentResponse::DeploymentControlResult {
1259                success, pool_id, ..
1260            } => {
1261                assert!(success);
1262                assert_eq!(pool_id, "workers");
1263            }
1264            _ => panic!("Expected DeploymentControlResult variant"),
1265        }
1266    }
1267
1268    #[test]
1269    fn test_agent_response_batch_action_result() {
1270        let resp = AgentResponse::BatchActionResult {
1271            results: vec![
1272                BatchActionItemResult {
1273                    tenant_id: "t1".to_string(),
1274                    pool_id: "p1".to_string(),
1275                    instance_id: "i1".to_string(),
1276                    success: true,
1277                    new_status: Some("running".to_string()),
1278                    error: None,
1279                },
1280                BatchActionItemResult {
1281                    tenant_id: "t1".to_string(),
1282                    pool_id: "p1".to_string(),
1283                    instance_id: "i2".to_string(),
1284                    success: false,
1285                    new_status: None,
1286                    error: Some("Instance not found".to_string()),
1287                },
1288            ],
1289            total: 2,
1290            succeeded: 1,
1291            failed: 1,
1292        };
1293        let json = serde_json::to_string(&resp).unwrap();
1294        let parsed: AgentResponse = serde_json::from_str(&json).unwrap();
1295        match parsed {
1296            AgentResponse::BatchActionResult {
1297                results,
1298                total,
1299                succeeded,
1300                failed,
1301            } => {
1302                assert_eq!(total, 2);
1303                assert_eq!(succeeded, 1);
1304                assert_eq!(failed, 1);
1305                assert_eq!(results.len(), 2);
1306                assert!(results[0].success);
1307                assert!(!results[1].success);
1308            }
1309            _ => panic!("Expected BatchActionResult variant"),
1310        }
1311    }
1312
1313    #[test]
1314    fn test_agent_response_pool_action_result() {
1315        let resp = AgentResponse::PoolActionResult {
1316            success: true,
1317            pool_id: "workers".to_string(),
1318            instances_affected: 5,
1319            errors: vec![],
1320        };
1321        let json = serde_json::to_string(&resp).unwrap();
1322        let parsed: AgentResponse = serde_json::from_str(&json).unwrap();
1323        match parsed {
1324            AgentResponse::PoolActionResult {
1325                success,
1326                pool_id,
1327                instances_affected,
1328                ..
1329            } => {
1330                assert!(success);
1331                assert_eq!(pool_id, "workers");
1332                assert_eq!(instances_affected, 5);
1333            }
1334            _ => panic!("Expected PoolActionResult variant"),
1335        }
1336    }
1337
1338    #[test]
1339    fn test_agent_response_metrics() {
1340        use crate::observability::metrics::MetricsSnapshot;
1341
1342        let snapshot = MetricsSnapshot {
1343            requests_total: 100,
1344            requests_reconcile: 10,
1345            requests_node_info: 5,
1346            requests_node_stats: 3,
1347            requests_tenant_list: 2,
1348            requests_instance_list: 15,
1349            requests_wake: 8,
1350            requests_rate_limited: 1,
1351            requests_failed: 2,
1352            reconcile_runs: 10,
1353            reconcile_errors: 0,
1354            reconcile_duration_ms: 500,
1355            instances_created: 20,
1356            instances_started: 18,
1357            instances_stopped: 10,
1358            instances_slept: 5,
1359            instances_woken: 8,
1360            instances_destroyed: 2,
1361            instances_deferred: 3,
1362            connections_accepted: 50,
1363            connections_rejected: 1,
1364            build_image_duration_ms: 0,
1365            vm_start_duration_ms: 0,
1366            vsock_handshake_rtt_ms: 0,
1367        };
1368        let resp = AgentResponse::Metrics(snapshot);
1369        let json = serde_json::to_string(&resp).unwrap();
1370        let parsed: AgentResponse = serde_json::from_str(&json).unwrap();
1371        match parsed {
1372            AgentResponse::Metrics(s) => {
1373                assert_eq!(s.requests_total, 100);
1374                assert_eq!(s.reconcile_runs, 10);
1375                assert_eq!(s.instances_created, 20);
1376            }
1377            _ => panic!("Expected Metrics variant"),
1378        }
1379    }
1380
1381    #[test]
1382    fn test_agent_response_audit_log() {
1383        use crate::audit::{AuditAction, AuditEntry};
1384
1385        let resp = AgentResponse::AuditLog {
1386            entries: vec![AuditEntry {
1387                timestamp: "2025-01-01T00:00:00Z".to_string(),
1388                tenant_id: "acme".to_string(),
1389                pool_id: Some("workers".to_string()),
1390                instance_id: Some("i-001".to_string()),
1391                action: AuditAction::InstanceStarted,
1392                detail: Some("pid=12345".to_string()),
1393                threats: vec![],
1394                gate_decision: None,
1395                frame_sequence: None,
1396            }],
1397            total_count: 1,
1398        };
1399        let json = serde_json::to_string(&resp).unwrap();
1400        let parsed: AgentResponse = serde_json::from_str(&json).unwrap();
1401        match parsed {
1402            AgentResponse::AuditLog {
1403                entries,
1404                total_count,
1405            } => {
1406                assert_eq!(total_count, 1);
1407                assert_eq!(entries.len(), 1);
1408                assert_eq!(entries[0].tenant_id, "acme");
1409            }
1410            _ => panic!("Expected AuditLog variant"),
1411        }
1412    }
1413
1414    #[test]
1415    fn test_agent_response_secrets_update_result() {
1416        let resp = AgentResponse::SecretsUpdateResult {
1417            success: true,
1418            tenant_id: "acme".to_string(),
1419            instances_reloaded: 10,
1420            errors: vec![],
1421        };
1422        let json = serde_json::to_string(&resp).unwrap();
1423        let parsed: AgentResponse = serde_json::from_str(&json).unwrap();
1424        match parsed {
1425            AgentResponse::SecretsUpdateResult {
1426                success,
1427                tenant_id,
1428                instances_reloaded,
1429                ..
1430            } => {
1431                assert!(success);
1432                assert_eq!(tenant_id, "acme");
1433                assert_eq!(instances_reloaded, 10);
1434            }
1435            _ => panic!("Expected SecretsUpdateResult variant"),
1436        }
1437    }
1438
1439    #[test]
1440    fn test_agent_response_config_update_result() {
1441        let resp = AgentResponse::ConfigUpdateResult {
1442            success: true,
1443            pool_id: "workers".to_string(),
1444            instances_updated: 5,
1445            errors: vec![],
1446        };
1447        let json = serde_json::to_string(&resp).unwrap();
1448        let parsed: AgentResponse = serde_json::from_str(&json).unwrap();
1449        match parsed {
1450            AgentResponse::ConfigUpdateResult {
1451                success,
1452                pool_id,
1453                instances_updated,
1454                ..
1455            } => {
1456                assert!(success);
1457                assert_eq!(pool_id, "workers");
1458                assert_eq!(instances_updated, 5);
1459            }
1460            _ => panic!("Expected ConfigUpdateResult variant"),
1461        }
1462    }
1463
1464    // ==========================================================================
1465    // Comprehensive round-trip tests for all protocol variants
1466    // ==========================================================================
1467
1468    /// Build one instance of every `AgentRequest` variant to ensure full coverage.
1469    fn all_request_variants() -> Vec<AgentRequest> {
1470        vec![
1471            AgentRequest::Reconcile(DesiredState {
1472                schema_version: 1,
1473                node_id: "n1".to_string(),
1474                tenants: vec![],
1475                prune_unknown_tenants: false,
1476                prune_unknown_pools: false,
1477                sequence: 0,
1478            }),
1479            AgentRequest::ReconcileSigned(SignedPayload {
1480                payload: b"{}".to_vec(),
1481                signature: b"abcd".to_vec(),
1482                signer_id: "1234".to_string(),
1483            }),
1484            AgentRequest::NodeInfo,
1485            AgentRequest::NodeStats,
1486            AgentRequest::TenantList,
1487            AgentRequest::InstanceList {
1488                tenant_id: "t1".to_string(),
1489                pool_id: Some("p1".to_string()),
1490            },
1491            AgentRequest::InstanceList {
1492                tenant_id: "t1".to_string(),
1493                pool_id: None,
1494            },
1495            AgentRequest::WakeInstance {
1496                tenant_id: "t1".to_string(),
1497                pool_id: "p1".to_string(),
1498                instance_id: "i1".to_string(),
1499            },
1500            AgentRequest::InstanceAction {
1501                tenant_id: "t1".to_string(),
1502                pool_id: "p1".to_string(),
1503                instance_id: "i1".to_string(),
1504                action: InstanceAction::Start,
1505            },
1506            AgentRequest::SandboxAction {
1507                tenant_id: "t1".to_string(),
1508                pool_id: "p1".to_string(),
1509                instance_id: "i1".to_string(),
1510                request: serde_json::json!({"type": "Ping"}),
1511            },
1512            AgentRequest::DeploymentStatus {
1513                tenant_id: "t1".to_string(),
1514                pool_id: "p1".to_string(),
1515            },
1516            AgentRequest::PauseDeployment {
1517                tenant_id: "t1".to_string(),
1518                pool_id: "p1".to_string(),
1519            },
1520            AgentRequest::ResumeDeployment {
1521                tenant_id: "t1".to_string(),
1522                pool_id: "p1".to_string(),
1523            },
1524            AgentRequest::RollbackDeployment {
1525                tenant_id: "t1".to_string(),
1526                pool_id: "p1".to_string(),
1527                target_revision: Some("rev-abc".to_string()),
1528            },
1529            AgentRequest::BatchInstanceAction {
1530                actions: vec![BatchActionItem {
1531                    tenant_id: "t1".to_string(),
1532                    pool_id: "p1".to_string(),
1533                    instance_id: "i1".to_string(),
1534                    action: InstanceAction::Stop,
1535                }],
1536            },
1537            AgentRequest::PoolAction {
1538                tenant_id: "t1".to_string(),
1539                pool_id: "p1".to_string(),
1540                action: PoolActionType::StartAll,
1541            },
1542            AgentRequest::GetMetrics,
1543            AgentRequest::GetAuditLog {
1544                tenant_id: "t1".to_string(),
1545                last_n: Some(10),
1546                since: None,
1547            },
1548            AgentRequest::GetHealthStatus {
1549                tenant_id: Some("t1".to_string()),
1550                pool_id: None,
1551            },
1552            AgentRequest::GetReconcileHistory { last_n: Some(5) },
1553            AgentRequest::ForceReconcile { dry_run: true },
1554            AgentRequest::DumpState {
1555                include_metrics: true,
1556                include_audit_log: false,
1557            },
1558            AgentRequest::UpdateSecrets {
1559                tenant_id: "t1".to_string(),
1560                secrets_hash: "sha256:abc".to_string(),
1561                force_reload: false,
1562            },
1563            AgentRequest::UpdateConfig {
1564                tenant_id: "t1".to_string(),
1565                pool_id: "p1".to_string(),
1566                config_version: 42,
1567            },
1568            AgentRequest::SyncEvents { since: 42 },
1569        ]
1570    }
1571
1572    /// Every `AgentRequest` variant must survive a JSON round-trip.
1573    #[test]
1574    fn test_all_agent_request_variants_round_trip() {
1575        for (i, req) in all_request_variants().into_iter().enumerate() {
1576            let json = serde_json::to_value(&req).unwrap_or_else(|e| {
1577                panic!("Failed to serialize AgentRequest variant #{}: {}", i, e)
1578            });
1579            let _back: AgentRequest = serde_json::from_value(json.clone()).unwrap_or_else(|e| {
1580                panic!(
1581                    "Failed to deserialize AgentRequest variant #{}: {} -- json: {}",
1582                    i, e, json
1583                )
1584            });
1585        }
1586    }
1587
1588    fn test_node_info() -> NodeInfo {
1589        NodeInfo {
1590            node_id: "node-1".to_string(),
1591            hostname: "host".to_string(),
1592            arch: "aarch64".to_string(),
1593            total_vcpus: 8,
1594            total_mem_mib: 16384,
1595            vm_status: Some("running".to_string()),
1596            firecracker_version: Some("1.5.0".to_string()),
1597            jailer_available: true,
1598            cgroup_v2: true,
1599            attestation_provider: "none".to_string(),
1600        }
1601    }
1602
1603    /// Build one instance of every `AgentResponse` variant to ensure full coverage.
1604    fn all_response_variants() -> Vec<AgentResponse> {
1605        vec![
1606            AgentResponse::ReconcileResult(ReconcileReport::default()),
1607            AgentResponse::NodeInfo(test_node_info()),
1608            AgentResponse::NodeStats(NodeStats::default()),
1609            AgentResponse::TenantList(vec!["t1".to_string()]),
1610            AgentResponse::InstanceList(vec![]),
1611            AgentResponse::WakeResult { success: true },
1612            AgentResponse::InstanceActionResult {
1613                success: true,
1614                new_status: "running".to_string(),
1615                error: None,
1616            },
1617            AgentResponse::SandboxResult {
1618                success: true,
1619                response: serde_json::json!({"type": "Pong"}),
1620                error: None,
1621            },
1622            AgentResponse::Error {
1623                code: 500,
1624                message: "internal error".to_string(),
1625            },
1626            AgentResponse::DeploymentStatus {
1627                pool_id: "p1".to_string(),
1628                current_revision: "rev-1".to_string(),
1629                target_revision: None,
1630                strategy: Default::default(),
1631                phase: DeploymentPhase::Complete,
1632                instances_updated: 3,
1633                instances_pending: 0,
1634                canary_health: None,
1635                paused: false,
1636                errors: vec![],
1637            },
1638            AgentResponse::DeploymentControlResult {
1639                success: true,
1640                pool_id: "p1".to_string(),
1641                new_phase: "paused".to_string(),
1642                message: "ok".to_string(),
1643            },
1644            AgentResponse::BatchActionResult {
1645                results: vec![],
1646                total: 0,
1647                succeeded: 0,
1648                failed: 0,
1649            },
1650            AgentResponse::PoolActionResult {
1651                success: true,
1652                pool_id: "p1".to_string(),
1653                instances_affected: 5,
1654                errors: vec![],
1655            },
1656            AgentResponse::Metrics(crate::observability::metrics::global().snapshot()),
1657            AgentResponse::AuditLog {
1658                entries: vec![],
1659                total_count: 0,
1660            },
1661            AgentResponse::HealthStatus {
1662                instances: vec![],
1663                unhealthy_count: 0,
1664                degraded_count: 0,
1665            },
1666            AgentResponse::ReconcileHistory { runs: vec![] },
1667            AgentResponse::StateDump(Box::new(StateDumpContent {
1668                node_info: test_node_info(),
1669                node_stats: NodeStats::default(),
1670                metrics: None,
1671                audit_log: None,
1672                tenants: vec![],
1673            })),
1674            AgentResponse::SecretsUpdateResult {
1675                success: true,
1676                tenant_id: "t1".to_string(),
1677                instances_reloaded: 0,
1678                errors: vec![],
1679            },
1680            AgentResponse::ConfigUpdateResult {
1681                success: true,
1682                pool_id: "p1".to_string(),
1683                instances_updated: 0,
1684                errors: vec![],
1685            },
1686            AgentResponse::SyncEventsResult {
1687                events: vec![serde_json::json!({"type": "TenantAdded", "tenant_id": "acme"})],
1688                current_sequence: 5,
1689            },
1690        ]
1691    }
1692
1693    /// Every `AgentResponse` variant must survive a JSON round-trip.
1694    #[test]
1695    fn test_all_agent_response_variants_round_trip() {
1696        for (i, resp) in all_response_variants().into_iter().enumerate() {
1697            let json = serde_json::to_value(&resp).unwrap_or_else(|e| {
1698                panic!("Failed to serialize AgentResponse variant #{}: {}", i, e)
1699            });
1700            let _back: AgentResponse = serde_json::from_value(json.clone()).unwrap_or_else(|e| {
1701                panic!(
1702                    "Failed to deserialize AgentResponse variant #{}: {} -- json: {}",
1703                    i, e, json
1704                )
1705            });
1706        }
1707    }
1708
1709    /// All `PoolActionType` variants must round-trip through JSON.
1710    #[test]
1711    fn test_pool_action_type_all_variants_round_trip() {
1712        let variants = vec![
1713            PoolActionType::StartAll,
1714            PoolActionType::StopAll,
1715            PoolActionType::WarmAll,
1716            PoolActionType::DestroyAll { wipe_volumes: true },
1717            PoolActionType::ScaleTo {
1718                running: 3,
1719                warm: 1,
1720                sleeping: 2,
1721            },
1722        ];
1723        for v in &variants {
1724            let json = serde_json::to_value(v).unwrap();
1725            let back: PoolActionType = serde_json::from_value(json).unwrap();
1726            assert_eq!(*v, back);
1727        }
1728    }
1729}