1use kube::CustomResource;
10use schemars::JsonSchema;
11use serde::{Deserialize, Serialize};
12
13#[derive(CustomResource, Clone, Debug, Deserialize, Serialize, JsonSchema)]
15#[kube(
16 group = "modelexpress.nvidia.com",
17 version = "v1alpha1",
18 kind = "ModelMetadata",
19 plural = "modelmetadatas",
20 shortname = "mxmeta",
21 namespaced,
22 status = "ModelMetadataStatus"
23)]
24pub struct ModelMetadataSpec {
25 #[serde(rename = "modelName")]
27 pub model_name: String,
28}
29
30#[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema)]
32pub struct ModelMetadataStatus {
33 #[serde(default)]
35 pub worker: Option<WorkerStatus>,
36
37 #[serde(default)]
39 pub conditions: Vec<Condition>,
40
41 #[serde(rename = "observedGeneration", default)]
43 pub observed_generation: i64,
44
45 #[serde(rename = "publishedAt", default)]
47 pub published_at: Option<String>,
48}
49
50#[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema)]
52pub struct WorkerStatus {
53 #[serde(rename = "workerRank")]
55 pub worker_rank: i32,
56
57 #[serde(rename = "backendType", default)]
59 pub backend_type: Option<String>,
60
61 #[serde(rename = "nixlMetadata", default)]
63 pub nixl_metadata: String,
64
65 #[serde(rename = "transferEngineSessionId", default)]
67 pub transfer_engine_session_id: Option<String>,
68
69 #[serde(rename = "tensorCount", default)]
71 pub tensor_count: i32,
72
73 #[serde(rename = "tensorConfigMap", default)]
75 pub tensor_config_map: Option<String>,
76
77 #[serde(default)]
79 pub status: String,
80
81 #[serde(rename = "updatedAt", default)]
83 pub updated_at: Option<String>,
84
85 #[serde(rename = "metadataEndpoint", default)]
87 pub metadata_endpoint: String,
88
89 #[serde(rename = "agentName", default)]
91 pub agent_name: String,
92
93 #[serde(rename = "workerGrpcEndpoint", default)]
95 pub worker_grpc_endpoint: String,
96}
97
98impl WorkerStatus {
99 pub fn status_name_from_proto(status: i32) -> String {
101 match status {
102 0 => "Unknown",
103 1 => "Initializing",
104 2 => "Ready",
105 3 => "Stale",
106 _ => "Unknown",
107 }
108 .to_string()
109 }
110
111 pub fn status_proto_from_name(name: &str) -> i32 {
113 match name {
114 "Initializing" => 1,
115 "Ready" => 2,
116 "Stale" => 3,
117 _ => 0,
118 }
119 }
120}
121
122#[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema)]
124pub struct Condition {
125 #[serde(rename = "type")]
127 pub type_: String,
128
129 pub status: String,
131
132 #[serde(default)]
134 pub reason: Option<String>,
135
136 #[serde(default)]
138 pub message: Option<String>,
139
140 #[serde(rename = "lastTransitionTime", default)]
142 pub last_transition_time: Option<String>,
143}
144
145impl ModelMetadataStatus {
146 pub fn set_condition(&mut self, type_: &str, status: &str, reason: &str, message: &str) {
150 let now = chrono::Utc::now().to_rfc3339();
151 if let Some(existing) = self.conditions.iter_mut().find(|c| c.type_ == type_) {
152 if existing.status != status {
153 existing.last_transition_time = Some(now);
154 }
155 existing.status = status.to_string();
156 existing.reason = Some(reason.to_string());
157 existing.message = Some(message.to_string());
158 } else {
159 self.conditions.push(Condition {
160 type_: type_.to_string(),
161 status: status.to_string(),
162 reason: Some(reason.to_string()),
163 message: Some(message.to_string()),
164 last_transition_time: Some(now),
165 });
166 }
167 }
168
169 pub fn update_ready_condition(&mut self, worker_proto_status: i32) {
172 let is_ready = worker_proto_status == 2; if is_ready {
174 self.set_condition("Ready", "True", "WorkerReady", "Worker is ready");
175 } else {
176 let status_name = WorkerStatus::status_name_from_proto(worker_proto_status);
177 self.set_condition(
178 "Ready",
179 "False",
180 &format!("Worker{}", status_name),
181 "Worker is not ready",
182 );
183 }
184 }
185}
186
187#[derive(Clone, Debug, Deserialize, Serialize)]
189pub struct TensorDescriptorJson {
190 pub name: String,
191 pub addr: String,
193 pub size: String,
195 pub device_id: u32,
196 pub dtype: String,
197}
198
/// Normalizes a model name into a lowercase, dash-separated identifier.
///
/// Steps, in order:
/// 1. lowercase the whole string;
/// 2. turn every `/` and `_` into `-`;
/// 3. drop every character that is not an ASCII letter, ASCII digit, `-` or `.`;
/// 4. strip leading and trailing `-` (a leading or trailing `.` is kept).
///
/// The result may be empty, e.g. for `""` or `"///"`.
pub fn sanitize_model_name(model_name: &str) -> String {
    // Lowercase first: some non-ASCII characters lowercase to ASCII ones and
    // must be seen by the filter in their lowercased form.
    let lowered = model_name.to_lowercase();

    let mut cleaned = String::with_capacity(lowered.len());
    for ch in lowered.chars() {
        match ch {
            '/' | '_' => cleaned.push('-'),
            c if c.is_ascii_alphanumeric() || matches!(c, '-' | '.') => cleaned.push(c),
            _ => {}
        }
    }

    cleaned.trim_matches('-').to_string()
}
211
#[cfg(test)]
// `expect` is acceptable in tests: a failed expectation should abort the test.
#[allow(clippy::expect_used)]
mod tests {
    use super::*;

    /// Every known status code maps to its name and back again.
    #[test]
    fn test_status_roundtrip() {
        let table = [
            (0, "Unknown"),
            (1, "Initializing"),
            (2, "Ready"),
            (3, "Stale"),
        ];
        for (code, label) in table {
            assert_eq!(WorkerStatus::status_name_from_proto(code), label);
            assert_eq!(WorkerStatus::status_proto_from_name(label), code);
        }
    }

    /// The name written for code 0 must read back as code 0.
    #[test]
    fn test_status_unknown_roundtrip() {
        let name = WorkerStatus::status_name_from_proto(0);
        assert_eq!(name, "Unknown");

        let code = WorkerStatus::status_proto_from_name(&name);
        assert_eq!(
            code, 0,
            "Unknown status must roundtrip to proto value 0"
        );
    }

    /// Codes outside the known range fall back to "Unknown".
    #[test]
    fn test_status_name_from_proto_unknown() {
        for code in [99, 4] {
            assert_eq!(WorkerStatus::status_name_from_proto(code), "Unknown");
        }
    }

    /// Unrecognized names (including wrong case) fall back to 0.
    #[test]
    fn test_status_proto_from_name_unknown() {
        for label in ["Unknown", "", "ready"] {
            assert_eq!(WorkerStatus::status_proto_from_name(label), 0);
        }
    }

    /// Typical `org/model` names are lowercased and dash-joined.
    #[test]
    fn test_sanitize_model_name() {
        let cases = [
            ("deepseek-ai/DeepSeek-V3", "deepseek-ai-deepseek-v3"),
            ("meta-llama/Llama-3.1-70B", "meta-llama-llama-3.1-70b"),
            ("simple-model", "simple-model"),
        ];
        for (raw, expected) in cases {
            assert_eq!(sanitize_model_name(raw), expected);
        }
    }

    /// Disallowed characters are dropped; `_` and `/` become `-`.
    #[test]
    fn test_sanitize_model_name_special_chars() {
        let cases = [
            ("Llama@3.1+8B", "llama3.18b"),
            ("model with spaces", "modelwithspaces"),
            ("org_name/model_v2", "org-name-model-v2"),
        ];
        for (raw, expected) in cases {
            assert_eq!(sanitize_model_name(raw), expected);
        }
    }

    /// Empty and dash-only inputs collapse to an empty string.
    #[test]
    fn test_sanitize_model_name_edge_cases() {
        let cases = [("", ""), ("///", ""), ("---", ""), ("-model-", "model")];
        for (raw, expected) in cases {
            assert_eq!(sanitize_model_name(raw), expected);
        }
    }

    /// A descriptor survives a JSON round trip, and its string-encoded
    /// numbers still parse as `u64`.
    #[test]
    fn test_tensor_descriptor_json_roundtrip() {
        let descriptor = TensorDescriptorJson {
            name: "model.layers.0.weight".to_string(),
            addr: "139948187451390".to_string(),
            size: "134217728".to_string(),
            device_id: 0,
            dtype: "bfloat16".to_string(),
        };

        let encoded = serde_json::to_string(&descriptor).expect("serialize");
        let decoded: TensorDescriptorJson = serde_json::from_str(&encoded).expect("deserialize");

        assert_eq!(decoded.name, descriptor.name);
        assert_eq!(decoded.addr, descriptor.addr);
        assert_eq!(decoded.size, descriptor.size);
        assert_eq!(decoded.device_id, descriptor.device_id);
        assert_eq!(decoded.dtype, descriptor.dtype);

        let addr_value: u64 = decoded.addr.parse().expect("addr should parse as u64");
        assert_eq!(addr_value, 139948187451390);
        let size_value: u64 = decoded.size.parse().expect("size should parse as u64");
        assert_eq!(size_value, 134217728);
    }

    /// String encoding keeps the full `u64` range intact through JSON.
    #[test]
    fn test_tensor_descriptor_json_large_values() {
        let descriptor = TensorDescriptorJson {
            name: "test".to_string(),
            addr: u64::MAX.to_string(),
            size: u64::MAX.to_string(),
            device_id: 7,
            dtype: "float16".to_string(),
        };

        let encoded = serde_json::to_string(&descriptor).expect("serialize");
        let decoded: TensorDescriptorJson = serde_json::from_str(&encoded).expect("deserialize");

        let addr_value: u64 = decoded.addr.parse().expect("max u64 addr should parse");
        assert_eq!(addr_value, u64::MAX);
    }

    /// Setting a condition on an empty status appends a fully populated entry.
    #[test]
    fn test_set_condition_inserts_new() {
        let mut subject = ModelMetadataStatus::default();
        assert!(subject.conditions.is_empty());

        subject.set_condition("Ready", "True", "WorkerPublished", "Published");

        assert_eq!(subject.conditions.len(), 1);
        let entry = &subject.conditions[0];
        assert_eq!(entry.type_, "Ready");
        assert_eq!(entry.status, "True");
        assert_eq!(entry.reason.as_deref(), Some("WorkerPublished"));
        assert_eq!(entry.message.as_deref(), Some("Published"));
        assert!(entry.last_transition_time.is_some());
    }

    /// A status change updates the entry in place and bumps the transition time.
    #[test]
    fn test_set_condition_updates_existing() {
        let mut subject = ModelMetadataStatus::default();
        subject.set_condition("Ready", "True", "WorkerPublished", "Published");
        let first_stamp = subject.conditions[0].last_transition_time.clone();

        subject.set_condition("Ready", "False", "WorkerStale", "Worker is stale");

        assert_eq!(subject.conditions.len(), 1);
        let entry = &subject.conditions[0];
        assert_eq!(entry.status, "False");
        assert_eq!(entry.reason.as_deref(), Some("WorkerStale"));
        assert_ne!(
            entry.last_transition_time, first_stamp,
            "lastTransitionTime must change on status transition"
        );
    }

    /// Re-setting the same status updates the reason but keeps the transition time.
    #[test]
    fn test_set_condition_same_status_preserves_transition_time() {
        let mut subject = ModelMetadataStatus::default();
        subject.set_condition("Ready", "True", "WorkerPublished", "Published");
        let first_stamp = subject.conditions[0].last_transition_time.clone();

        subject.set_condition("Ready", "True", "StillReady", "Still ready");

        assert_eq!(subject.conditions.len(), 1);
        let entry = &subject.conditions[0];
        assert_eq!(entry.reason.as_deref(), Some("StillReady"));
        assert_eq!(
            entry.last_transition_time, first_stamp,
            "lastTransitionTime must not change when status stays the same"
        );
    }

    /// Status code 2 yields `Ready=True` with reason `WorkerReady`.
    #[test]
    fn test_update_ready_condition_ready() {
        let mut subject = ModelMetadataStatus::default();
        subject.update_ready_condition(2);

        assert_eq!(subject.conditions.len(), 1);
        let entry = &subject.conditions[0];
        assert_eq!(entry.type_, "Ready");
        assert_eq!(entry.status, "True");
        assert_eq!(entry.reason.as_deref(), Some("WorkerReady"));
    }

    /// Every non-ready code yields `Ready=False` with a `Worker<Name>` reason.
    #[test]
    fn test_update_ready_condition_not_ready_states() {
        let cases = [
            (0, "WorkerUnknown"),
            (1, "WorkerInitializing"),
            (3, "WorkerStale"),
        ];
        for (code, reason) in cases {
            let mut subject = ModelMetadataStatus::default();
            subject.update_ready_condition(code);

            assert_eq!(subject.conditions.len(), 1);
            let entry = &subject.conditions[0];
            assert_eq!(entry.type_, "Ready");
            assert_eq!(entry.status, "False");
            assert_eq!(
                entry.reason.as_deref(),
                Some(reason),
                "proto status {} should produce reason {}",
                code,
                reason
            );
        }
    }

    /// Walks Initializing -> Ready -> Stale and checks each step.
    #[test]
    fn test_update_ready_condition_transition() {
        let mut subject = ModelMetadataStatus::default();

        subject.update_ready_condition(1);
        assert_eq!(subject.conditions[0].status, "False");
        let stamp_while_false = subject.conditions[0].last_transition_time.clone();

        subject.update_ready_condition(2);
        assert_eq!(subject.conditions[0].status, "True");
        assert_ne!(
            subject.conditions[0].last_transition_time, stamp_while_false,
            "lastTransitionTime must change on False->True transition"
        );

        subject.update_ready_condition(3);
        assert_eq!(subject.conditions[0].status, "False");
        assert_eq!(subject.conditions[0].reason.as_deref(), Some("WorkerStale"));
    }
}