1use async_trait::async_trait;
13use modelexpress_common::grpc::p2p::{SourceIdentity, SourceStatus, WorkerMetadata};
14use std::sync::Arc;
15
/// Kubernetes-backed [`MetadataBackend`] implementation (`KubernetesBackend`).
pub mod kubernetes;
/// Redis-backed [`MetadataBackend`] implementation (`RedisBackend`).
pub mod redis;
18
/// Result alias returned by every metadata backend operation in this module.
///
/// The error side is a boxed, thread-safe (`Send + Sync`) error trait object, so
/// any backend-specific error can be propagated with `?`.
pub type MetadataResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;
21
/// Metadata record for one worker instance of a source, as returned by
/// [`MetadataBackend::get_metadata`].
#[derive(Debug, Clone)]
pub struct ModelMetadataRecord {
    /// Identifier of the source this record belongs to.
    pub source_id: String,
    /// Identifier of the worker instance this record belongs to.
    pub worker_id: String,
    /// Name of the model being served.
    pub model_name: String,
    /// Per-rank worker entries stored in this record.
    pub workers: Vec<WorkerRecord>,
    /// Publication timestamp.
    /// NOTE(review): unit/epoch is not visible here — confirm against the backends.
    pub published_at: i64,
}
34
/// Lightweight description of a single worker, as returned by
/// [`MetadataBackend::list_workers`].
#[derive(Debug, Clone)]
pub struct SourceInstanceInfo {
    /// Identifier of the source the worker belongs to.
    pub source_id: String,
    /// Identifier of the worker instance.
    pub worker_id: String,
    /// Name of the model being served.
    pub model_name: String,
    /// Rank of the worker.
    pub worker_rank: u32,
    /// Raw status value.
    /// NOTE(review): presumably the integer form of `SourceStatus` — confirm.
    pub status: i32,
    /// Timestamp of the last update.
    /// NOTE(review): unit/epoch is not visible here — confirm.
    pub updated_at: i64,
}
49
/// Transfer-backend payload attached to a worker record.
///
/// Mirrors the optional `backend_metadata` field of the gRPC `WorkerMetadata`
/// message: either NIXL metadata bytes, a transfer-engine session id, or nothing.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BackendMetadataRecord {
    /// NIXL metadata bytes.
    Nixl(Vec<u8>),
    /// Transfer-engine session identifier.
    TransferEngine(String),
    /// No backend metadata is attached.
    None,
}

impl BackendMetadataRecord {
    /// Rebuilds a record from separately supplied ("flat") fields.
    ///
    /// # Parameters
    /// * `nixl_metadata` - NIXL payload bytes; may be empty.
    /// * `transfer_engine_session_id` - optional transfer-engine session id.
    /// * `backend_type` - optional explicit discriminator: `"transfer_engine"`,
    ///   `"nixl"` or `"none"`.
    ///
    /// # Returns
    /// When `backend_type` is one of the three recognised strings, that variant
    /// is returned unconditionally (a missing session id becomes an empty
    /// string). For any other value, including `None`, the variant is inferred
    /// from the payloads: a non-empty session id wins, then non-empty NIXL
    /// bytes, otherwise [`BackendMetadataRecord::None`].
    #[must_use]
    pub fn from_flat(
        nixl_metadata: Vec<u8>,
        transfer_engine_session_id: Option<String>,
        backend_type: Option<&str>,
    ) -> Self {
        match backend_type {
            Some("transfer_engine") => {
                Self::TransferEngine(transfer_engine_session_id.unwrap_or_default())
            }
            Some("nixl") => Self::Nixl(nixl_metadata),
            Some("none") => Self::None,
            // Unknown or absent discriminator: infer the variant from whichever
            // payload is actually populated, session id taking precedence.
            _ => match transfer_engine_session_id.filter(|sid| !sid.is_empty()) {
                Some(sid) => Self::TransferEngine(sid),
                None if !nixl_metadata.is_empty() => Self::Nixl(nixl_metadata),
                None => Self::None,
            },
        }
    }

    /// Returns the discriminator string for this variant.
    ///
    /// The values (`"nixl"`, `"transfer_engine"`, `"none"`) are exactly the
    /// strings [`BackendMetadataRecord::from_flat`] recognises as an explicit
    /// `backend_type`.
    #[must_use]
    pub fn backend_type_str(&self) -> &'static str {
        match self {
            Self::Nixl(_) => "nixl",
            Self::TransferEngine(_) => "transfer_engine",
            Self::None => "none",
        }
    }
}
103
/// Storage-side representation of one worker; converts to and from the gRPC
/// `WorkerMetadata` message field-for-field.
#[derive(Debug, Clone)]
pub struct WorkerRecord {
    /// Rank of the worker.
    pub worker_rank: u32,
    /// Transfer-backend payload (NIXL bytes, transfer-engine session id, or none).
    pub backend_metadata: BackendMetadataRecord,
    /// Tensors described by this worker.
    pub tensors: Vec<TensorRecord>,
    /// Raw status value copied from the gRPC message.
    /// NOTE(review): presumably the integer form of `SourceStatus` — confirm.
    pub status: i32,
    /// Timestamp of the last update.
    /// NOTE(review): unit/epoch is not visible here — confirm.
    pub updated_at: i64,
    /// Metadata endpoint string copied from the gRPC message.
    pub metadata_endpoint: String,
    /// Agent name copied from the gRPC message.
    pub agent_name: String,
    /// Worker gRPC endpoint string copied from the gRPC message.
    pub worker_grpc_endpoint: String,
}
121
/// Storage-side representation of one tensor; converts to and from the gRPC
/// `TensorDescriptor` message field-for-field.
#[derive(Debug, Clone)]
pub struct TensorRecord {
    /// Tensor name.
    pub name: String,
    /// Tensor address.
    /// NOTE(review): presumably a device/host memory address — confirm.
    pub addr: u64,
    /// Tensor size.
    /// NOTE(review): presumably in bytes — confirm.
    pub size: u64,
    /// Identifier of the device holding the tensor.
    pub device_id: u32,
    /// Data type name of the tensor elements.
    pub dtype: String,
}
131
132impl From<WorkerMetadata> for WorkerRecord {
134 fn from(meta: WorkerMetadata) -> Self {
135 use modelexpress_common::grpc::p2p::worker_metadata::BackendMetadata;
136 let backend_metadata = match meta.backend_metadata {
137 Some(BackendMetadata::NixlMetadata(data)) => BackendMetadataRecord::Nixl(data),
138 Some(BackendMetadata::TransferEngineSessionId(sid)) => {
139 BackendMetadataRecord::TransferEngine(sid)
140 }
141 None => BackendMetadataRecord::None,
142 };
143 Self {
144 worker_rank: meta.worker_rank,
145 backend_metadata,
146 tensors: meta.tensors.into_iter().map(TensorRecord::from).collect(),
147 status: meta.status,
148 updated_at: meta.updated_at,
149 metadata_endpoint: meta.metadata_endpoint,
150 agent_name: meta.agent_name,
151 worker_grpc_endpoint: meta.worker_grpc_endpoint,
152 }
153 }
154}
155
156impl From<modelexpress_common::grpc::p2p::TensorDescriptor> for TensorRecord {
157 fn from(desc: modelexpress_common::grpc::p2p::TensorDescriptor) -> Self {
158 Self {
159 name: desc.name,
160 addr: desc.addr,
161 size: desc.size,
162 device_id: desc.device_id,
163 dtype: desc.dtype,
164 }
165 }
166}
167
168impl From<WorkerRecord> for WorkerMetadata {
170 fn from(record: WorkerRecord) -> Self {
171 use modelexpress_common::grpc::p2p::worker_metadata::BackendMetadata;
172 let backend_metadata = match record.backend_metadata {
173 BackendMetadataRecord::Nixl(data) => Some(BackendMetadata::NixlMetadata(data)),
174 BackendMetadataRecord::TransferEngine(sid) => {
175 Some(BackendMetadata::TransferEngineSessionId(sid))
176 }
177 BackendMetadataRecord::None => None,
178 };
179 Self {
180 worker_rank: record.worker_rank,
181 backend_metadata,
182 tensors: record
183 .tensors
184 .into_iter()
185 .map(modelexpress_common::grpc::p2p::TensorDescriptor::from)
186 .collect(),
187 status: record.status,
188 updated_at: record.updated_at,
189 metadata_endpoint: record.metadata_endpoint,
190 agent_name: record.agent_name,
191 worker_grpc_endpoint: record.worker_grpc_endpoint,
192 }
193 }
194}
195
196impl From<TensorRecord> for modelexpress_common::grpc::p2p::TensorDescriptor {
197 fn from(record: TensorRecord) -> Self {
198 Self {
199 name: record.name,
200 addr: record.addr,
201 size: record.size,
202 device_id: record.device_id,
203 dtype: record.dtype,
204 }
205 }
206}
207
/// Storage abstraction for worker/source metadata.
///
/// Implementations must be `Send + Sync`; [`create_backend`] hands them out as
/// `Arc<dyn MetadataBackend>`. A `mockall` mock is generated for test builds.
/// Exact storage semantics (overwrite vs. merge, behavior on missing keys) are
/// defined by each implementation and are not visible from this trait.
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait MetadataBackend: Send + Sync {
    /// Connects to the underlying store.
    ///
    /// [`create_backend`] calls this once, immediately after constructing a
    /// backend. NOTE(review): whether repeated calls are safe is
    /// implementation-defined — confirm.
    async fn connect(&self) -> MetadataResult<()>;

    /// Publishes `worker`'s metadata for the worker instance `worker_id` under
    /// the source described by `identity`.
    ///
    /// NOTE(review): presumably replaces or merges any existing entry for the
    /// same rank — confirm against the implementations.
    async fn publish_metadata(
        &self,
        identity: &SourceIdentity,
        worker_id: &str,
        worker: WorkerMetadata,
    ) -> MetadataResult<()>;

    /// Fetches the metadata record stored for `source_id` / `worker_id`.
    ///
    /// Returns `Ok(None)` when the backend has no record to return
    /// (presumably "not found" — confirm).
    async fn get_metadata(
        &self,
        source_id: &str,
        worker_id: &str,
    ) -> MetadataResult<Option<ModelMetadataRecord>>;

    /// Lists workers, optionally restricted by `source_id` and/or
    /// `status_filter`.
    ///
    /// NOTE(review): presumably `None` for a filter means "do not filter on
    /// that dimension" — confirm.
    async fn list_workers(
        &self,
        source_id: Option<String>,
        status_filter: Option<SourceStatus>,
    ) -> MetadataResult<Vec<SourceInstanceInfo>>;

    /// Removes the metadata stored for `source_id`.
    ///
    /// NOTE(review): presumably removes every worker of the source — confirm.
    async fn remove_metadata(&self, source_id: &str) -> MetadataResult<()>;

    /// Removes the metadata stored for the single worker `worker_id` of
    /// `source_id`.
    async fn remove_worker(&self, source_id: &str, worker_id: &str) -> MetadataResult<()>;

    /// Lists the known sources as string pairs.
    ///
    /// NOTE(review): the meaning of the two tuple elements is not visible here
    /// (presumably `(source_id, model_name)`) — confirm.
    async fn list_sources(&self) -> MetadataResult<Vec<(String, String)>>;

    /// Sets the status of the worker identified by `source_id`, `worker_id`
    /// and `worker_rank` to `status`, recording `updated_at` as the update
    /// timestamp.
    ///
    /// NOTE(review): unit/epoch of `updated_at` is not visible here — confirm.
    async fn update_status(
        &self,
        source_id: &str,
        worker_id: &str,
        worker_rank: u32,
        status: SourceStatus,
        updated_at: i64,
    ) -> MetadataResult<()>;
}
262
263pub use crate::backend_config::BackendConfig;
264
265pub async fn create_backend(config: BackendConfig) -> MetadataResult<Arc<dyn MetadataBackend>> {
267 match config {
268 BackendConfig::Redis { url } => {
269 let backend = redis::RedisBackend::new(&url);
270 backend.connect().await?;
271 Ok(Arc::new(backend) as Arc<dyn MetadataBackend>)
272 }
273 BackendConfig::Kubernetes { namespace } => {
274 let backend = kubernetes::KubernetesBackend::new(&namespace).await?;
275 backend.connect().await?;
276 Ok(Arc::new(backend) as Arc<dyn MetadataBackend>)
277 }
278 }
279}