Skip to main content

modelexpress_server/
k8s_types.rs

1// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Kubernetes CRD types for ModelMetadata.
5//!
6//! These types define the ModelMetadata CustomResourceDefinition used as an
7//! alternative to Redis for storing P2P metadata.
8
9use kube::CustomResource;
10use schemars::JsonSchema;
11use serde::{Deserialize, Serialize};
12
13/// ModelMetadata spec - the desired state
14#[derive(CustomResource, Clone, Debug, Deserialize, Serialize, JsonSchema)]
15#[kube(
16    group = "modelexpress.nvidia.com",
17    version = "v1alpha1",
18    kind = "ModelMetadata",
19    plural = "modelmetadatas",
20    shortname = "mxmeta",
21    namespaced,
22    status = "ModelMetadataStatus"
23)]
24pub struct ModelMetadataSpec {
25    /// Full model name (e.g., deepseek-ai/DeepSeek-V3)
26    #[serde(rename = "modelName")]
27    pub model_name: String,
28}
29
30/// ModelMetadata status - the observed state
31#[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema)]
32pub struct ModelMetadataStatus {
33    /// Single worker NIXL metadata and readiness state (one CR per worker)
34    #[serde(default)]
35    pub worker: Option<WorkerStatus>,
36
37    /// Conditions for ModelMetadata lifecycle
38    #[serde(default)]
39    pub conditions: Vec<Condition>,
40
41    /// Generation observed by the controller
42    #[serde(rename = "observedGeneration", default)]
43    pub observed_generation: i64,
44
45    /// Timestamp when first worker published
46    #[serde(rename = "publishedAt", default)]
47    pub published_at: Option<String>,
48}
49
50/// Per-worker status
51#[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema)]
52pub struct WorkerStatus {
53    /// Worker rank (0-indexed)
54    #[serde(rename = "workerRank")]
55    pub worker_rank: i32,
56
57    /// Backend type discriminator ("nixl", "transfer_engine", "none")
58    #[serde(rename = "backendType", default)]
59    pub backend_type: Option<String>,
60
61    /// Base64-encoded NIXL agent metadata blob
62    #[serde(rename = "nixlMetadata", default)]
63    pub nixl_metadata: String,
64
65    /// Mooncake TransferEngine session ID
66    #[serde(rename = "transferEngineSessionId", default)]
67    pub transfer_engine_session_id: Option<String>,
68
69    /// Number of tensors registered by this worker
70    #[serde(rename = "tensorCount", default)]
71    pub tensor_count: i32,
72
73    /// Name of ConfigMap containing tensor descriptors
74    #[serde(rename = "tensorConfigMap", default)]
75    pub tensor_config_map: Option<String>,
76
77    /// Worker lifecycle status (Initializing, Ready, Stale)
78    #[serde(default)]
79    pub status: String,
80
81    /// Timestamp of last status update (RFC3339)
82    #[serde(rename = "updatedAt", default)]
83    pub updated_at: Option<String>,
84
85    /// P2P: NIXL listen thread endpoint (host:port)
86    #[serde(rename = "metadataEndpoint", default)]
87    pub metadata_endpoint: String,
88
89    /// P2P: NIXL agent name
90    #[serde(rename = "agentName", default)]
91    pub agent_name: String,
92
93    /// P2P: Worker gRPC endpoint for tensor manifest (host:port)
94    #[serde(rename = "workerGrpcEndpoint", default)]
95    pub worker_grpc_endpoint: String,
96}
97
98impl WorkerStatus {
99    /// Convert a `SourceStatus` proto enum value (i32) to the CRD status string.
100    pub fn status_name_from_proto(status: i32) -> String {
101        match status {
102            0 => "Unknown",
103            1 => "Initializing",
104            2 => "Ready",
105            3 => "Stale",
106            _ => "Unknown",
107        }
108        .to_string()
109    }
110
111    /// Convert a CRD status string back to the `SourceStatus` proto enum value (i32).
112    pub fn status_proto_from_name(name: &str) -> i32 {
113        match name {
114            "Initializing" => 1,
115            "Ready" => 2,
116            "Stale" => 3,
117            _ => 0,
118        }
119    }
120}
121
122/// Standard Kubernetes condition
123#[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema)]
124pub struct Condition {
125    /// Condition type
126    #[serde(rename = "type")]
127    pub type_: String,
128
129    /// Status: True, False, Unknown
130    pub status: String,
131
132    /// Machine-readable reason for condition
133    #[serde(default)]
134    pub reason: Option<String>,
135
136    /// Human-readable message
137    #[serde(default)]
138    pub message: Option<String>,
139
140    /// Timestamp of last transition
141    #[serde(rename = "lastTransitionTime", default)]
142    pub last_transition_time: Option<String>,
143}
144
145impl ModelMetadataStatus {
146    /// Insert or update a condition by type. If a condition with the same type
147    /// already exists, it is updated in place; `lastTransitionTime` is only
148    /// changed when `status` actually transitions.
149    pub fn set_condition(&mut self, type_: &str, status: &str, reason: &str, message: &str) {
150        let now = chrono::Utc::now().to_rfc3339();
151        if let Some(existing) = self.conditions.iter_mut().find(|c| c.type_ == type_) {
152            if existing.status != status {
153                existing.last_transition_time = Some(now);
154            }
155            existing.status = status.to_string();
156            existing.reason = Some(reason.to_string());
157            existing.message = Some(message.to_string());
158        } else {
159            self.conditions.push(Condition {
160                type_: type_.to_string(),
161                status: status.to_string(),
162                reason: Some(reason.to_string()),
163                message: Some(message.to_string()),
164                last_transition_time: Some(now),
165            });
166        }
167    }
168
169    /// Update the `Ready` condition based on the worker's proto status value.
170    /// Ready=True only when the worker status is `SOURCE_STATUS_READY` (2).
171    pub fn update_ready_condition(&mut self, worker_proto_status: i32) {
172        let is_ready = worker_proto_status == 2; // SOURCE_STATUS_READY
173        if is_ready {
174            self.set_condition("Ready", "True", "WorkerReady", "Worker is ready");
175        } else {
176            let status_name = WorkerStatus::status_name_from_proto(worker_proto_status);
177            self.set_condition(
178                "Ready",
179                "False",
180                &format!("Worker{}", status_name),
181                "Worker is not ready",
182            );
183        }
184    }
185}
186
/// Tensor descriptor stored in ConfigMap
///
/// JSON representation of a single tensor. `addr` and `size` are carried as
/// strings rather than JSON numbers; the unit tests in this file parse them
/// back with `str::parse::<u64>()`, including `u64::MAX`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct TensorDescriptorJson {
    /// Tensor name (e.g., "model.layers.0.weight")
    pub name: String,
    /// Serialized as string to avoid precision loss
    pub addr: String,
    /// Serialized as string to avoid precision loss
    pub size: String,
    /// Device identifier — presumably the accelerator ordinal; TODO confirm
    /// against the producer of these descriptors.
    pub device_id: u32,
    /// Element type name (e.g., "bfloat16", "float16")
    pub dtype: String,
}
198
/// Sanitize a model name so it can be used as a Kubernetes resource name
/// e.g., "deepseek-ai/DeepSeek-V3" -> "deepseek-ai-deepseek-v3"
///
/// The transformation is:
/// 1. lowercase the input;
/// 2. turn every `/` and `_` into `-`;
/// 3. drop every character that is not an ASCII letter, ASCII digit, `-` or `.`;
/// 4. strip leading and trailing `-` **and** `.`, because a Kubernetes name
///    must start and end with an alphanumeric character.
///
/// Returns an empty string when nothing survives (e.g. `""`, `"///"`); callers
/// must treat that as "no usable name".
///
/// Limitations: the result is not length-limited, interior sequences such as
/// `-.` are not normalized, and distinct inputs can map to the same output
/// (e.g. `a_b` and `a/b`).
pub fn sanitize_model_name(model_name: &str) -> String {
    model_name
        .to_lowercase()
        .chars()
        .filter_map(|c| match c {
            // Path and word separators become dashes.
            '/' | '_' => Some('-'),
            c if c.is_ascii_alphanumeric() || c == '-' || c == '.' => Some(c),
            // Everything else (spaces, '@', '+', non-ASCII, ...) is dropped.
            _ => None,
        })
        .collect::<String>()
        // A leading/trailing '.' is as invalid as a leading/trailing '-'.
        .trim_matches(['-', '.'])
        .to_string()
}
211
/// Unit tests for the status conversions, name sanitization, tensor
/// descriptor JSON encoding and condition bookkeeping defined in this file.
#[cfg(test)]
// The tests below call `.expect(...)` on serde/parse results; a panic is the
// desired failure mode inside a test.
#[allow(clippy::expect_used)]
mod tests {
    use super::*;

    /// Every known proto value maps to its name and back again.
    #[test]
    fn test_status_roundtrip() {
        for (proto, name) in [
            (0, "Unknown"),
            (1, "Initializing"),
            (2, "Ready"),
            (3, "Stale"),
        ] {
            assert_eq!(WorkerStatus::status_name_from_proto(proto), name);
            assert_eq!(WorkerStatus::status_proto_from_name(name), proto);
        }
    }

    /// Regression test: proto status 0 (SOURCE_STATUS_UNKNOWN) must survive a
    /// write-to-CRD -> read-from-CRD roundtrip. Before the fix, status_proto_from_name
    /// returned None for "Unknown", causing get_metadata to hard-error on any worker
    /// that hadn't received an explicit UpdateStatus call after PublishMetadata.
    #[test]
    fn test_status_unknown_roundtrip() {
        let written = WorkerStatus::status_name_from_proto(0);
        assert_eq!(written, "Unknown");
        let read_back = WorkerStatus::status_proto_from_name(&written);
        assert_eq!(
            read_back, 0,
            "Unknown status must roundtrip to proto value 0"
        );
    }

    /// Out-of-range proto values fall back to "Unknown".
    #[test]
    fn test_status_name_from_proto_unknown() {
        assert_eq!(WorkerStatus::status_name_from_proto(99), "Unknown");
        assert_eq!(WorkerStatus::status_name_from_proto(4), "Unknown");
    }

    /// Unrecognized names map to 0; the last case shows the lookup is
    /// case-sensitive ("ready" is not "Ready").
    #[test]
    fn test_status_proto_from_name_unknown() {
        assert_eq!(WorkerStatus::status_proto_from_name("Unknown"), 0);
        assert_eq!(WorkerStatus::status_proto_from_name(""), 0);
        assert_eq!(WorkerStatus::status_proto_from_name("ready"), 0);
    }

    /// Typical `org/model` names: lowercased, `/` replaced by `-`, dots kept.
    #[test]
    fn test_sanitize_model_name() {
        assert_eq!(
            sanitize_model_name("deepseek-ai/DeepSeek-V3"),
            "deepseek-ai-deepseek-v3"
        );
        assert_eq!(
            sanitize_model_name("meta-llama/Llama-3.1-70B"),
            "meta-llama-llama-3.1-70b"
        );
        assert_eq!(sanitize_model_name("simple-model"), "simple-model");
    }

    /// Disallowed characters are removed (not replaced), while `_` becomes `-`.
    #[test]
    fn test_sanitize_model_name_special_chars() {
        assert_eq!(sanitize_model_name("Llama@3.1+8B"), "llama3.18b");
        assert_eq!(sanitize_model_name("model with spaces"), "modelwithspaces");
        assert_eq!(
            sanitize_model_name("org_name/model_v2"),
            "org-name-model-v2"
        );
    }

    /// Inputs that reduce to nothing yield an empty string; surrounding
    /// dashes are trimmed.
    #[test]
    fn test_sanitize_model_name_edge_cases() {
        assert_eq!(sanitize_model_name(""), "");
        assert_eq!(sanitize_model_name("///"), "");
        assert_eq!(sanitize_model_name("---"), "");
        assert_eq!(sanitize_model_name("-model-"), "model");
    }

    /// A descriptor survives a serde_json roundtrip field-for-field, and the
    /// string-encoded `addr`/`size` parse back to the expected `u64` values.
    #[test]
    fn test_tensor_descriptor_json_roundtrip() {
        let original = TensorDescriptorJson {
            name: "model.layers.0.weight".to_string(),
            addr: "139948187451390".to_string(),
            size: "134217728".to_string(),
            device_id: 0,
            dtype: "bfloat16".to_string(),
        };

        let json = serde_json::to_string(&original).expect("serialize");
        let parsed: TensorDescriptorJson = serde_json::from_str(&json).expect("deserialize");

        assert_eq!(parsed.name, original.name);
        assert_eq!(parsed.addr, original.addr);
        assert_eq!(parsed.size, original.size);
        assert_eq!(parsed.device_id, original.device_id);
        assert_eq!(parsed.dtype, original.dtype);

        let addr: u64 = parsed.addr.parse().expect("addr should parse as u64");
        assert_eq!(addr, 139948187451390);
        let size: u64 = parsed.size.parse().expect("size should parse as u64");
        assert_eq!(size, 134217728);
    }

    /// `u64::MAX` survives the string encoding of `addr`.
    // NOTE(review): `size` is also set to `u64::MAX` here but only `addr` is
    // asserted after the roundtrip.
    #[test]
    fn test_tensor_descriptor_json_large_values() {
        let desc = TensorDescriptorJson {
            name: "test".to_string(),
            addr: u64::MAX.to_string(),
            size: u64::MAX.to_string(),
            device_id: 7,
            dtype: "float16".to_string(),
        };

        let json = serde_json::to_string(&desc).expect("serialize");
        let parsed: TensorDescriptorJson = serde_json::from_str(&json).expect("deserialize");

        let addr: u64 = parsed.addr.parse().expect("max u64 addr should parse");
        assert_eq!(addr, u64::MAX);
    }

    /// `set_condition` on an empty status appends one fully-populated condition.
    #[test]
    fn test_set_condition_inserts_new() {
        let mut status = ModelMetadataStatus::default();
        assert!(status.conditions.is_empty());

        status.set_condition("Ready", "True", "WorkerPublished", "Published");

        assert_eq!(status.conditions.len(), 1);
        let cond = &status.conditions[0];
        assert_eq!(cond.type_, "Ready");
        assert_eq!(cond.status, "True");
        assert_eq!(cond.reason.as_deref(), Some("WorkerPublished"));
        assert_eq!(cond.message.as_deref(), Some("Published"));
        assert!(cond.last_transition_time.is_some());
    }

    /// A second call with the same type but a different status updates the
    /// existing entry in place and refreshes `lastTransitionTime`.
    // NOTE(review): the `assert_ne!` relies on two back-to-back
    // `chrono::Utc::now().to_rfc3339()` calls producing different strings;
    // this presumably holds on high-resolution clocks but could be flaky on a
    // coarse one — confirm on all CI targets.
    #[test]
    fn test_set_condition_updates_existing() {
        let mut status = ModelMetadataStatus::default();
        status.set_condition("Ready", "True", "WorkerPublished", "Published");
        let original_time = status.conditions[0].last_transition_time.clone();

        status.set_condition("Ready", "False", "WorkerStale", "Worker is stale");

        assert_eq!(status.conditions.len(), 1);
        let cond = &status.conditions[0];
        assert_eq!(cond.status, "False");
        assert_eq!(cond.reason.as_deref(), Some("WorkerStale"));
        assert_ne!(
            cond.last_transition_time, original_time,
            "lastTransitionTime must change on status transition"
        );
    }

    /// Re-setting the same status updates `reason` but leaves
    /// `lastTransitionTime` untouched.
    #[test]
    fn test_set_condition_same_status_preserves_transition_time() {
        let mut status = ModelMetadataStatus::default();
        status.set_condition("Ready", "True", "WorkerPublished", "Published");
        let original_time = status.conditions[0].last_transition_time.clone();

        status.set_condition("Ready", "True", "StillReady", "Still ready");

        assert_eq!(status.conditions.len(), 1);
        assert_eq!(status.conditions[0].reason.as_deref(), Some("StillReady"));
        assert_eq!(
            status.conditions[0].last_transition_time, original_time,
            "lastTransitionTime must not change when status stays the same"
        );
    }

    /// Proto status 2 produces `Ready=True` with reason `WorkerReady`.
    #[test]
    fn test_update_ready_condition_ready() {
        let mut status = ModelMetadataStatus::default();
        status.update_ready_condition(2); // SOURCE_STATUS_READY

        assert_eq!(status.conditions.len(), 1);
        let cond = &status.conditions[0];
        assert_eq!(cond.type_, "Ready");
        assert_eq!(cond.status, "True");
        assert_eq!(cond.reason.as_deref(), Some("WorkerReady"));
    }

    /// Every non-ready proto status produces `Ready=False` with a reason of
    /// the form `Worker<StatusName>`.
    #[test]
    fn test_update_ready_condition_not_ready_states() {
        for (proto, expected_reason) in [
            (0, "WorkerUnknown"),
            (1, "WorkerInitializing"),
            (3, "WorkerStale"),
        ] {
            let mut status = ModelMetadataStatus::default();
            status.update_ready_condition(proto);

            assert_eq!(status.conditions.len(), 1);
            let cond = &status.conditions[0];
            assert_eq!(cond.type_, "Ready");
            assert_eq!(cond.status, "False");
            assert_eq!(
                cond.reason.as_deref(),
                Some(expected_reason),
                "proto status {} should produce reason {}",
                proto,
                expected_reason
            );
        }
    }

    /// Walks Initializing -> Ready -> Stale on one status object and checks
    /// the `Ready` condition follows along.
    // NOTE(review): same clock-resolution caveat as
    // `test_set_condition_updates_existing` applies to the `assert_ne!` here.
    #[test]
    fn test_update_ready_condition_transition() {
        let mut status = ModelMetadataStatus::default();

        status.update_ready_condition(1); // Initializing
        assert_eq!(status.conditions[0].status, "False");
        let time_false = status.conditions[0].last_transition_time.clone();

        status.update_ready_condition(2); // Ready
        assert_eq!(status.conditions[0].status, "True");
        assert_ne!(
            status.conditions[0].last_transition_time, time_false,
            "lastTransitionTime must change on False->True transition"
        );

        status.update_ready_condition(3); // Stale
        assert_eq!(status.conditions[0].status, "False");
        assert_eq!(status.conditions[0].reason.as_deref(), Some("WorkerStale"));
    }
}
432        status.update_ready_condition(3); // Stale
433        assert_eq!(status.conditions[0].status, "False");
434        assert_eq!(status.conditions[0].reason.as_deref(), Some("WorkerStale"));
435    }
436}