zlayer_types/cluster.rs
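//! Daemon-level cluster mode selection and shared cluster wire types.
//!
//! [`ClusterMode`] defines whether, and how, a `ZLayer` daemon participates
//! in cluster membership. It is the top-level config the daemon reads at
//! startup to decide which `Cluster` trait implementation to construct.
//!
//! This module also hosts wire types shared across cluster implementations
//! (scale fan-out requests, per-node service state, and the worker-tier
//! protocol DTOs). For the wire-level join/membership DTOs see
//! [`crate::api::cluster`].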
8
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::net::SocketAddr;
12use std::time::Duration;
13
/// How the daemon participates in cluster membership, if at all.
///
/// Config shape: an internally tagged map. The `mode` key selects the variant
/// (`single-node`, `raft`, `static`, `worker-tier`, i.e. the kebab-case
/// variant names) and the remaining keys are that variant's fields. Unknown
/// keys are rejected (`deny_unknown_fields`). `ClusterMode::default()` is
/// [`ClusterMode::SingleNode`].
///
/// The `WorkerTier` variant is intentionally large (~285 bytes) because it
/// carries the full server-role + worker-role config inline. `ClusterMode`
/// is parsed once at daemon startup and lives in an `Arc<DaemonConfig>` for
/// the process lifetime, so the size delta is irrelevant in practice — and
/// boxing it would force every caller of `is_worker_tier_server()` /
/// `adaptive_ttl_config()` through an extra indirection for no win.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum ClusterMode {
    /// Single-node daemon. No peers, no consensus. `is_leader()` is always
    /// true. Suitable for development and single-host deployments.
    #[default]
    // Explicit rename; identical to what `rename_all = "kebab-case"` yields.
    #[serde(rename = "single-node")]
    SingleNode,

    /// openraft-backed consensus across the configured peers. This is the
    /// existing production mode for multi-node deployments.
    Raft {
        /// This node's id (must be unique within the cluster).
        node_id: u64,
        /// Peer addresses (raft RPC ports). The daemon's own entry must
        /// be present.
        peers: Vec<RaftPeer>,
    },

    /// Static-membership cluster: config-driven peer list, deterministic
    /// leader (lowest healthy node id), HTTP heartbeats for liveness.
    /// No consensus log — concurrent writers may race; the per-service
    /// scale semaphore in `ServiceManager` is the primary mitigation.
    Static {
        /// This node's id (must be unique within the cluster).
        node_id: u64,
        /// All cluster peers including self.
        peers: Vec<StaticPeer>,
        /// Heartbeat probe interval. Default 5s. Encoded as integer
        /// seconds (`duration_secs`).
        #[serde(default = "default_heartbeat_interval", with = "duration_secs")]
        heartbeat_interval: Duration,
        /// Time after which a peer with no heartbeat is `Unreachable`.
        /// Default 15s (3x interval). Encoded as integer seconds.
        #[serde(default = "default_failure_threshold", with = "duration_secs")]
        failure_threshold: Duration,
    },

    /// Nomad-style worker tier: 3–7 control-plane nodes run Raft consensus;
    /// up to ~10,000 worker nodes join as gRPC clients with adaptive-TTL
    /// heartbeats and never enter consensus.
    ///
    /// Workers are issued mTLS leaf certs by the cluster's worker CA during
    /// `Register`. Heartbeat cadence scales with cluster size — every
    /// `StatusAck` carries the next TTL computed from
    /// `clamp(N_workers / max_heartbeats_per_second, min_ttl, max_ttl)`
    /// (`AdaptiveTtlConfig::compute_ttl` rounds the quotient up).
    ///
    /// Role-specific "required" fields below are all `#[serde(default)]`,
    /// so deserialization alone does not enforce them — presumably the
    /// daemon validates them at startup; TODO confirm.
    #[serde(rename = "worker-tier")]
    WorkerTier {
        /// What role THIS node plays.
        role: WorkerTierRole,

        /// Server-only: this node's id within the raft control plane.
        /// Required when role == Server; ignored on workers (assigned by
        /// leader during Register).
        #[serde(default, skip_serializing_if = "Option::is_none")]
        node_id: Option<u64>,

        /// Server-only: the raft control-plane peer list (3-7 nodes).
        /// Required when role == Server.
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        peers: Vec<RaftPeer>,

        /// Server-only: address to bind the worker-facing gRPC server.
        /// Default `0.0.0.0:3670` (the API server uses 3669; gRPC takes 3670).
        /// Omitted from serialized output when equal to the default.
        #[serde(
            default = "default_worker_grpc_addr",
            skip_serializing_if = "is_default_worker_grpc_addr"
        )]
        worker_grpc_addr: SocketAddr,

        /// Worker-only: control-plane gRPC endpoints to try (round-robin
        /// fallback). Required when role == Worker.
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        servers: Vec<String>,

        /// Worker-only: path to the bootstrap token file (single line,
        /// URL-safe-base64 of a `WorkerBootstrapToken`).
        #[serde(default, skip_serializing_if = "Option::is_none")]
        token_file: Option<String>,

        /// Worker-only: directory to persist mTLS identity (cert.pem, key.pem,
        /// ca.pem). Defaults to `<data_dir>/worker/identity/` set by
        /// `ZLayerDirs`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        identity_dir: Option<String>,

        /// Server-only: worker CA storage directory. Defaults to
        /// `<data_dir>/cluster/` (same as the existing cluster CA + signer).
        #[serde(default, skip_serializing_if = "Option::is_none")]
        worker_ca_dir: Option<String>,

        /// Shared: minimum heartbeat TTL (default 10s, integer seconds).
        #[serde(default = "default_min_ttl", with = "duration_secs")]
        heartbeat_min_ttl: Duration,

        /// Shared: maximum heartbeat TTL (default 10min, integer seconds).
        #[serde(default = "default_max_ttl", with = "duration_secs")]
        heartbeat_max_ttl: Duration,

        /// Shared: grace period beyond TTL before a worker's lease is
        /// considered expired. Default 10s.
        #[serde(default = "default_grace", with = "duration_secs")]
        heartbeat_grace: Duration,

        /// Server-only: cluster-wide cap on heartbeats/second the leader is
        /// willing to absorb. The leader hands every worker a TTL such that
        /// total HB rate ≤ this. Default 50 (Nomad's default).
        #[serde(default = "default_max_hb")]
        max_heartbeats_per_second: u32,

        /// Server-only: TTL applied immediately after a leader election so
        /// workers don't all expire while the new leader is bootstrapping
        /// its FSM. Default 5min.
        #[serde(default = "default_failover_ttl", with = "duration_secs")]
        failover_heartbeat_ttl: Duration,

        /// Free-form node labels (placement, selector matching, etc.).
        #[serde(default, skip_serializing_if = "HashMap::is_empty")]
        labels: HashMap<String, String>,
    },
}
143
/// What role this node plays in a worker-tier cluster.
///
/// Serialized in snake_case: `"server"` / `"worker"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WorkerTierRole {
    /// Participates in raft consensus + serves the worker gRPC.
    Server,
    /// gRPC client to the control plane; never enters consensus.
    Worker,
}
153
154impl ClusterMode {
155 /// Extract an `AdaptiveTtlConfig` from a `WorkerTier` variant. Returns
156 /// `None` for other modes.
157 #[must_use]
158 pub fn adaptive_ttl_config(&self) -> Option<AdaptiveTtlConfig> {
159 if let ClusterMode::WorkerTier {
160 heartbeat_min_ttl,
161 heartbeat_max_ttl,
162 heartbeat_grace,
163 max_heartbeats_per_second,
164 failover_heartbeat_ttl,
165 ..
166 } = self
167 {
168 Some(AdaptiveTtlConfig {
169 min_ttl_secs: u32::try_from(heartbeat_min_ttl.as_secs()).unwrap_or(u32::MAX),
170 max_ttl_secs: u32::try_from(heartbeat_max_ttl.as_secs()).unwrap_or(u32::MAX),
171 grace_secs: u32::try_from(heartbeat_grace.as_secs()).unwrap_or(u32::MAX),
172 max_heartbeats_per_second: *max_heartbeats_per_second,
173 failover_ttl_secs: u32::try_from(failover_heartbeat_ttl.as_secs())
174 .unwrap_or(u32::MAX),
175 })
176 } else {
177 None
178 }
179 }
180
181 /// Convenience: is this a worker-tier server-role config?
182 #[must_use]
183 pub fn is_worker_tier_server(&self) -> bool {
184 matches!(
185 self,
186 ClusterMode::WorkerTier {
187 role: WorkerTierRole::Server,
188 ..
189 }
190 )
191 }
192
193 /// Convenience: is this a worker-tier worker-role config?
194 #[must_use]
195 pub fn is_worker_tier_worker(&self) -> bool {
196 matches!(
197 self,
198 ClusterMode::WorkerTier {
199 role: WorkerTierRole::Worker,
200 ..
201 }
202 )
203 }
204}
205
/// A raft peer's identity and reachability.
///
/// Listed in the `peers` of [`ClusterMode::Raft`] and
/// [`ClusterMode::WorkerTier`]. Unknown keys are rejected on deserialize.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct RaftPeer {
    /// Raft node id of this peer.
    pub id: u64,
    /// Raft RPC address (IP:port — a `SocketAddr`, so hostnames are not
    /// accepted).
    pub raft_addr: SocketAddr,
    /// HTTP API address advertised to other cluster members (IP:port).
    pub api_addr: SocketAddr,
}
216
/// One container's summary for cluster-wide listing/aggregation, tagged with the
/// node it runs on. Wire type shared by the agent (builds the local view), the
/// scheduler's `Cluster` fan-out, and the API
/// (`GET /internal/services/{svc}/state` + `list_containers`), so the leader can
/// report replicas placed on remote nodes (distributed scaling).
///
/// The `Option` fields carry no `skip_serializing_if`, so they are always
/// present in serialized output (as `null` when unset).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ClusterContainerSummary {
    /// Raft id of the node this container runs on.
    pub node_id: u64,
    /// Full container id (`service-replica`).
    pub id: String,
    /// Service name.
    pub service: String,
    /// Replica index.
    pub replica: u32,
    /// Image reference.
    pub image: String,
    /// Lowercased lifecycle state (e.g. `"running"`).
    pub state: String,
    /// Process id, when running.
    pub pid: Option<u32>,
    /// Overlay IP, when assigned (kept as a string on this wire type).
    pub overlay_ip: Option<String>,
}
241
/// A single node's view of one service: how many replicas it runs **locally**,
/// whether they're healthy there, and their containers. The leader aggregates
/// one of these per node (its own local view + remote views fetched via the
/// `Cluster` fan-out) to compute cluster-wide replica count, health, and the
/// `ps` container listing for distributed services.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeServiceState {
    /// Raft id of the reporting node.
    pub node_id: u64,
    /// Replicas of the service running on this node.
    pub running: u32,
    /// Whether this node's replicas of the service are healthy (trivially true
    /// when the node runs none).
    pub healthy: bool,
    /// This node's containers for the service; each entry's `node_id` is
    /// expected to equal this struct's `node_id`.
    pub containers: Vec<ClusterContainerSummary>,
}
259
/// A static-cluster peer's identity, reachability, and labels.
///
/// Listed in the `peers` of [`ClusterMode::Static`]. Unknown keys are
/// rejected on deserialize.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct StaticPeer {
    /// Node id of this peer.
    pub id: u64,
    /// HTTP API address (IP:port — a `SocketAddr`, so hostnames are not
    /// accepted). Heartbeats land on this address at `/health`; cluster
    /// dispatch lands at `/api/v1/internal/scale`.
    pub api_addr: SocketAddr,
    /// Operating system this peer runs (`"linux"` / `"windows"` / `"darwin"`).
    /// Used by placement when filtering nodes for a service's `OsKind`.
    /// Defaults to `"linux"` when omitted.
    #[serde(default = "default_os")]
    pub os: String,
    /// Free-form labels. Omitted from serialized output when empty.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub labels: HashMap<String, String>,
}
276
/// Serde default for `Static::heartbeat_interval`: probe peers every 5s.
fn default_heartbeat_interval() -> Duration {
    Duration::from_secs(5)
}

/// Serde default for `Static::failure_threshold`: 15s, i.e. three missed
/// heartbeats at the default interval.
fn default_failure_threshold() -> Duration {
    Duration::from_secs(15)
}

/// Serde default for `StaticPeer::os` when a peer entry omits it.
fn default_os() -> String {
    "linux".to_string()
}

/// Serde default for `WorkerTier::worker_grpc_addr`: all IPv4 interfaces,
/// port 3670 (the API server uses 3669).
///
/// Built from its parts rather than parsed from a string literal, so there is
/// no runtime parse and no `expect` panic path. This matters because
/// `is_default_worker_grpc_addr` calls it every time a `WorkerTier` config is
/// serialized.
fn default_worker_grpc_addr() -> SocketAddr {
    SocketAddr::from((std::net::Ipv4Addr::UNSPECIFIED, 3670))
}

/// `skip_serializing_if` predicate: omit `worker_grpc_addr` from serialized
/// output when it equals the default.
fn is_default_worker_grpc_addr(addr: &SocketAddr) -> bool {
    *addr == default_worker_grpc_addr()
}

/// Serde default for `WorkerTier::heartbeat_min_ttl`: 10s.
fn default_min_ttl() -> Duration {
    Duration::from_secs(10)
}

/// Serde default for `WorkerTier::heartbeat_max_ttl`: 600s (10 minutes).
fn default_max_ttl() -> Duration {
    Duration::from_secs(600)
}

/// Serde default for `WorkerTier::heartbeat_grace`: 10s.
fn default_grace() -> Duration {
    Duration::from_secs(10)
}

/// Serde default for `WorkerTier::max_heartbeats_per_second`: 50 (Nomad's
/// default).
fn default_max_hb() -> u32 {
    50
}

/// Serde default for `WorkerTier::failover_heartbeat_ttl`: 300s (5 minutes).
fn default_failover_ttl() -> Duration {
    Duration::from_secs(300)
}
312
313/// `serde_with`-style serializer for `Duration` as integer seconds. Inline
314/// here to avoid adding a `serde_with` dep just for two fields.
315mod duration_secs {
316 use serde::{Deserialize, Deserializer, Serialize, Serializer};
317 use std::time::Duration;
318
319 pub fn serialize<S>(dur: &Duration, ser: S) -> Result<S::Ok, S::Error>
320 where
321 S: Serializer,
322 {
323 dur.as_secs().serialize(ser)
324 }
325
326 pub fn deserialize<'de, D>(de: D) -> Result<Duration, D::Error>
327 where
328 D: Deserializer<'de>,
329 {
330 let secs = u64::deserialize(de)?;
331 Ok(Duration::from_secs(secs))
332 }
333}
334
335// ============================================================================
336// Wire-level scale request shared across cluster impls.
337//
338// `InternalScaleRequest` and `ScaleAssignment` are the wire types fanned out
339// by the cluster leader to each node that gets at least one replica of a
340// service. They live here (instead of in `zlayer-scheduler::cluster`) so the
341// same Rust type can be shared between:
342//
343// - The HTTP fan-out path (`StaticCluster` / `RaftCluster` in
344// `zlayer-scheduler`).
345// - The `/internal/scale` handler in `zlayer-api` (which deserializes the
346// typed struct directly).
347// - A future gRPC `WorkerTierCluster` (Phase 3) that reuses the same shape.
348//
349// `zlayer-scheduler::cluster` re-exports both types so existing call sites
350// (`zlayer_scheduler::cluster::InternalScaleRequest::new`) keep compiling.
351// ============================================================================
352
/// Wire-format scale request fanned out by the cluster's leader to each node
/// that gets assigned at least one replica of a service.
///
/// The leader's placement engine produces a `HashMap<NodeId, Vec<(role, index)>>`
/// of assignments; each entry becomes one `InternalScaleRequest` HTTP POST.
///
/// Backward-compatible: a peer may send `{service, replicas}` without
/// `assignments`. Receiving nodes treat that as a single implicit
/// `{role: "default", indices: 0..replicas}` group.
///
/// Only `service` is required on the wire: when absent, `replicas` defaults
/// to `0`, `assignments` to empty, and `spec` to `None`.
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
pub struct InternalScaleRequest {
    /// Service name.
    pub service: String,
    /// Total target replica count for this node, when caller didn't supply
    /// explicit per-role assignments (legacy / Phase 1 shape). When
    /// `assignments` is non-empty, this field is informational.
    #[serde(default)]
    pub replicas: u32,
    /// Per-role-group container index lists. Empty in Phase 1; populated by
    /// Phase 2 once `replica_groups` + cross-node identity ship. Omitted from
    /// serialized output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub assignments: Vec<ScaleAssignment>,
    /// The full service spec, propagated so the receiving node can register
    /// (or update) the service before scaling. This is what lets a fresh
    /// worker run a replica it has never seen, and what makes an image change
    /// on the leader reach worker containers: the receiver `upsert`s this spec,
    /// which detects digest drift and rolls the local replicas. `None` on the
    /// legacy `{service, replicas}` shape (receiver falls back to its cached
    /// spec). Boxed because `ServiceSpec` is large. Described as an opaque
    /// object in the generated OpenAPI schema (`value_type = Option<Object>`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schema(value_type = Option<Object>)]
    pub spec: Option<Box<crate::spec::types::ServiceSpec>>,
}
386
/// One role-group entry within an [`InternalScaleRequest`]. Phase 2 ships
/// this; Phase 1 uses the legacy `replicas` field only.
///
/// Also carried by [`WorkerAssignmentEvent::Set`] on the worker-tier path.
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
pub struct ScaleAssignment {
    /// Role name (e.g. `"default"`, `"primary"`, `"read"`).
    pub role: String,
    /// Replica indices within this role that the receiving node should ensure
    /// exist locally. Each becomes a container `{service}-{role}-{index}`.
    pub indices: Vec<u32>,
}
397
398impl InternalScaleRequest {
399 /// Build a legacy `{service, replicas}` request (Phase 1).
400 #[must_use]
401 pub fn new(service: impl Into<String>, replicas: u32) -> Self {
402 Self {
403 service: service.into(),
404 replicas,
405 assignments: Vec::new(),
406 spec: None,
407 }
408 }
409
410 /// Attach the full service spec so the receiver can register/update the
411 /// service before scaling (image-change propagation + first-deploy on a
412 /// fresh worker). Chainable onto [`Self::new`] / [`Self::with_assignments`].
413 #[must_use]
414 pub fn with_spec(mut self, spec: crate::spec::types::ServiceSpec) -> Self {
415 self.spec = Some(Box::new(spec));
416 self
417 }
418
419 /// Build a Phase-2 request with explicit per-role assignments.
420 #[must_use]
421 pub fn with_assignments(service: impl Into<String>, assignments: Vec<ScaleAssignment>) -> Self {
422 // The `replicas` field is informational when assignments are present:
423 // it's still set to the total count for legacy receivers (who would
424 // ignore `assignments`) but the authoritative count is the sum of
425 // `assignments[i].indices.len()`.
426 let replicas: u32 = assignments
427 .iter()
428 .map(|a| u32::try_from(a.indices.len()).unwrap_or(u32::MAX))
429 .sum();
430 Self {
431 service: service.into(),
432 replicas,
433 assignments,
434 spec: None,
435 }
436 }
437}
438
#[cfg(test)]
mod tests {
    use super::*;

    /// Serialize `mode` to YAML and parse the result back into a
    /// `ClusterMode`.
    ///
    /// The `*_roundtrip` / `*_round_trips` tests use this so they cover the
    /// serialize path (the `mode` tag, `duration_secs`, the
    /// `skip_serializing_if` predicates) and not just parsing.
    fn yaml_roundtrip(mode: &ClusterMode) -> ClusterMode {
        let text = serde_yaml::to_string(mode).expect("ClusterMode should serialize to YAML");
        serde_yaml::from_str(&text)
            .unwrap_or_else(|err| panic!("re-parsing serialized ClusterMode failed: {err}\n{text}"))
    }

    #[test]
    fn default_is_single_node() {
        let cfg = ClusterMode::default();
        assert_eq!(cfg, ClusterMode::SingleNode);
    }

    #[test]
    fn scale_request_legacy_shape_has_no_spec() {
        // A legacy `{service, replicas}` body (no `spec`) must still
        // deserialize, with `spec` defaulting to None.
        let req: InternalScaleRequest =
            serde_json::from_str(r#"{"service":"web","replicas":3}"#).unwrap();
        assert_eq!(req.service, "web");
        assert_eq!(req.replicas, 3);
        assert!(req.spec.is_none());
        assert!(req.assignments.is_empty());
    }

    #[test]
    fn scale_request_with_spec_roundtrips() {
        let spec = crate::spec::types::ServiceSpec::default();
        let req = InternalScaleRequest::new("web", 3).with_spec(spec);
        assert!(req.spec.is_some());
        let json = serde_json::to_string(&req).unwrap();
        let back: InternalScaleRequest = serde_json::from_str(&json).unwrap();
        assert_eq!(back.service, "web");
        assert_eq!(back.replicas, 3);
        assert!(back.spec.is_some(), "spec must survive the round-trip");
    }

    #[test]
    fn yaml_static_roundtrip() {
        let yaml = r"
mode: static
node_id: 2
peers:
  - id: 1
    api_addr: 10.0.0.10:3669
  - id: 2
    api_addr: 10.0.0.11:3669
heartbeat_interval: 5
failure_threshold: 15
";
        let parsed: ClusterMode = serde_yaml::from_str(yaml).unwrap();
        // Serialize -> parse must reproduce the same config, including the
        // defaulted peer `os` and the integer-second durations.
        assert_eq!(yaml_roundtrip(&parsed), parsed);
        match parsed {
            ClusterMode::Static {
                node_id,
                peers,
                heartbeat_interval,
                failure_threshold,
            } => {
                assert_eq!(node_id, 2);
                assert_eq!(peers.len(), 2);
                assert_eq!(heartbeat_interval, Duration::from_secs(5));
                assert_eq!(failure_threshold, Duration::from_secs(15));
            }
            _ => panic!("expected Static variant"),
        }
    }

    #[test]
    fn yaml_single_node_roundtrip() {
        let yaml = "mode: single-node";
        let parsed: ClusterMode = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(parsed, ClusterMode::SingleNode);
        assert_eq!(yaml_roundtrip(&parsed), parsed);
    }

    // ------------------------------------------------------------------------
    // InternalScaleRequest serde roundtrips.
    //
    // The wire shape is backward-compatible: legacy callers send
    // `{service, replicas}` (no `assignments`); Phase-2 callers add
    // `{service, replicas, assignments}`. Receiving nodes must parse both
    // and the assignments-less form must still produce a useful struct.
    // ------------------------------------------------------------------------

    #[test]
    fn internal_scale_request_legacy_shape() {
        // Legacy `{service, replicas}` without `assignments`.
        let json = r#"{"service":"web","replicas":3}"#;
        let req: InternalScaleRequest = serde_json::from_str(json).unwrap();
        assert_eq!(req.service, "web");
        assert_eq!(req.replicas, 3);
        assert!(req.assignments.is_empty());

        // Re-serialize: `assignments` is omitted when empty.
        let out = serde_json::to_string(&req).unwrap();
        assert!(!out.contains("assignments"), "got: {out}");
        assert!(out.contains(r#""service":"web""#));
        assert!(out.contains(r#""replicas":3"#));
    }

    #[test]
    fn internal_scale_request_with_assignments_roundtrip() {
        let req = InternalScaleRequest::with_assignments(
            "db",
            vec![
                ScaleAssignment {
                    role: "primary".to_string(),
                    indices: vec![0],
                },
                ScaleAssignment {
                    role: "read".to_string(),
                    indices: vec![1, 2],
                },
            ],
        );
        assert_eq!(req.replicas, 3); // sum of indices lengths

        let json = serde_json::to_string(&req).unwrap();
        let parsed: InternalScaleRequest = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.service, "db");
        assert_eq!(parsed.replicas, 3);
        assert_eq!(parsed.assignments.len(), 2);
        assert_eq!(parsed.assignments[0].role, "primary");
        assert_eq!(parsed.assignments[0].indices, vec![0]);
        assert_eq!(parsed.assignments[1].role, "read");
        assert_eq!(parsed.assignments[1].indices, vec![1, 2]);
    }

    #[test]
    fn internal_scale_request_new_constructs_legacy_shape() {
        let req = InternalScaleRequest::new("api", 5);
        assert_eq!(req.service, "api");
        assert_eq!(req.replicas, 5);
        assert!(req.assignments.is_empty());
    }

    #[test]
    fn worker_tier_server_yaml_round_trips() {
        let yaml = r"
mode: worker-tier
role: server
node_id: 1
peers:
  - id: 1
    raft_addr: 10.0.0.1:9001
    api_addr: 10.0.0.1:3669
  - id: 2
    raft_addr: 10.0.0.2:9001
    api_addr: 10.0.0.2:3669
  - id: 3
    raft_addr: 10.0.0.3:9001
    api_addr: 10.0.0.3:3669
worker_grpc_addr: 0.0.0.0:3670
worker_ca_dir: /var/lib/zlayer/cluster
heartbeat_min_ttl: 15
heartbeat_max_ttl: 600
heartbeat_grace: 10
max_heartbeats_per_second: 100
failover_heartbeat_ttl: 300
";
        let parsed: ClusterMode = serde_yaml::from_str(yaml).unwrap();
        // `worker_grpc_addr` equals the default, so it is skipped on
        // serialize and must come back via the serde default.
        assert_eq!(yaml_roundtrip(&parsed), parsed);
        assert!(parsed.is_worker_tier_server());
        assert!(!parsed.is_worker_tier_worker());
        let ttl = parsed.adaptive_ttl_config().expect("ttl");
        assert_eq!(ttl.max_heartbeats_per_second, 100);
    }

    #[test]
    fn worker_tier_worker_yaml_round_trips() {
        let yaml = r"
mode: worker-tier
role: worker
servers:
  - http://10.0.0.1:3670
  - http://10.0.0.2:3670
token_file: /etc/zlayer/worker.token
identity_dir: /var/lib/zlayer/worker
";
        let parsed: ClusterMode = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(yaml_roundtrip(&parsed), parsed);
        assert!(parsed.is_worker_tier_worker());
        assert!(!parsed.is_worker_tier_server());
        // Tunable defaults applied:
        let ttl = parsed.adaptive_ttl_config().expect("ttl");
        assert_eq!(ttl.min_ttl_secs, 10);
        assert_eq!(ttl.max_heartbeats_per_second, 50);
    }
}
622
623// ============================================================================
624// Worker tier (Phase 3) — Nomad-style worker protocol over HTTP
625// ============================================================================
626
/// Bootstrap join request from a new worker node.
///
/// Carries a signed bootstrap token (issued by `zlayer node generate-worker-token`)
/// along with the worker's profile so the leader can decide acceptance + assign
/// an id. The leader rejects expired tokens, reused single-use tokens, or
/// tokens from a different cluster.
///
/// Phase 3 MVP: token is a simple bearer string. Future Phase 3.1 will add CSR
/// for mTLS identity.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerRegisterRequest {
    /// Bootstrap token issued by the control plane.
    pub token: String,
    /// Optional desired `node_id`. The leader may override (e.g. on conflict).
    /// Omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub desired_node_id: Option<u64>,
    /// This worker's profile (OS, arch, labels, resource caps).
    pub profile: WorkerProfile,
}
646
/// Worker profile — published to the cluster directory on registration.
///
/// Only `api_addr`, `os`, and `arch` are required on the wire; the remaining
/// fields fall back to empty / `0` when absent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerProfile {
    /// Worker's externally-reachable HTTP API address (IP:port — a
    /// `SocketAddr`, so hostnames are not accepted).
    pub api_addr: SocketAddr,
    /// Operating system (`linux` / `windows` / `darwin`).
    pub os: String,
    /// CPU architecture (`x86_64` / `aarch64`).
    pub arch: String,
    /// Free-form labels (region, tier, hardware class, etc.). Omitted from
    /// serialized output when empty.
    #[serde(default, skip_serializing_if = "std::collections::HashMap::is_empty")]
    pub labels: std::collections::HashMap<String, String>,
    /// Total CPU cores available (`0` if not reported).
    #[serde(default)]
    pub cpu_total: u32,
    /// Total memory in bytes (`0` if not reported).
    #[serde(default)]
    pub memory_total_bytes: u64,
}
666
/// Successful registration response, returned by the leader in reply to a
/// [`WorkerRegisterRequest`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerRegisterResponse {
    /// Assigned `node_id` (may differ from `desired_node_id`).
    pub node_id: u64,
    /// Cluster identifier — workers reject mismatched-cluster responses.
    pub cluster_id: String,
    /// Initial heartbeat TTL in seconds. Worker schedules its first
    /// `ReportStatus` tick within this window.
    pub heartbeat_ttl_secs: u32,
    /// Grace period (seconds) after `heartbeat_ttl_secs` before the
    /// leader marks the worker as `Unreachable`.
    pub heartbeat_grace_secs: u32,
    /// Internal token to present on subsequent worker requests
    /// (`X-ZLayer-Internal-Token` header).
    pub internal_token: String,
}
684
/// Long-poll request for new assignments. Server waits up to ~30s for
/// a revision newer than `last_revision`, then returns the current
/// assignment set (which may be empty).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerPollRequest {
    /// Id of the polling worker (as assigned at registration).
    pub node_id: u64,
    /// The highest revision the worker has applied. Server returns
    /// any events with revision > this (or empty after timeout).
    /// Defaults to `0` when absent.
    #[serde(default)]
    pub last_revision: u64,
    /// Maximum seconds to wait before returning even with no new
    /// events. Defaults to 30; capped server-side at 60.
    #[serde(default = "default_poll_wait_secs")]
    pub max_wait_secs: u32,
}

/// Serde default for `WorkerPollRequest::max_wait_secs`: 30 seconds.
fn default_poll_wait_secs() -> u32 {
    30
}
704
/// Response to a long-poll: zero or more assignment events newer than
/// the worker's `last_revision`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerPollResponse {
    /// Current cluster revision (worker should record this and send it back
    /// as `last_revision` on its next poll).
    pub revision: u64,
    /// Assignment events ordered by revision ASC. Empty when nothing
    /// new since `last_revision` (timeout). Omitted from serialized output
    /// when empty and defaults to empty when absent.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub events: Vec<WorkerAssignmentEvent>,
}
716
/// One change to a worker's assignment set.
///
/// Serialized as an internally tagged object whose `kind` field is one of
/// `"set"`, `"delete"`, or `"drain"` (snake_case variant names).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum WorkerAssignmentEvent {
    /// Assign or update a service on this worker.
    Set {
        /// Service name.
        service: String,
        /// Per-role replica indices the worker should own.
        assignments: Vec<ScaleAssignment>,
        /// Cluster revision at which this change was made.
        revision: u64,
    },
    /// Remove a service entirely from this worker.
    Delete { service: String, revision: u64 },
    /// Drain command — worker should stop accepting new work and
    /// shut down once existing containers exit.
    Drain { revision: u64 },
}
734
/// Periodic status report from worker → control plane. Answered with a
/// [`WorkerStatusAck`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerStatusReport {
    /// Id of the reporting worker.
    pub node_id: u64,
    /// Unix epoch nanoseconds when this snapshot was taken.
    pub ts_ns: u64,
    /// Currently-running container summaries (empty when absent).
    #[serde(default)]
    pub containers: Vec<WorkerContainerStatus>,
    /// Resource utilization snapshot.
    pub resources: WorkerResourceUsage,
}
747
/// One container's state as reported in a [`WorkerStatusReport`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerContainerStatus {
    /// Service the container belongs to.
    pub service: String,
    /// Role group within the service (see [`ScaleAssignment::role`]).
    pub role: String,
    /// Replica index within the role.
    pub replica: u32,
    /// `running` / `exited` / `failed` etc. — same convention as
    /// the agent's `ContainerState` string form.
    pub state: String,
    /// Container's overlay IP, if attached. Omitted from serialized output
    /// when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub overlay_ip: Option<std::net::IpAddr>,
}
760
/// Resource utilization snapshot carried in a [`WorkerStatusReport`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerResourceUsage {
    /// CPU usage. NOTE(review): units are not defined here — presumably
    /// cores in use (comparable to `WorkerProfile::cpu_total`); confirm.
    pub cpu_used: f64,
    /// Memory in use, in bytes (per the field name).
    pub memory_used_bytes: u64,
    /// GPU usage. NOTE(review): presumably a count of GPUs in use; confirm.
    pub gpu_used: u32,
}
767
/// Ack for a `WorkerStatusReport`. Carries the next-TTL so the
/// worker can adapt its heartbeat cadence.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerStatusAck {
    /// Seconds until the worker's next required heartbeat. The leader
    /// computes this adaptively from cluster size:
    /// `clamp(N_workers / max_hb_per_sec, min, max)` (see
    /// `AdaptiveTtlConfig::compute_ttl`, which rounds the quotient up).
    pub next_ttl_secs: u32,
}
777
/// A worker lease in the leader's directory — drives liveness.
///
/// Timestamps are signed Unix epoch seconds; see [`WorkerLease::is_expired`]
/// for how they are compared.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerLease {
    /// Id of the leased worker.
    pub node_id: u64,
    /// Profile the worker registered with.
    pub profile: WorkerProfile,
    /// Unix epoch seconds when the lease was granted.
    pub acquired_unix_secs: i64,
    /// Unix epoch seconds of last successful heartbeat renewal.
    pub renewed_unix_secs: i64,
    /// Current TTL applied to this worker (adaptive).
    pub ttl_secs: u32,
    /// Grace period (seconds) applied on top of `ttl_secs` before the
    /// leader marks this worker as `Unreachable`. Stored on the lease
    /// so the value used at lease-grant time is durable even if the
    /// leader's `AdaptiveTtlConfig` later changes. Note that
    /// `is_expired` takes its grace as an argument and does not read
    /// this field.
    pub grace_secs: u32,
}
795
796impl WorkerLease {
797 /// True if `now_unix_secs - renewed_unix_secs > ttl_secs + grace`.
798 ///
799 /// The `grace_secs` argument lets callers override the lease's
800 /// stored grace value (e.g. to apply a temporarily-extended grace
801 /// during a known leadership transition).
802 #[must_use]
803 pub fn is_expired(&self, now_unix_secs: i64, grace_secs: u32) -> bool {
804 let elapsed = now_unix_secs.saturating_sub(self.renewed_unix_secs).max(0);
805 let elapsed_secs = u64::try_from(elapsed).unwrap_or(0);
806 elapsed_secs > u64::from(self.ttl_secs).saturating_add(u64::from(grace_secs))
807 }
808}
809
/// Adaptive-TTL heartbeat configuration. Mirrors Nomad's design:
/// the leader caps cluster-wide heartbeat rate to a constant by
/// stretching individual workers' TTLs as the cluster grows.
///
/// Built from a `ClusterMode::WorkerTier` config via
/// `ClusterMode::adaptive_ttl_config`, which converts each `Duration`
/// tunable to whole seconds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdaptiveTtlConfig {
    /// Lower bound for the TTL returned by `compute_ttl`, in seconds.
    pub min_ttl_secs: u32,
    /// Upper bound for the TTL returned by `compute_ttl`, in seconds.
    pub max_ttl_secs: u32,
    /// Grace period (seconds) beyond the TTL before a lease is expired.
    pub grace_secs: u32,
    /// Cluster-wide heartbeat budget the TTL is sized against.
    pub max_heartbeats_per_second: u32,
    /// TTL (seconds) applied right after a leader election.
    pub failover_ttl_secs: u32,
}
822impl Default for AdaptiveTtlConfig {
823 fn default() -> Self {
824 Self {
825 min_ttl_secs: 10,
826 max_ttl_secs: 600,
827 grace_secs: 10,
828 max_heartbeats_per_second: 50,
829 failover_ttl_secs: 300,
830 }
831 }
832}
833
834impl AdaptiveTtlConfig {
835 /// Compute the TTL to hand a worker, given the current cluster
836 /// size. Formula matches Nomad: `clamp(N / max_hb_per_sec, min, max)`.
837 #[must_use]
838 pub fn compute_ttl(&self, n_workers: u32) -> u32 {
839 if self.max_heartbeats_per_second == 0 {
840 return self.max_ttl_secs;
841 }
842 let raw = n_workers.saturating_add(self.max_heartbeats_per_second - 1)
843 / self.max_heartbeats_per_second;
844 raw.clamp(self.min_ttl_secs, self.max_ttl_secs)
845 }
846}
847
#[cfg(test)]
mod worker_tier_tests {
    use super::*;

    /// Lease fixture: renewed at t=1000 with a 30s TTL and a 10s stored grace.
    fn lease_renewed_at_1000() -> WorkerLease {
        WorkerLease {
            node_id: 1,
            profile: WorkerProfile {
                api_addr: "127.0.0.1:3669".parse().unwrap(),
                os: "linux".to_string(),
                arch: "x86_64".to_string(),
                labels: HashMap::new(),
                cpu_total: 4,
                memory_total_bytes: 8_000_000_000,
            },
            acquired_unix_secs: 1000,
            renewed_unix_secs: 1000,
            ttl_secs: 30,
            grace_secs: 10,
        }
    }

    #[test]
    fn adaptive_ttl_scales_with_cluster() {
        let cfg = AdaptiveTtlConfig::default();
        // (worker count, expected TTL) against the default budget of
        // 50 hb/s, a 10s floor and a 600s ceiling.
        let table: [(u32, u32); 6] = [
            (10, 10),       // ~0.2s raw -> raised to the 10s floor
            (100, 10),      // 2s raw -> raised to the floor
            (500, 10),      // exactly 10s
            (1000, 20),     // inside the band
            (10_000, 200),  // well under the 600s cap
            (100_000, 600), // 2000s raw -> capped at 600s
        ];
        for &(workers, expected) in &table {
            assert_eq!(cfg.compute_ttl(workers), expected, "n_workers = {workers}");
        }
    }

    #[test]
    fn worker_lease_expiration() {
        let lease = lease_renewed_at_1000();
        // The boundary is ttl + grace = 40s after the renewal at t=1000.
        // The comparison is strict, so exactly 40s is still live.
        let grace = 10;
        assert!(!lease.is_expired(1025, grace), "25s elapsed: within the TTL");
        assert!(!lease.is_expired(1040, grace), "40s elapsed: exactly ttl + grace");
        assert!(lease.is_expired(1041, grace), "41s elapsed: past ttl + grace");
    }
}