Skip to main content

zlayer_types/
cluster.rs

1//! Daemon-level cluster mode selection.
2//!
3//! Defines how a `ZLayer` daemon participates in (or doesn't) cluster
4//! membership. This is the top-level config the daemon reads at startup
5//! to decide which `Cluster` trait implementation to construct.
6//!
7//! For the wire-level join/membership DTOs see [`crate::api::cluster`].
8
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::net::SocketAddr;
12use std::time::Duration;
13
/// How the daemon participates in (or doesn't) cluster membership.
///
/// Wire form: an internally-tagged object. The `mode` key selects the variant
/// (`single-node`, `raft`, `static`, or `worker-tier`, via
/// `rename_all = "kebab-case"`). The variant's own fields sit alongside
/// `mode` at the same level. `deny_unknown_fields` is set, so unrecognized
/// keys should fail to parse. Every `Duration` field is (de)serialized as an
/// integer number of seconds through the local `duration_secs` adapter.
///
/// Deserialization alone does not enforce cross-field rules. Examples are the
/// role-specific "required" fields of `WorkerTier`, or
/// `heartbeat_min_ttl <= heartbeat_max_ttl`.
/// NOTE(review): confirm the daemon validates these before use.
///
/// The `WorkerTier` variant is intentionally large (~285 bytes) because it
/// carries the full server-role + worker-role config inline. `ClusterMode`
/// is parsed once at daemon startup and lives in an `Arc<DaemonConfig>` for
/// the process lifetime, so the size delta is irrelevant in practice — and
/// boxing it would force every caller of `is_worker_tier_server()` /
/// `adaptive_ttl_config()` through an extra indirection for no win.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum ClusterMode {
    /// Single-node daemon. No peers, no consensus. `is_leader()` is always
    /// true. Suitable for development and single-host deployments.
    /// This is the `Default` variant.
    #[default]
    #[serde(rename = "single-node")]
    SingleNode,

    /// openraft-backed consensus across the configured peers. This is the
    /// existing production mode for multi-node deployments.
    Raft {
        /// This node's id (must be unique within the cluster).
        node_id: u64,
        /// Peer addresses (raft RPC ports). The daemon's own entry must
        /// be present.
        peers: Vec<RaftPeer>,
    },

    /// Static-membership cluster: config-driven peer list, deterministic
    /// leader (lowest healthy node id), HTTP heartbeats for liveness.
    /// No consensus log — concurrent writers may race; the per-service
    /// scale semaphore in `ServiceManager` is the primary mitigation.
    Static {
        /// This node's id (must be unique within the cluster).
        node_id: u64,
        /// All cluster peers including self.
        peers: Vec<StaticPeer>,
        /// Heartbeat probe interval, in whole seconds on the wire. Default 5s.
        #[serde(default = "default_heartbeat_interval", with = "duration_secs")]
        heartbeat_interval: Duration,
        /// Time after which a peer with no heartbeat is `Unreachable`, in
        /// whole seconds on the wire. Default 15s (3x interval).
        #[serde(default = "default_failure_threshold", with = "duration_secs")]
        failure_threshold: Duration,
    },

    /// Nomad-style worker tier: 3–7 control-plane nodes run Raft consensus;
    /// up to ~10,000 worker nodes join as gRPC clients with adaptive-TTL
    /// heartbeats and never enter consensus.
    ///
    /// Workers are issued mTLS leaf certs by the cluster's worker CA during
    /// `Register`. Heartbeat cadence scales with cluster size — every
    /// `StatusAck` carries the next TTL computed from
    /// `clamp(N_workers / max_heartbeats_per_second, min_ttl, max_ttl)`.
    #[serde(rename = "worker-tier")]
    WorkerTier {
        /// What role THIS node plays.
        role: WorkerTierRole,

        /// Server-only: this node's id within the raft control plane.
        /// Required when role == Server; ignored on workers (assigned by
        /// leader during Register). Omitted from the wire form when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        node_id: Option<u64>,

        /// Server-only: the raft control-plane peer list (3-7 nodes).
        /// Required when role == Server. Omitted from the wire form when empty.
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        peers: Vec<RaftPeer>,

        /// Server-only: address to bind the worker-facing gRPC server.
        /// Default `0.0.0.0:3670` (the API server uses 3669; gRPC takes 3670).
        /// Omitted from the wire form when equal to that default.
        #[serde(
            default = "default_worker_grpc_addr",
            skip_serializing_if = "is_default_worker_grpc_addr"
        )]
        worker_grpc_addr: SocketAddr,

        /// Worker-only: control-plane gRPC endpoints to try (round-robin
        /// fallback). Required when role == Worker.
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        servers: Vec<String>,

        /// Worker-only: path to the bootstrap token file (single line,
        /// URL-safe-base64 of a `WorkerBootstrapToken`).
        #[serde(default, skip_serializing_if = "Option::is_none")]
        token_file: Option<String>,

        /// Worker-only: directory to persist mTLS identity (cert.pem, key.pem,
        /// ca.pem). Defaults to `<data_dir>/worker/identity/` set by
        /// `ZLayerDirs`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        identity_dir: Option<String>,

        /// Server-only: worker CA storage directory. Defaults to
        /// `<data_dir>/cluster/` (same as the existing cluster CA + signer).
        #[serde(default, skip_serializing_if = "Option::is_none")]
        worker_ca_dir: Option<String>,

        /// Shared: minimum heartbeat TTL (default 10s; whole seconds on the wire).
        #[serde(default = "default_min_ttl", with = "duration_secs")]
        heartbeat_min_ttl: Duration,

        /// Shared: maximum heartbeat TTL (default 10min; whole seconds on the wire).
        #[serde(default = "default_max_ttl", with = "duration_secs")]
        heartbeat_max_ttl: Duration,

        /// Shared: grace period beyond TTL before a worker's lease is
        /// considered expired. Default 10s.
        #[serde(default = "default_grace", with = "duration_secs")]
        heartbeat_grace: Duration,

        /// Server-only: cluster-wide cap on heartbeats/second the leader is
        /// willing to absorb. The leader hands every worker a TTL such that
        /// total HB rate ≤ this. Default 50 (Nomad's default).
        #[serde(default = "default_max_hb")]
        max_heartbeats_per_second: u32,

        /// Server-only: TTL applied immediately after a leader election so
        /// workers don't all expire while the new leader is bootstrapping
        /// its FSM. Default 5min.
        #[serde(default = "default_failover_ttl", with = "duration_secs")]
        failover_heartbeat_ttl: Duration,

        /// Free-form node labels (placement, selector matching, etc.).
        #[serde(default, skip_serializing_if = "HashMap::is_empty")]
        labels: HashMap<String, String>,
    },
}
143
/// What role this node plays in a worker-tier cluster.
///
/// Serialized in snake_case, i.e. `role: server` / `role: worker`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WorkerTierRole {
    /// Participates in raft consensus + serves the worker gRPC.
    Server,
    /// gRPC client to the control plane; never enters consensus.
    Worker,
}
153
154impl ClusterMode {
155    /// Extract an `AdaptiveTtlConfig` from a `WorkerTier` variant. Returns
156    /// `None` for other modes.
157    #[must_use]
158    pub fn adaptive_ttl_config(&self) -> Option<AdaptiveTtlConfig> {
159        if let ClusterMode::WorkerTier {
160            heartbeat_min_ttl,
161            heartbeat_max_ttl,
162            heartbeat_grace,
163            max_heartbeats_per_second,
164            failover_heartbeat_ttl,
165            ..
166        } = self
167        {
168            Some(AdaptiveTtlConfig {
169                min_ttl_secs: u32::try_from(heartbeat_min_ttl.as_secs()).unwrap_or(u32::MAX),
170                max_ttl_secs: u32::try_from(heartbeat_max_ttl.as_secs()).unwrap_or(u32::MAX),
171                grace_secs: u32::try_from(heartbeat_grace.as_secs()).unwrap_or(u32::MAX),
172                max_heartbeats_per_second: *max_heartbeats_per_second,
173                failover_ttl_secs: u32::try_from(failover_heartbeat_ttl.as_secs())
174                    .unwrap_or(u32::MAX),
175            })
176        } else {
177            None
178        }
179    }
180
181    /// Convenience: is this a worker-tier server-role config?
182    #[must_use]
183    pub fn is_worker_tier_server(&self) -> bool {
184        matches!(
185            self,
186            ClusterMode::WorkerTier {
187                role: WorkerTierRole::Server,
188                ..
189            }
190        )
191    }
192
193    /// Convenience: is this a worker-tier worker-role config?
194    #[must_use]
195    pub fn is_worker_tier_worker(&self) -> bool {
196        matches!(
197            self,
198            ClusterMode::WorkerTier {
199                role: WorkerTierRole::Worker,
200                ..
201            }
202        )
203    }
204}
205
/// A raft peer's identity and reachability.
///
/// Used by `ClusterMode::Raft::peers` and the server-side
/// `ClusterMode::WorkerTier::peers` list. Declared with
/// `deny_unknown_fields`, so unrecognized keys fail to parse.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct RaftPeer {
    /// Raft node id of this peer.
    pub id: u64,
    /// Raft RPC address. This is a `SocketAddr`, so it must be a literal
    /// `ip:port`; hostnames are not accepted.
    pub raft_addr: SocketAddr,
    /// HTTP API address advertised to other cluster members (literal
    /// `ip:port`).
    pub api_addr: SocketAddr,
}
216
/// One container's summary for cluster-wide listing/aggregation, tagged with the
/// node it runs on. Wire type shared by the agent (builds the local view), the
/// scheduler's `Cluster` fan-out, and the API
/// (`GET /internal/services/{svc}/state` + `list_containers`), so the leader can
/// report replicas placed on remote nodes (distributed scaling).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ClusterContainerSummary {
    /// Raft id of the node this container runs on.
    pub node_id: u64,
    /// Full container id (`service-replica`).
    ///
    /// NOTE(review): `ScaleAssignment` describes role-grouped containers as
    /// `{service}-{role}-{index}`. Confirm which form appears here for
    /// services that use role groups.
    pub id: String,
    /// Service name.
    pub service: String,
    /// Replica index.
    pub replica: u32,
    /// Image reference.
    pub image: String,
    /// Lowercased lifecycle state (e.g. `"running"`).
    pub state: String,
    /// Process id, when running.
    pub pid: Option<u32>,
    /// Overlay IP, when assigned. Kept as a string here, unlike
    /// `WorkerContainerStatus::overlay_ip`, which is a typed `IpAddr`.
    pub overlay_ip: Option<String>,
}
241
/// A single node's view of one service: how many replicas it runs **locally**,
/// whether they're healthy there, and their containers. The leader aggregates
/// one of these per node (its own local view + remote views fetched via the
/// `Cluster` fan-out) to compute cluster-wide replica count, health, and the
/// `ps` container listing for distributed services.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeServiceState {
    /// Raft id of the reporting node.
    pub node_id: u64,
    /// Replicas of the service running on this node.
    pub running: u32,
    /// Whether this node's replicas of the service are healthy (trivially true
    /// when the node runs none).
    pub healthy: bool,
    /// This node's containers for the service.
    /// NOTE(review): presumably every entry's `node_id` equals `self.node_id`;
    /// this type does not enforce it.
    pub containers: Vec<ClusterContainerSummary>,
}
259
/// A static-cluster peer's identity, reachability, and labels.
///
/// Listed in `ClusterMode::Static::peers`, which includes the local node.
/// Declared with `deny_unknown_fields`, so unrecognized keys fail to parse.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct StaticPeer {
    /// Node id of this peer. In static mode the leader is the lowest healthy id.
    pub id: u64,
    /// HTTP API address. This is a `SocketAddr`, so it must be a literal
    /// `ip:port`; hostnames are not accepted. Heartbeats land on this address
    /// at `/health`; cluster dispatch lands at `/api/v1/internal/scale`.
    pub api_addr: SocketAddr,
    /// Operating system this peer runs (`"linux"` / `"windows"` / `"darwin"`).
    /// Used by placement when filtering nodes for a service's `OsKind`.
    /// Defaults to `"linux"` when omitted.
    #[serde(default = "default_os")]
    pub os: String,
    /// Free-form labels. Omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub labels: HashMap<String, String>,
}
276
/// Serde default for `ClusterMode::Static::heartbeat_interval`: five seconds.
fn default_heartbeat_interval() -> Duration {
    Duration::from_secs(5)
}

/// Serde default for `ClusterMode::Static::failure_threshold`: three missed
/// heartbeats at the default interval (15 seconds).
fn default_failure_threshold() -> Duration {
    default_heartbeat_interval() * 3
}

/// Serde default for `StaticPeer::os`.
fn default_os() -> String {
    String::from("linux")
}
286
/// Port the worker-facing gRPC server binds by default (the HTTP API uses 3669).
const DEFAULT_WORKER_GRPC_PORT: u16 = 3670;

/// Serde default for `ClusterMode::WorkerTier::worker_grpc_addr`:
/// `0.0.0.0:3670`, i.e. all IPv4 interfaces on the worker gRPC port.
///
/// Built directly from its parts rather than parsed from a string literal,
/// so there is no runtime parse and no `expect` panic path.
fn default_worker_grpc_addr() -> SocketAddr {
    SocketAddr::from((std::net::Ipv4Addr::UNSPECIFIED, DEFAULT_WORKER_GRPC_PORT))
}

/// `skip_serializing_if` predicate for `worker_grpc_addr`. It omits the field
/// when it equals [`default_worker_grpc_addr`], which keeps serialized
/// configs minimal.
fn is_default_worker_grpc_addr(addr: &SocketAddr) -> bool {
    *addr == default_worker_grpc_addr()
}
296
/// Serde default for `heartbeat_min_ttl`: ten seconds.
fn default_min_ttl() -> Duration {
    Duration::from_secs(10)
}

/// Serde default for `heartbeat_max_ttl`: ten minutes.
fn default_max_ttl() -> Duration {
    Duration::from_secs(10 * 60)
}

/// Serde default for `heartbeat_grace`: ten seconds.
fn default_grace() -> Duration {
    Duration::from_secs(10)
}

/// Serde default for `max_heartbeats_per_second` (Nomad's default).
fn default_max_hb() -> u32 {
    50
}

/// Serde default for `failover_heartbeat_ttl`: five minutes.
fn default_failover_ttl() -> Duration {
    Duration::from_secs(5 * 60)
}
312
313/// `serde_with`-style serializer for `Duration` as integer seconds. Inline
314/// here to avoid adding a `serde_with` dep just for two fields.
315mod duration_secs {
316    use serde::{Deserialize, Deserializer, Serialize, Serializer};
317    use std::time::Duration;
318
319    pub fn serialize<S>(dur: &Duration, ser: S) -> Result<S::Ok, S::Error>
320    where
321        S: Serializer,
322    {
323        dur.as_secs().serialize(ser)
324    }
325
326    pub fn deserialize<'de, D>(de: D) -> Result<Duration, D::Error>
327    where
328        D: Deserializer<'de>,
329    {
330        let secs = u64::deserialize(de)?;
331        Ok(Duration::from_secs(secs))
332    }
333}
334
335// ============================================================================
336// Wire-level scale request shared across cluster impls.
337//
338// `InternalScaleRequest` and `ScaleAssignment` are the wire types fanned out
339// by the cluster leader to each node that gets at least one replica of a
340// service. They live here (instead of in `zlayer-scheduler::cluster`) so the
341// same Rust type can be shared between:
342//
343// - The HTTP fan-out path (`StaticCluster` / `RaftCluster` in
344//   `zlayer-scheduler`).
345// - The `/internal/scale` handler in `zlayer-api` (which deserializes the
346//   typed struct directly).
347// - A future gRPC `WorkerTierCluster` (Phase 3) that reuses the same shape.
348//
349// `zlayer-scheduler::cluster` re-exports both types so existing call sites
350// (`zlayer_scheduler::cluster::InternalScaleRequest::new`) keep compiling.
351// ============================================================================
352
/// Wire-format scale request fanned out by the cluster's leader to each node
/// that gets assigned at least one replica of a service.
///
/// The leader's placement engine produces a `HashMap<NodeId, Vec<(role, index)>>`
/// of assignments; each entry becomes one `InternalScaleRequest` HTTP POST.
///
/// Backward-compatible: a peer may send `{service, replicas}` without
/// `assignments`. Receiving nodes treat that as a single implicit
/// `{role: "default", indices: 0..replicas}` group.
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
pub struct InternalScaleRequest {
    /// Service name.
    pub service: String,
    /// Total target replica count for this node, when caller didn't supply
    /// explicit per-role assignments (legacy / Phase 1 shape). When
    /// `assignments` is non-empty, this field is informational. Defaults to
    /// 0 when absent from the body.
    #[serde(default)]
    pub replicas: u32,
    /// Per-role-group container index lists. Empty in Phase 1; populated by
    /// Phase 2 once `replica_groups` + cross-node identity ship. Omitted from
    /// the wire form when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub assignments: Vec<ScaleAssignment>,
    /// The full service spec, propagated so the receiving node can register
    /// (or update) the service before scaling. This is what lets a fresh
    /// worker run a replica it has never seen, and what makes an image change
    /// on the leader reach worker containers: the receiver `upsert`s this spec,
    /// which detects digest drift and rolls the local replicas. `None` on the
    /// legacy `{service, replicas}` shape (receiver falls back to its cached
    /// spec). Boxed because `ServiceSpec` is large.
    // OpenAPI documents this as a free-form object rather than the full
    // `ServiceSpec` schema (see `value_type` below).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schema(value_type = Option<Object>)]
    pub spec: Option<Box<crate::spec::types::ServiceSpec>>,
}
386
/// One role-group entry within an [`InternalScaleRequest`]. Phase 2 ships
/// this; Phase 1 uses the legacy `replicas` field only. Also carried by
/// `WorkerAssignmentEvent::Set` on the worker-tier long-poll path.
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
pub struct ScaleAssignment {
    /// Role name (e.g. `"default"`, `"primary"`, `"read"`).
    pub role: String,
    /// Replica indices within this role that the receiving node should ensure
    /// exist locally. Each becomes a container `{service}-{role}-{index}`.
    pub indices: Vec<u32>,
}
397
398impl InternalScaleRequest {
399    /// Build a legacy `{service, replicas}` request (Phase 1).
400    #[must_use]
401    pub fn new(service: impl Into<String>, replicas: u32) -> Self {
402        Self {
403            service: service.into(),
404            replicas,
405            assignments: Vec::new(),
406            spec: None,
407        }
408    }
409
410    /// Attach the full service spec so the receiver can register/update the
411    /// service before scaling (image-change propagation + first-deploy on a
412    /// fresh worker). Chainable onto [`Self::new`] / [`Self::with_assignments`].
413    #[must_use]
414    pub fn with_spec(mut self, spec: crate::spec::types::ServiceSpec) -> Self {
415        self.spec = Some(Box::new(spec));
416        self
417    }
418
419    /// Build a Phase-2 request with explicit per-role assignments.
420    #[must_use]
421    pub fn with_assignments(service: impl Into<String>, assignments: Vec<ScaleAssignment>) -> Self {
422        // The `replicas` field is informational when assignments are present:
423        // it's still set to the total count for legacy receivers (who would
424        // ignore `assignments`) but the authoritative count is the sum of
425        // `assignments[i].indices.len()`.
426        let replicas: u32 = assignments
427            .iter()
428            .map(|a| u32::try_from(a.indices.len()).unwrap_or(u32::MAX))
429            .sum();
430        Self {
431            service: service.into(),
432            replicas,
433            assignments,
434            spec: None,
435        }
436    }
437}
438
#[cfg(test)]
mod tests {
    use super::*;

    /// Serialize `mode` back to YAML, re-parse it, and assert nothing changed.
    ///
    /// Plain parsing tests never touch the `Serialize` side of `ClusterMode`:
    /// `duration_secs::serialize`, the `skip_serializing_if` predicates, or
    /// the internally-tagged `mode` key. This helper exercises all of them.
    fn assert_yaml_round_trip(mode: &ClusterMode) {
        let yaml = serde_yaml::to_string(mode).expect("serialize ClusterMode to YAML");
        let back: ClusterMode =
            serde_yaml::from_str(&yaml).expect("re-parse serialized ClusterMode");
        assert_eq!(&back, mode, "YAML round-trip changed the config; emitted:\n{yaml}");
    }

    #[test]
    fn default_is_single_node() {
        let cfg = ClusterMode::default();
        assert_eq!(cfg, ClusterMode::SingleNode);
    }

    #[test]
    fn scale_request_legacy_shape_has_no_spec() {
        // A legacy `{service, replicas}` body (no `spec`) must still
        // deserialize, with `spec` defaulting to None.
        let req: InternalScaleRequest =
            serde_json::from_str(r#"{"service":"web","replicas":3}"#).unwrap();
        assert_eq!(req.service, "web");
        assert_eq!(req.replicas, 3);
        assert!(req.spec.is_none());
        assert!(req.assignments.is_empty());
    }

    #[test]
    fn scale_request_with_spec_roundtrips() {
        let spec = crate::spec::types::ServiceSpec::default();
        let req = InternalScaleRequest::new("web", 3).with_spec(spec);
        assert!(req.spec.is_some());
        let json = serde_json::to_string(&req).unwrap();
        let back: InternalScaleRequest = serde_json::from_str(&json).unwrap();
        assert_eq!(back.service, "web");
        assert_eq!(back.replicas, 3);
        assert!(back.spec.is_some(), "spec must survive the round-trip");
    }

    #[test]
    fn yaml_static_roundtrip() {
        let yaml = r"
mode: static
node_id: 2
peers:
  - id: 1
    api_addr: 10.0.0.10:3669
  - id: 2
    api_addr: 10.0.0.11:3669
heartbeat_interval: 5
failure_threshold: 15
";
        let parsed: ClusterMode = serde_yaml::from_str(yaml).unwrap();
        assert_yaml_round_trip(&parsed);
        match parsed {
            ClusterMode::Static {
                node_id,
                peers,
                heartbeat_interval,
                failure_threshold,
            } => {
                assert_eq!(node_id, 2);
                assert_eq!(peers.len(), 2);
                assert_eq!(heartbeat_interval, Duration::from_secs(5));
                assert_eq!(failure_threshold, Duration::from_secs(15));
            }
            _ => panic!("expected Static variant"),
        }
    }

    #[test]
    fn yaml_single_node_roundtrip() {
        let yaml = "mode: single-node";
        let parsed: ClusterMode = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(parsed, ClusterMode::SingleNode);
        assert_yaml_round_trip(&parsed);
    }

    #[test]
    fn yaml_raft_round_trips() {
        let yaml = r"
mode: raft
node_id: 1
peers:
  - id: 1
    raft_addr: 10.0.0.1:9001
    api_addr: 10.0.0.1:3669
";
        let parsed: ClusterMode = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(
            parsed,
            ClusterMode::Raft {
                node_id: 1,
                peers: vec![RaftPeer {
                    id: 1,
                    raft_addr: "10.0.0.1:9001".parse().unwrap(),
                    api_addr: "10.0.0.1:3669".parse().unwrap(),
                }],
            }
        );
        assert!(parsed.adaptive_ttl_config().is_none());
        assert_yaml_round_trip(&parsed);
    }

    // ------------------------------------------------------------------------
    // InternalScaleRequest serde roundtrips.
    //
    // The wire shape is backward-compatible: legacy callers send
    // `{service, replicas}` (no `assignments`); Phase-2 callers add
    // `{service, replicas, assignments}`. Receiving nodes must parse both
    // and the assignments-less form must still produce a useful struct.
    // ------------------------------------------------------------------------

    #[test]
    fn internal_scale_request_legacy_shape() {
        // Legacy `{service, replicas}` without `assignments`.
        let json = r#"{"service":"web","replicas":3}"#;
        let req: InternalScaleRequest = serde_json::from_str(json).unwrap();
        assert_eq!(req.service, "web");
        assert_eq!(req.replicas, 3);
        assert!(req.assignments.is_empty());

        // Re-serialize: `assignments` is omitted when empty.
        let out = serde_json::to_string(&req).unwrap();
        assert!(!out.contains("assignments"), "got: {out}");
        assert!(out.contains(r#""service":"web""#));
        assert!(out.contains(r#""replicas":3"#));
    }

    #[test]
    fn internal_scale_request_with_assignments_roundtrip() {
        let req = InternalScaleRequest::with_assignments(
            "db",
            vec![
                ScaleAssignment {
                    role: "primary".to_string(),
                    indices: vec![0],
                },
                ScaleAssignment {
                    role: "read".to_string(),
                    indices: vec![1, 2],
                },
            ],
        );
        assert_eq!(req.replicas, 3); // sum of indices lengths

        let json = serde_json::to_string(&req).unwrap();
        let parsed: InternalScaleRequest = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.service, "db");
        assert_eq!(parsed.replicas, 3);
        assert_eq!(parsed.assignments.len(), 2);
        assert_eq!(parsed.assignments[0].role, "primary");
        assert_eq!(parsed.assignments[0].indices, vec![0]);
        assert_eq!(parsed.assignments[1].role, "read");
        assert_eq!(parsed.assignments[1].indices, vec![1, 2]);
    }

    #[test]
    fn internal_scale_request_new_constructs_legacy_shape() {
        let req = InternalScaleRequest::new("api", 5);
        assert_eq!(req.service, "api");
        assert_eq!(req.replicas, 5);
        assert!(req.assignments.is_empty());
    }

    #[test]
    fn worker_tier_server_yaml_round_trips() {
        let yaml = r"
mode: worker-tier
role: server
node_id: 1
peers:
  - id: 1
    raft_addr: 10.0.0.1:9001
    api_addr: 10.0.0.1:3669
  - id: 2
    raft_addr: 10.0.0.2:9001
    api_addr: 10.0.0.2:3669
  - id: 3
    raft_addr: 10.0.0.3:9001
    api_addr: 10.0.0.3:3669
worker_grpc_addr: 0.0.0.0:3670
worker_ca_dir: /var/lib/zlayer/cluster
heartbeat_min_ttl: 15
heartbeat_max_ttl: 600
heartbeat_grace: 10
max_heartbeats_per_second: 100
failover_heartbeat_ttl: 300
";
        let parsed: ClusterMode = serde_yaml::from_str(yaml).unwrap();
        assert!(parsed.is_worker_tier_server());
        assert!(!parsed.is_worker_tier_worker());
        let ttl = parsed.adaptive_ttl_config().expect("ttl");
        assert_eq!(ttl.max_heartbeats_per_second, 100);
        assert_eq!(ttl.min_ttl_secs, 15);
        assert_yaml_round_trip(&parsed);
    }

    #[test]
    fn worker_tier_worker_yaml_round_trips() {
        let yaml = r"
mode: worker-tier
role: worker
servers:
  - http://10.0.0.1:3670
  - http://10.0.0.2:3670
token_file: /etc/zlayer/worker.token
identity_dir: /var/lib/zlayer/worker
";
        let parsed: ClusterMode = serde_yaml::from_str(yaml).unwrap();
        assert!(parsed.is_worker_tier_worker());
        assert!(!parsed.is_worker_tier_server());
        // Tunable defaults applied:
        let ttl = parsed.adaptive_ttl_config().expect("ttl");
        assert_eq!(ttl.min_ttl_secs, 10);
        assert_eq!(ttl.max_heartbeats_per_second, 50);
        assert_yaml_round_trip(&parsed);
    }
}
622
623// ============================================================================
624// Worker tier (Phase 3) — Nomad-style worker protocol over HTTP
625// ============================================================================
626
/// Bootstrap join request from a new worker node.
///
/// Carries a signed bootstrap token (issued by `zlayer node generate-worker-token`)
/// along with the worker's profile so the leader can decide acceptance + assign
/// an id. The leader rejects expired tokens, reused single-use tokens, or
/// tokens from a different cluster.
///
/// Phase 3 MVP: token is a simple bearer string. Future Phase 3.1 will add CSR
/// for mTLS identity.
///
/// NOTE(review): the `ClusterMode::WorkerTier` docs say workers are issued
/// mTLS leaf certs during `Register`, but this request carries no CSR yet.
/// Reconcile the two descriptions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerRegisterRequest {
    /// Bootstrap token issued by the control plane.
    pub token: String,
    /// Optional desired `node_id`. The leader may override (e.g. on conflict).
    /// Omitted from the wire form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub desired_node_id: Option<u64>,
    /// This worker's profile (OS, arch, labels, resource caps).
    pub profile: WorkerProfile,
}
646
647/// Worker profile — published to the cluster directory on registration.
648#[derive(Debug, Clone, Serialize, Deserialize)]
649pub struct WorkerProfile {
650    /// Worker's externally-reachable HTTP API address (host:port).
651    pub api_addr: SocketAddr,
652    /// Operating system (`linux` / `windows` / `darwin`).
653    pub os: String,
654    /// CPU architecture (`x86_64` / `aarch64`).
655    pub arch: String,
656    /// Free-form labels (region, tier, hardware class, etc.).
657    #[serde(default, skip_serializing_if = "std::collections::HashMap::is_empty")]
658    pub labels: std::collections::HashMap<String, String>,
659    /// Total CPU cores available.
660    #[serde(default)]
661    pub cpu_total: u32,
662    /// Total memory in bytes.
663    #[serde(default)]
664    pub memory_total_bytes: u64,
665}
666
/// Successful registration response, returned by the leader for an accepted
/// [`WorkerRegisterRequest`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerRegisterResponse {
    /// Assigned `node_id` (may differ from `desired_node_id`).
    pub node_id: u64,
    /// Cluster identifier — workers reject mismatched-cluster responses.
    pub cluster_id: String,
    /// Initial heartbeat TTL in seconds. Worker schedules its first
    /// `ReportStatus` tick within this window.
    pub heartbeat_ttl_secs: u32,
    /// Grace period (seconds) after `heartbeat_ttl_secs` before the
    /// leader marks the worker as `Unreachable`.
    pub heartbeat_grace_secs: u32,
    /// Internal token to present on subsequent worker requests
    /// (`X-ZLayer-Internal-Token` header).
    pub internal_token: String,
}
684
/// Long-poll request for new assignments. Server waits up to ~30s for
/// a revision newer than `last_revision`, then returns the current
/// assignment set (which may be empty).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerPollRequest {
    /// The polling worker's node id (presumably the `node_id` assigned in
    /// `WorkerRegisterResponse`).
    pub node_id: u64,
    /// The highest revision the worker has applied. Server returns
    /// any events with revision > this (or empty after timeout).
    /// Defaults to 0 when absent.
    #[serde(default)]
    pub last_revision: u64,
    /// Maximum seconds to wait before returning even with no new
    /// events. Defaults to 30; capped server-side at 60.
    #[serde(default = "default_poll_wait_secs")]
    pub max_wait_secs: u32,
}
700
/// Serde default for `WorkerPollRequest::max_wait_secs`: half a minute.
fn default_poll_wait_secs() -> u32 {
    const HALF_MINUTE_SECS: u32 = 30;
    HALF_MINUTE_SECS
}
704
/// Response to a long-poll: zero or more assignment events newer than
/// the worker's `last_revision`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerPollResponse {
    /// Current cluster revision (worker should record this).
    pub revision: u64,
    /// Assignment events ordered by revision ASC. Empty when nothing
    /// new since `last_revision` (timeout). Omitted from the wire form when
    /// empty and defaults to empty when absent.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub events: Vec<WorkerAssignmentEvent>,
}
716
/// One change to a worker's assignment set.
///
/// Wire form: internally tagged by `kind`, whose value is the snake_case
/// variant name (`set`, `delete`, `drain`). The variant's fields sit
/// alongside `kind`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum WorkerAssignmentEvent {
    /// Assign or update a service on this worker.
    Set {
        service: String,
        /// Per-role replica indices the worker should own.
        assignments: Vec<ScaleAssignment>,
        revision: u64,
    },
    /// Remove a service entirely from this worker.
    Delete { service: String, revision: u64 },
    /// Drain command — worker should stop accepting new work and
    /// shut down once existing containers exit.
    Drain { revision: u64 },
}
734
/// Periodic status report from worker → control plane. Answered with a
/// [`WorkerStatusAck`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerStatusReport {
    /// Reporting worker's node id.
    pub node_id: u64,
    /// Unix epoch nanoseconds when this snapshot was taken.
    pub ts_ns: u64,
    /// Currently-running container summaries. Defaults to empty when absent.
    #[serde(default)]
    pub containers: Vec<WorkerContainerStatus>,
    /// Resource utilization snapshot.
    pub resources: WorkerResourceUsage,
}
747
/// One container's state as reported by a worker in
/// [`WorkerStatusReport::containers`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerContainerStatus {
    /// Service name.
    pub service: String,
    /// Role-group name. NOTE(review): presumably matches a
    /// `ScaleAssignment::role` from the worker's assignments — confirm.
    pub role: String,
    /// Replica index within the role.
    pub replica: u32,
    /// `running` / `exited` / `failed` etc. — same convention as
    /// the agent's `ContainerState` string form.
    pub state: String,
    /// Container's overlay IP, if attached. Omitted from the wire form when
    /// `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub overlay_ip: Option<std::net::IpAddr>,
}
760
/// Resource-utilization snapshot carried in [`WorkerStatusReport::resources`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerResourceUsage {
    /// CPU currently in use.
    /// NOTE(review): the unit (cores vs. fraction vs. percent) is not
    /// established here — confirm against the reporting agent.
    pub cpu_used: f64,
    /// Memory currently in use, in bytes.
    pub memory_used_bytes: u64,
    /// GPUs currently in use. NOTE(review): presumably a device count — confirm.
    pub gpu_used: u32,
}
767
/// Ack for a `WorkerStatusReport`. Carries the next-TTL so the
/// worker can adapt its heartbeat cadence.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerStatusAck {
    /// Seconds until the worker's next required heartbeat. The leader
    /// computes this adaptively from cluster size:
    /// `clamp(N_workers / max_hb_per_sec, min, max)`, with the bounds and
    /// rate cap taken from its `AdaptiveTtlConfig`.
    pub next_ttl_secs: u32,
}
777
/// A worker lease in the leader's directory — drives liveness.
///
/// See `WorkerLease::is_expired` for the expiry rule.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerLease {
    /// Node id of the worker holding this lease.
    pub node_id: u64,
    /// The worker's profile (presumably the one it supplied at registration).
    pub profile: WorkerProfile,
    /// Unix epoch seconds when the lease was granted.
    pub acquired_unix_secs: i64,
    /// Unix epoch seconds of last successful heartbeat renewal.
    pub renewed_unix_secs: i64,
    /// Current TTL applied to this worker (adaptive).
    pub ttl_secs: u32,
    /// Grace period (seconds) applied on top of `ttl_secs` before the
    /// leader marks this worker as `Unreachable`. Stored on the lease
    /// so the value used at lease-grant time is durable even if the
    /// leader's `AdaptiveTtlConfig` later changes.
    pub grace_secs: u32,
}
795
796impl WorkerLease {
797    /// True if `now_unix_secs - renewed_unix_secs > ttl_secs + grace`.
798    ///
799    /// The `grace_secs` argument lets callers override the lease's
800    /// stored grace value (e.g. to apply a temporarily-extended grace
801    /// during a known leadership transition).
802    #[must_use]
803    pub fn is_expired(&self, now_unix_secs: i64, grace_secs: u32) -> bool {
804        let elapsed = now_unix_secs.saturating_sub(self.renewed_unix_secs).max(0);
805        let elapsed_secs = u64::try_from(elapsed).unwrap_or(0);
806        elapsed_secs > u64::from(self.ttl_secs).saturating_add(u64::from(grace_secs))
807    }
808}
809
810/// Adaptive-TTL heartbeat configuration. Mirrors Nomad's design:
811/// the leader caps cluster-wide heartbeat rate to a constant by
812/// stretching individual workers' TTLs as the cluster grows.
813#[derive(Debug, Clone, Serialize, Deserialize)]
814pub struct AdaptiveTtlConfig {
815    pub min_ttl_secs: u32,
816    pub max_ttl_secs: u32,
817    pub grace_secs: u32,
818    pub max_heartbeats_per_second: u32,
819    pub failover_ttl_secs: u32,
820}
821
822impl Default for AdaptiveTtlConfig {
823    fn default() -> Self {
824        Self {
825            min_ttl_secs: 10,
826            max_ttl_secs: 600,
827            grace_secs: 10,
828            max_heartbeats_per_second: 50,
829            failover_ttl_secs: 300,
830        }
831    }
832}
833
834impl AdaptiveTtlConfig {
835    /// Compute the TTL to hand a worker, given the current cluster
836    /// size. Formula matches Nomad: `clamp(N / max_hb_per_sec, min, max)`.
837    #[must_use]
838    pub fn compute_ttl(&self, n_workers: u32) -> u32 {
839        if self.max_heartbeats_per_second == 0 {
840            return self.max_ttl_secs;
841        }
842        let raw = n_workers.saturating_add(self.max_heartbeats_per_second - 1)
843            / self.max_heartbeats_per_second;
844        raw.clamp(self.min_ttl_secs, self.max_ttl_secs)
845    }
846}
847
848#[cfg(test)]
849mod worker_tier_tests {
850    use super::*;
851
852    #[test]
853    fn adaptive_ttl_scales_with_cluster() {
854        let cfg = AdaptiveTtlConfig::default();
855        // 10 workers, 50hb/s target = ttl ~0.2s, clamped up to min=10s.
856        assert_eq!(cfg.compute_ttl(10), 10);
857        // 100 workers, 50hb/s target = ttl 2s, clamped up to 10s.
858        assert_eq!(cfg.compute_ttl(100), 10);
859        // 500 workers = 10s exactly.
860        assert_eq!(cfg.compute_ttl(500), 10);
861        // 1000 workers, 50hb/s = ttl 20s.
862        assert_eq!(cfg.compute_ttl(1000), 20);
863        // 10000 workers, 50hb/s = 200s (well within 600 cap).
864        assert_eq!(cfg.compute_ttl(10000), 200);
865        // 100000 workers = 2000s, clamped to 600s max.
866        assert_eq!(cfg.compute_ttl(100_000), 600);
867    }
868
869    #[test]
870    fn worker_lease_expiration() {
871        let lease = WorkerLease {
872            node_id: 1,
873            profile: WorkerProfile {
874                api_addr: "127.0.0.1:3669".parse().unwrap(),
875                os: "linux".to_string(),
876                arch: "x86_64".to_string(),
877                labels: HashMap::default(),
878                cpu_total: 4,
879                memory_total_bytes: 8_000_000_000,
880            },
881            acquired_unix_secs: 1000,
882            renewed_unix_secs: 1000,
883            ttl_secs: 30,
884            grace_secs: 10,
885        };
886        // 25s elapsed: not expired
887        assert!(!lease.is_expired(1025, 10));
888        // 40s elapsed: still within ttl+grace = 40
889        assert!(!lease.is_expired(1040, 10));
890        // 41s elapsed: expired
891        assert!(lease.is_expired(1041, 10));
892    }
893}