mpl_registry_api/
handlers.rs

1//! Registry API handlers
2
3use axum::{
4    extract::{Path, Query, State},
5    Json,
6};
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::sync::Arc;
10use tokio::fs;
11
12use crate::error::RegistryError;
13use crate::state::RegistryState;
14
15/// Health check response
16#[derive(Serialize)]
17pub struct HealthResponse {
18    pub status: String,
19    pub service: String,
20    pub version: String,
21}
22
23/// Health check handler
24pub async fn health() -> Json<HealthResponse> {
25    Json(HealthResponse {
26        status: "healthy".to_string(),
27        service: "mpl-registry-api".to_string(),
28        version: env!("CARGO_PKG_VERSION").to_string(),
29    })
30}
31
32/// SType metadata response
33#[derive(Clone, Serialize)]
34pub struct StypeMetadata {
35    pub stype: String,
36    pub namespace: String,
37    pub domain: String,
38    pub name: String,
39    pub version: u32,
40    pub schema_url: String,
41    pub urn: String,
42}
43
44/// Get schema for an SType
45pub async fn get_schema(
46    State(state): State<Arc<RegistryState>>,
47    Path((namespace, domain, name, version)): Path<(String, String, String, String)>,
48) -> Result<Json<Value>, RegistryError> {
49    // Parse version
50    let version_num = version
51        .strip_prefix('v')
52        .and_then(|v| v.parse::<u32>().ok())
53        .ok_or_else(|| RegistryError::InvalidFormat(format!("Invalid version: {}", version)))?;
54
55    let stype_id = format!("{}.{}.{}.v{}", namespace, domain, name, version_num);
56
57    // Check cache first
58    if let Some(schema) = state.cache.get(&stype_id) {
59        return Ok(Json(schema));
60    }
61
62    // Load from filesystem
63    let schema_path = state.schema_path(&namespace, &domain, &name, version_num);
64
65    if !schema_path.exists() {
66        return Err(RegistryError::NotFound(stype_id));
67    }
68
69    let content = fs::read_to_string(&schema_path).await?;
70    let schema: Value = serde_json::from_str(&content)
71        .map_err(|e| RegistryError::SchemaError(e.to_string()))?;
72
73    // Cache the schema
74    state.cache.insert(stype_id, schema.clone());
75
76    Ok(Json(schema))
77}
78
79/// Get SType metadata
80pub async fn get_stype_metadata(
81    State(state): State<Arc<RegistryState>>,
82    Path((namespace, domain, name, version)): Path<(String, String, String, String)>,
83) -> Result<Json<StypeMetadata>, RegistryError> {
84    let version_num = version
85        .strip_prefix('v')
86        .and_then(|v| v.parse::<u32>().ok())
87        .ok_or_else(|| RegistryError::InvalidFormat(format!("Invalid version: {}", version)))?;
88
89    let stype_path = state.stype_path(&namespace, &domain, &name, version_num);
90
91    if !stype_path.exists() {
92        return Err(RegistryError::NotFound(format!(
93            "{}.{}.{}.v{}",
94            namespace, domain, name, version_num
95        )));
96    }
97
98    let stype_id = format!("{}.{}.{}.v{}", namespace, domain, name, version_num);
99
100    Ok(Json(StypeMetadata {
101        stype: stype_id.clone(),
102        namespace: namespace.clone(),
103        domain: domain.clone(),
104        name: name.clone(),
105        version: version_num,
106        schema_url: format!("/stypes/{}/{}/{}/v{}/schema", namespace, domain, name, version_num),
107        urn: format!("urn:stype:{}", stype_id),
108    }))
109}
110
111/// List query parameters
112#[derive(Deserialize)]
113pub struct ListQuery {
114    pub namespace: Option<String>,
115    pub domain: Option<String>,
116    pub limit: Option<usize>,
117    pub offset: Option<usize>,
118}
119
120/// List STypes
121#[derive(Serialize)]
122pub struct ListResponse {
123    pub stypes: Vec<StypeMetadata>,
124    pub total: usize,
125    pub limit: usize,
126    pub offset: usize,
127}
128
129/// List all STypes
130pub async fn list_stypes(
131    State(state): State<Arc<RegistryState>>,
132    Query(query): Query<ListQuery>,
133) -> Result<Json<ListResponse>, RegistryError> {
134    let limit = query.limit.unwrap_or(50).min(100);
135    let offset = query.offset.unwrap_or(0);
136
137    let stypes_dir = state.registry_path.join("stypes");
138    let mut stypes = Vec::new();
139
140    // Walk the registry directory
141    if let Ok(mut entries) = fs::read_dir(&stypes_dir).await {
142        while let Ok(Some(ns_entry)) = entries.next_entry().await {
143            let ns_name = ns_entry.file_name().to_string_lossy().to_string();
144
145            // Filter by namespace if specified
146            if let Some(ref filter_ns) = query.namespace {
147                if !ns_name.starts_with(filter_ns) {
148                    continue;
149                }
150            }
151
152            if let Ok(mut domain_entries) = fs::read_dir(ns_entry.path()).await {
153                while let Ok(Some(domain_entry)) = domain_entries.next_entry().await {
154                    let domain_name = domain_entry.file_name().to_string_lossy().to_string();
155
156                    // Filter by domain if specified
157                    if let Some(ref filter_domain) = query.domain {
158                        if !domain_name.starts_with(filter_domain) {
159                            continue;
160                        }
161                    }
162
163                    if let Ok(mut name_entries) = fs::read_dir(domain_entry.path()).await {
164                        while let Ok(Some(name_entry)) = name_entries.next_entry().await {
165                            let type_name = name_entry.file_name().to_string_lossy().to_string();
166
167                            if let Ok(mut version_entries) = fs::read_dir(name_entry.path()).await {
168                                while let Ok(Some(version_entry)) = version_entries.next_entry().await {
169                                    let version_str = version_entry.file_name().to_string_lossy().to_string();
170
171                                    if let Some(version_num) = version_str
172                                        .strip_prefix('v')
173                                        .and_then(|v| v.parse::<u32>().ok())
174                                    {
175                                        let stype_id = format!(
176                                            "{}.{}.{}.v{}",
177                                            ns_name, domain_name, type_name, version_num
178                                        );
179
180                                        stypes.push(StypeMetadata {
181                                            stype: stype_id.clone(),
182                                            namespace: ns_name.clone(),
183                                            domain: domain_name.clone(),
184                                            name: type_name.clone(),
185                                            version: version_num,
186                                            schema_url: format!(
187                                                "/stypes/{}/{}/{}/v{}/schema",
188                                                ns_name, domain_name, type_name, version_num
189                                            ),
190                                            urn: format!("urn:stype:{}", stype_id),
191                                        });
192                                    }
193                                }
194                            }
195                        }
196                    }
197                }
198            }
199        }
200    }
201
202    // Sort by SType ID
203    stypes.sort_by(|a, b| a.stype.cmp(&b.stype));
204
205    let total = stypes.len();
206    let stypes: Vec<_> = stypes.into_iter().skip(offset).take(limit).collect();
207
208    Ok(Json(ListResponse {
209        stypes,
210        total,
211        limit,
212        offset,
213    }))
214}
215
216/// Search query
217#[derive(Deserialize)]
218pub struct SearchQuery {
219    pub q: String,
220    pub limit: Option<usize>,
221}
222
223/// Search STypes
224pub async fn search_stypes(
225    State(state): State<Arc<RegistryState>>,
226    Query(query): Query<SearchQuery>,
227) -> Result<Json<ListResponse>, RegistryError> {
228    let limit = query.limit.unwrap_or(20).min(50);
229    let search_term = query.q.to_lowercase();
230
231    // Get all stypes and filter
232    let list_result = list_stypes(
233        State(state),
234        Query(ListQuery {
235            namespace: None,
236            domain: None,
237            limit: Some(1000),
238            offset: Some(0),
239        }),
240    )
241    .await?;
242
243    let matches: Vec<_> = list_result
244        .0
245        .stypes
246        .into_iter()
247        .filter(|s| {
248            s.stype.to_lowercase().contains(&search_term)
249                || s.name.to_lowercase().contains(&search_term)
250                || s.domain.to_lowercase().contains(&search_term)
251        })
252        .take(limit)
253        .collect();
254
255    let total = matches.len();
256
257    Ok(Json(ListResponse {
258        stypes: matches,
259        total,
260        limit,
261        offset: 0,
262    }))
263}
264
265/// Cache statistics
266#[derive(Serialize)]
267pub struct CacheStatsResponse {
268    pub entry_count: u64,
269    pub weighted_size: u64,
270}
271
272/// Get cache stats
273pub async fn cache_stats(
274    State(state): State<Arc<RegistryState>>,
275) -> Json<CacheStatsResponse> {
276    let stats = state.cache.stats();
277    Json(CacheStatsResponse {
278        entry_count: stats.entry_count,
279        weighted_size: stats.weighted_size,
280    })
281}