1use crate::types::{BackendConfig, BackendInfo, BackendSource, RegistryConfig, RegistryStorageConfig};
2use sqlx::{sqlite::{SqliteConnectOptions, SqlitePool}, ConnectOptions, Row};
3use std::path::Path;
4use std::time::{SystemTime, UNIX_EPOCH};
5use uuid::Uuid;
6
7pub struct RegistryStorage {
9 pool: SqlitePool,
10 config_path: Option<std::path::PathBuf>,
11}
12
13impl RegistryStorage {
14 pub async fn new(config: RegistryStorageConfig) -> Result<Self, String> {
16 if let Some(parent) = config.db_path.parent() {
18 std::fs::create_dir_all(parent)
19 .map_err(|e| format!("Failed to create config directory: {}", e))?;
20 }
21
22 let db_url = format!("sqlite:{}?mode=rwc", config.db_path.display());
24 let connect_options: SqliteConnectOptions = db_url
25 .parse()
26 .map_err(|e| format!("Failed to parse database URL: {}", e))?;
27 let connect_options = connect_options.disable_statement_logging();
28
29 let pool = SqlitePool::connect_with(connect_options)
30 .await
31 .map_err(|e| format!("Failed to connect to registry database: {}", e))?;
32
33 let storage = Self {
34 pool,
35 config_path: config.config_path,
36 };
37
38 storage.run_migrations().await?;
39 Ok(storage)
40 }
41
42 async fn run_migrations(&self) -> Result<(), String> {
44 sqlx::query(
45 r#"
46 CREATE TABLE IF NOT EXISTS backends (
47 id TEXT PRIMARY KEY,
48 name TEXT NOT NULL UNIQUE,
49 host TEXT NOT NULL,
50 port INTEGER NOT NULL,
51 protocol TEXT NOT NULL,
52 description TEXT,
53 namespace TEXT,
54 version TEXT,
55 metadata TEXT,
56 source TEXT NOT NULL,
57 is_active INTEGER NOT NULL DEFAULT 1,
58 registered_at INTEGER NOT NULL,
59 last_seen INTEGER,
60 created_at INTEGER NOT NULL,
61 updated_at INTEGER NOT NULL
62 );
63
64 CREATE INDEX IF NOT EXISTS idx_backends_name ON backends(name);
65 CREATE INDEX IF NOT EXISTS idx_backends_active ON backends(is_active);
66 CREATE INDEX IF NOT EXISTS idx_backends_source ON backends(source);
67 "#,
68 )
69 .execute(&self.pool)
70 .await
71 .map_err(|e| format!("Failed to run migrations: {}", e))?;
72
73 Ok(())
74 }
75
76 pub async fn register(
78 &self,
79 name: String,
80 host: String,
81 port: u16,
82 protocol: String,
83 description: Option<String>,
84 namespace: Option<String>,
85 source: BackendSource,
86 ) -> Result<BackendInfo, String> {
87 let id = Uuid::new_v4().to_string();
88 let now = current_timestamp();
89
90 sqlx::query(
91 r#"
92 INSERT INTO backends (
93 id, name, host, port, protocol, description, namespace,
94 version, metadata, source, is_active, registered_at, created_at, updated_at
95 )
96 VALUES (?, ?, ?, ?, ?, ?, ?, NULL, NULL, ?, 1, ?, ?, ?)
97 "#,
98 )
99 .bind(&id)
100 .bind(&name)
101 .bind(&host)
102 .bind(port as i64)
103 .bind(&protocol)
104 .bind(&description)
105 .bind(&namespace)
106 .bind(source.as_str())
107 .bind(now)
108 .bind(now)
109 .bind(now)
110 .execute(&self.pool)
111 .await
112 .map_err(|e| format!("Failed to register backend: {}", e))?;
113
114 Ok(BackendInfo {
115 id,
116 name,
117 host,
118 port,
119 protocol,
120 description,
121 namespace,
122 version: None,
123 metadata: None,
124 source,
125 is_active: true,
126 registered_at: now,
127 last_seen: None,
128 created_at: now,
129 updated_at: now,
130 })
131 }
132
133 pub async fn list(&self, active_only: bool) -> Result<Vec<BackendInfo>, String> {
135 let query = if active_only {
136 "SELECT * FROM backends WHERE is_active = 1 ORDER BY created_at DESC"
137 } else {
138 "SELECT * FROM backends ORDER BY created_at DESC"
139 };
140
141 let rows = sqlx::query(query)
142 .fetch_all(&self.pool)
143 .await
144 .map_err(|e| format!("Failed to list backends: {}", e))?;
145
146 rows.into_iter()
147 .map(|row| row_to_backend_info(row))
148 .collect()
149 }
150
151 pub async fn get(&self, name: &str) -> Result<Option<BackendInfo>, String> {
153 let row = sqlx::query("SELECT * FROM backends WHERE name = ?")
154 .bind(name)
155 .fetch_optional(&self.pool)
156 .await
157 .map_err(|e| format!("Failed to get backend: {}", e))?;
158
159 match row {
160 Some(row) => Ok(Some(row_to_backend_info(row)?)),
161 None => Ok(None),
162 }
163 }
164
165 pub async fn update(
167 &self,
168 name: &str,
169 host: Option<String>,
170 port: Option<u16>,
171 protocol: Option<String>,
172 description: Option<String>,
173 namespace: Option<String>,
174 ) -> Result<Option<BackendInfo>, String> {
175 let current = match self.get(name).await? {
177 Some(backend) => backend,
178 None => return Ok(None),
179 };
180
181 let new_host = host.unwrap_or(current.host);
182 let new_port = port.unwrap_or(current.port);
183 let new_protocol = protocol.unwrap_or(current.protocol);
184 let new_description = description.or(current.description);
185 let new_namespace = namespace.or(current.namespace);
186 let now = current_timestamp();
187
188 sqlx::query(
189 r#"
190 UPDATE backends
191 SET host = ?, port = ?, protocol = ?, description = ?, namespace = ?, updated_at = ?
192 WHERE name = ?
193 "#,
194 )
195 .bind(&new_host)
196 .bind(new_port as i64)
197 .bind(&new_protocol)
198 .bind(&new_description)
199 .bind(&new_namespace)
200 .bind(now)
201 .bind(name)
202 .execute(&self.pool)
203 .await
204 .map_err(|e| format!("Failed to update backend: {}", e))?;
205
206 self.get(name).await
207 }
208
209 pub async fn delete(&self, name: &str) -> Result<bool, String> {
211 let result = sqlx::query("DELETE FROM backends WHERE name = ?")
212 .bind(name)
213 .execute(&self.pool)
214 .await
215 .map_err(|e| format!("Failed to delete backend: {}", e))?;
216
217 Ok(result.rows_affected() > 0)
218 }
219
220 pub async fn ping(&self, name: &str) -> Result<bool, String> {
222 let now = current_timestamp();
223
224 let result = sqlx::query("UPDATE backends SET last_seen = ? WHERE name = ?")
225 .bind(now)
226 .bind(name)
227 .execute(&self.pool)
228 .await
229 .map_err(|e| format!("Failed to ping backend: {}", e))?;
230
231 Ok(result.rows_affected() > 0)
232 }
233
234 pub async fn load_config(&self) -> Result<Vec<BackendInfo>, String> {
236 let config_path = match &self.config_path {
237 Some(path) => path,
238 None => return Ok(vec![]),
239 };
240
241 if !config_path.exists() {
242 tracing::info!("Config file not found: {}", config_path.display());
243 return Ok(vec![]);
244 }
245
246 let loaded = load_backends_from_toml(config_path)?;
247 let mut registered = Vec::new();
248
249 for backend in loaded {
250 if self.get(&backend.name).await?.is_some() {
252 tracing::debug!("Backend {} already exists, skipping", backend.name);
253 continue;
254 }
255
256 match self
258 .register(
259 backend.name.clone(),
260 backend.host,
261 backend.port,
262 backend.protocol,
263 backend.description,
264 backend.namespace,
265 BackendSource::File,
266 )
267 .await
268 {
269 Ok(info) => {
270 tracing::info!("Loaded backend from config: {}", info.name);
271 registered.push(info);
272 }
273 Err(e) => {
274 tracing::error!("Failed to register backend {}: {}", backend.name, e);
275 }
276 }
277 }
278
279 Ok(registered)
280 }
281
282 pub async fn reload_config(&self) -> Result<usize, String> {
284 let loaded = self.load_config().await?;
285 Ok(loaded.len())
286 }
287}
288
289fn row_to_backend_info(row: sqlx::sqlite::SqliteRow) -> Result<BackendInfo, String> {
291 let source_str: String = row.get("source");
292 let source = BackendSource::from_str(&source_str)
293 .ok_or_else(|| format!("Invalid backend source: {}", source_str))?;
294
295 Ok(BackendInfo {
296 id: row.get("id"),
297 name: row.get("name"),
298 host: row.get("host"),
299 port: row.get::<i64, _>("port") as u16,
300 protocol: row.get("protocol"),
301 description: row.get("description"),
302 namespace: row.get("namespace"),
303 version: row.get("version"),
304 metadata: row.get("metadata"),
305 source,
306 is_active: row.get::<i64, _>("is_active") != 0,
307 registered_at: row.get("registered_at"),
308 last_seen: row.get("last_seen"),
309 created_at: row.get("created_at"),
310 updated_at: row.get("updated_at"),
311 })
312}
313
314fn load_backends_from_toml(path: &Path) -> Result<Vec<BackendConfig>, String> {
316 let content = std::fs::read_to_string(path)
317 .map_err(|e| format!("Failed to read config file: {}", e))?;
318
319 let config: RegistryConfig =
320 toml::from_str(&content).map_err(|e| format!("Failed to parse TOML config: {}", e))?;
321
322 Ok(config.backend)
323}
324
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Never panics: if the system clock is set before the epoch, the result is
/// the negative number of whole seconds before it, and values that would not
/// fit in an `i64` saturate.
fn current_timestamp() -> i64 {
    // Clamp before casting so the `u64 -> i64` conversion cannot wrap.
    let to_i64 = |secs: u64| secs.min(i64::MAX as u64) as i64;

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => to_i64(since_epoch.as_secs()),
        // Clock is before 1970-01-01: report the offset as a negative value.
        Err(before_epoch) => -to_i64(before_epoch.duration().as_secs()),
    }
}