modelexpress_server/registry/backend.rs
1// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Backend abstraction for the distributed model registry.
5//!
6//! Parallels [`crate::p2p::backend`] in shape: trait, per-store submodules, factory.
7
8use crate::backend_config::BackendConfig;
9use async_trait::async_trait;
10use chrono::{DateTime, Utc};
11use modelexpress_common::models::{ModelProvider, ModelStatus};
12use std::sync::Arc;
13
14pub mod kubernetes;
15pub mod redis;
16
/// Outcome of any registry operation.
///
/// The error side is a boxed, thread-safe trait object, which lets backend-specific
/// failures (Redis, kube) propagate with `?` without the trait having to name them.
pub type RegistryResult<T> =
    std::result::Result<T, Box<dyn std::error::Error + Send + Sync + 'static>>;
20
21/// Full model-lifecycle record returned by registry backends.
22#[derive(Debug, Clone, PartialEq)]
23pub struct ModelRecord {
24 pub model_name: String,
25 pub provider: ModelProvider,
26 pub status: ModelStatus,
27 pub created_at: DateTime<Utc>,
28 pub last_used_at: DateTime<Utc>,
29 pub message: Option<String>,
30}
31
32/// Result of a `try_claim_for_download` call. Distributed semantics across replicas:
33/// only the caller that sees `Claimed` owns the download. Replicas that see
34/// `AlreadyExists` must wait on the owner instead of spawning a duplicate download.
35#[derive(Debug, Clone, Copy, PartialEq)]
36pub enum ClaimOutcome {
37 /// This call atomically created the registry record with status `DOWNLOADING`.
38 /// The caller is the download owner.
39 Claimed,
40 /// The record already existed when we tried to claim. The caller is a waiter, not
41 /// the owner; the enclosed status is the snapshot observed during the attempt.
42 AlreadyExists(ModelStatus),
43}
44
45/// Trait for model-registry backend implementations.
46#[cfg_attr(test, mockall::automock)]
47#[async_trait]
48pub trait RegistryBackend: Send + Sync {
49 /// Initialize the connection. Idempotent; safe to call at startup.
50 async fn connect(&self) -> RegistryResult<()>;
51
52 /// Return the status of a model, or `None` if the model is unknown.
53 async fn get_status(&self, model_name: &str) -> RegistryResult<Option<ModelStatus>>;
54
55 /// Return the full record for a model, or `None` if unknown.
56 async fn get_model_record(&self, model_name: &str) -> RegistryResult<Option<ModelRecord>>;
57
58 /// Upsert the status, provider, message, and `last_used_at` for a model. Preserves
59 /// `created_at` when the record already exists; stamps it to now otherwise.
60 async fn set_status(
61 &self,
62 model_name: &str,
63 provider: ModelProvider,
64 status: ModelStatus,
65 message: Option<String>,
66 ) -> RegistryResult<()>;
67
68 /// Bump `last_used_at` to now. No-op if the model is unknown.
69 async fn touch_model(&self, model_name: &str) -> RegistryResult<()>;
70
71 /// Delete the model record. No-op if the model is unknown.
72 async fn delete_model(&self, model_name: &str) -> RegistryResult<()>;
73
74 /// Return models ordered by `last_used_at` ascending (oldest first), truncated to
75 /// `limit` if provided. Drives LRU cache eviction.
76 async fn get_models_by_last_used(&self, limit: Option<u32>)
77 -> RegistryResult<Vec<ModelRecord>>;
78
79 /// Return (downloading, downloaded, error) counts. Used by the metrics path.
80 async fn get_status_counts(&self) -> RegistryResult<(u32, u32, u32)>;
81
82 /// Atomic claim: if the model has no registry record, create one with status
83 /// `DOWNLOADING` and return `ClaimOutcome::Claimed`. Otherwise, return
84 /// `ClaimOutcome::AlreadyExists(status)` without mutation.
85 ///
86 /// This is the only way multi-replica servers know which one actually owns the
87 /// download. Callers MUST NOT infer ownership from the observed status alone —
88 /// two replicas can both observe `DOWNLOADING` but only the one that receives
89 /// `Claimed` owns it.
90 async fn try_claim_for_download(
91 &self,
92 model_name: &str,
93 provider: ModelProvider,
94 ) -> RegistryResult<ClaimOutcome>;
95
96 /// Atomic compare-and-set: if the current status is `ERROR`, flip it to
97 /// `DOWNLOADING` with `Retrying download...` as the message and return `true`.
98 /// Otherwise return `false` without mutation. Used by the error-retry path so
99 /// only one replica spawns the retry even when multiple observe `ERROR`.
100 async fn try_reset_error_for_retry(
101 &self,
102 model_name: &str,
103 provider: ModelProvider,
104 ) -> RegistryResult<bool>;
105}
106
107/// Construct a registry backend from config, eagerly connecting before returning.
108pub async fn create_registry_backend(
109 config: BackendConfig,
110) -> RegistryResult<Arc<dyn RegistryBackend>> {
111 match config {
112 BackendConfig::Redis { url } => {
113 let backend = redis::RedisRegistryBackend::new(&url);
114 backend.connect().await?;
115 Ok(Arc::new(backend) as Arc<dyn RegistryBackend>)
116 }
117 BackendConfig::Kubernetes { namespace } => {
118 let backend = kubernetes::KubernetesRegistryBackend::new(&namespace).await?;
119 backend.connect().await?;
120 Ok(Arc::new(backend) as Arc<dyn RegistryBackend>)
121 }
122 }
123}