Skip to main content

modelexpress_server/registry/
backend.rs

// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! Backend abstraction for the distributed model registry.
//!
//! Parallels [`crate::p2p::backend`] in shape: trait, per-store submodules, factory.
7
8use crate::backend_config::BackendConfig;
9use async_trait::async_trait;
10use chrono::{DateTime, Utc};
11use modelexpress_common::models::{ModelProvider, ModelStatus};
12use std::sync::Arc;
13
14pub mod kubernetes;
15pub mod redis;
16
/// Fallible return type shared by every registry backend operation.
///
/// The error side is a boxed, thread-safe trait object. Store-specific failures (for
/// example Redis or Kubernetes client errors) can therefore be propagated with `?`
/// without the trait having to name those concrete error types.
pub type RegistryResult<T> =
    std::result::Result<T, Box<dyn std::error::Error + Send + Sync + 'static>>;
20
21/// Full model-lifecycle record returned by registry backends.
22#[derive(Debug, Clone, PartialEq)]
23pub struct ModelRecord {
24    pub model_name: String,
25    pub provider: ModelProvider,
26    pub status: ModelStatus,
27    pub created_at: DateTime<Utc>,
28    pub last_used_at: DateTime<Utc>,
29    pub message: Option<String>,
30}
31
32/// Result of a `try_claim_for_download` call. Distributed semantics across replicas:
33/// only the caller that sees `Claimed` owns the download. Replicas that see
34/// `AlreadyExists` must wait on the owner instead of spawning a duplicate download.
35#[derive(Debug, Clone, Copy, PartialEq)]
36pub enum ClaimOutcome {
37    /// This call atomically created the registry record with status `DOWNLOADING`.
38    /// The caller is the download owner.
39    Claimed,
40    /// The record already existed when we tried to claim. The caller is a waiter, not
41    /// the owner; the enclosed status is the snapshot observed during the attempt.
42    AlreadyExists(ModelStatus),
43}
44
45/// Trait for model-registry backend implementations.
46#[cfg_attr(test, mockall::automock)]
47#[async_trait]
48pub trait RegistryBackend: Send + Sync {
49    /// Initialize the connection. Idempotent; safe to call at startup.
50    async fn connect(&self) -> RegistryResult<()>;
51
52    /// Return the status of a model, or `None` if the model is unknown.
53    async fn get_status(&self, model_name: &str) -> RegistryResult<Option<ModelStatus>>;
54
55    /// Return the full record for a model, or `None` if unknown.
56    async fn get_model_record(&self, model_name: &str) -> RegistryResult<Option<ModelRecord>>;
57
58    /// Upsert the status, provider, message, and `last_used_at` for a model. Preserves
59    /// `created_at` when the record already exists; stamps it to now otherwise.
60    async fn set_status(
61        &self,
62        model_name: &str,
63        provider: ModelProvider,
64        status: ModelStatus,
65        message: Option<String>,
66    ) -> RegistryResult<()>;
67
68    /// Bump `last_used_at` to now. No-op if the model is unknown.
69    async fn touch_model(&self, model_name: &str) -> RegistryResult<()>;
70
71    /// Delete the model record. No-op if the model is unknown.
72    async fn delete_model(&self, model_name: &str) -> RegistryResult<()>;
73
74    /// Return models ordered by `last_used_at` ascending (oldest first), truncated to
75    /// `limit` if provided. Drives LRU cache eviction.
76    async fn get_models_by_last_used(&self, limit: Option<u32>)
77    -> RegistryResult<Vec<ModelRecord>>;
78
79    /// Return (downloading, downloaded, error) counts. Used by the metrics path.
80    async fn get_status_counts(&self) -> RegistryResult<(u32, u32, u32)>;
81
82    /// Atomic claim: if the model has no registry record, create one with status
83    /// `DOWNLOADING` and return `ClaimOutcome::Claimed`. Otherwise, return
84    /// `ClaimOutcome::AlreadyExists(status)` without mutation.
85    ///
86    /// This is the only way multi-replica servers know which one actually owns the
87    /// download. Callers MUST NOT infer ownership from the observed status alone —
88    /// two replicas can both observe `DOWNLOADING` but only the one that receives
89    /// `Claimed` owns it.
90    async fn try_claim_for_download(
91        &self,
92        model_name: &str,
93        provider: ModelProvider,
94    ) -> RegistryResult<ClaimOutcome>;
95
96    /// Atomic compare-and-set: if the current status is `ERROR`, flip it to
97    /// `DOWNLOADING` with `Retrying download...` as the message and return `true`.
98    /// Otherwise return `false` without mutation. Used by the error-retry path so
99    /// only one replica spawns the retry even when multiple observe `ERROR`.
100    async fn try_reset_error_for_retry(
101        &self,
102        model_name: &str,
103        provider: ModelProvider,
104    ) -> RegistryResult<bool>;
105}
106
107/// Construct a registry backend from config, eagerly connecting before returning.
108pub async fn create_registry_backend(
109    config: BackendConfig,
110) -> RegistryResult<Arc<dyn RegistryBackend>> {
111    match config {
112        BackendConfig::Redis { url } => {
113            let backend = redis::RedisRegistryBackend::new(&url);
114            backend.connect().await?;
115            Ok(Arc::new(backend) as Arc<dyn RegistryBackend>)
116        }
117        BackendConfig::Kubernetes { namespace } => {
118            let backend = kubernetes::KubernetesRegistryBackend::new(&namespace).await?;
119            backend.connect().await?;
120            Ok(Arc::new(backend) as Arc<dyn RegistryBackend>)
121        }
122    }
123}