Skip to main content

plexus_registry/
storage.rs

1use crate::types::{BackendConfig, BackendInfo, BackendSource, RegistryConfig, RegistryStorageConfig};
2use sqlx::{sqlite::{SqliteConnectOptions, SqlitePool}, ConnectOptions, Row};
3use std::path::Path;
4use std::time::{SystemTime, UNIX_EPOCH};
5use uuid::Uuid;
6
7/// Storage layer for backend registry
8pub struct RegistryStorage {
9    pool: SqlitePool,
10    config_path: Option<std::path::PathBuf>,
11}
12
13impl RegistryStorage {
14    /// Create a new registry storage instance
15    pub async fn new(config: RegistryStorageConfig) -> Result<Self, String> {
16        // Ensure config directory exists
17        if let Some(parent) = config.db_path.parent() {
18            std::fs::create_dir_all(parent)
19                .map_err(|e| format!("Failed to create config directory: {}", e))?;
20        }
21
22        // Initialize database
23        let db_url = format!("sqlite:{}?mode=rwc", config.db_path.display());
24        let connect_options: SqliteConnectOptions = db_url
25            .parse()
26            .map_err(|e| format!("Failed to parse database URL: {}", e))?;
27        let connect_options = connect_options.disable_statement_logging();
28
29        let pool = SqlitePool::connect_with(connect_options)
30            .await
31            .map_err(|e| format!("Failed to connect to registry database: {}", e))?;
32
33        let storage = Self {
34            pool,
35            config_path: config.config_path,
36        };
37
38        storage.run_migrations().await?;
39        Ok(storage)
40    }
41
42    /// Run database migrations
43    async fn run_migrations(&self) -> Result<(), String> {
44        sqlx::query(
45            r#"
46            CREATE TABLE IF NOT EXISTS backends (
47                id TEXT PRIMARY KEY,
48                name TEXT NOT NULL UNIQUE,
49                host TEXT NOT NULL,
50                port INTEGER NOT NULL,
51                protocol TEXT NOT NULL,
52                description TEXT,
53                namespace TEXT,
54                version TEXT,
55                metadata TEXT,
56                source TEXT NOT NULL,
57                is_active INTEGER NOT NULL DEFAULT 1,
58                registered_at INTEGER NOT NULL,
59                last_seen INTEGER,
60                created_at INTEGER NOT NULL,
61                updated_at INTEGER NOT NULL
62            );
63
64            CREATE INDEX IF NOT EXISTS idx_backends_name ON backends(name);
65            CREATE INDEX IF NOT EXISTS idx_backends_active ON backends(is_active);
66            CREATE INDEX IF NOT EXISTS idx_backends_source ON backends(source);
67            "#,
68        )
69        .execute(&self.pool)
70        .await
71        .map_err(|e| format!("Failed to run migrations: {}", e))?;
72
73        Ok(())
74    }
75
76    /// Register a new backend
77    pub async fn register(
78        &self,
79        name: String,
80        host: String,
81        port: u16,
82        protocol: String,
83        description: Option<String>,
84        namespace: Option<String>,
85        source: BackendSource,
86    ) -> Result<BackendInfo, String> {
87        let id = Uuid::new_v4().to_string();
88        let now = current_timestamp();
89
90        sqlx::query(
91            r#"
92            INSERT INTO backends (
93                id, name, host, port, protocol, description, namespace,
94                version, metadata, source, is_active, registered_at, created_at, updated_at
95            )
96            VALUES (?, ?, ?, ?, ?, ?, ?, NULL, NULL, ?, 1, ?, ?, ?)
97            "#,
98        )
99        .bind(&id)
100        .bind(&name)
101        .bind(&host)
102        .bind(port as i64)
103        .bind(&protocol)
104        .bind(&description)
105        .bind(&namespace)
106        .bind(source.as_str())
107        .bind(now)
108        .bind(now)
109        .bind(now)
110        .execute(&self.pool)
111        .await
112        .map_err(|e| format!("Failed to register backend: {}", e))?;
113
114        Ok(BackendInfo {
115            id,
116            name,
117            host,
118            port,
119            protocol,
120            description,
121            namespace,
122            version: None,
123            metadata: None,
124            source,
125            is_active: true,
126            registered_at: now,
127            last_seen: None,
128            created_at: now,
129            updated_at: now,
130        })
131    }
132
133    /// List all backends (optionally filter by active status)
134    pub async fn list(&self, active_only: bool) -> Result<Vec<BackendInfo>, String> {
135        let query = if active_only {
136            "SELECT * FROM backends WHERE is_active = 1 ORDER BY created_at DESC"
137        } else {
138            "SELECT * FROM backends ORDER BY created_at DESC"
139        };
140
141        let rows = sqlx::query(query)
142            .fetch_all(&self.pool)
143            .await
144            .map_err(|e| format!("Failed to list backends: {}", e))?;
145
146        rows.into_iter()
147            .map(|row| row_to_backend_info(row))
148            .collect()
149    }
150
151    /// Get a backend by name
152    pub async fn get(&self, name: &str) -> Result<Option<BackendInfo>, String> {
153        let row = sqlx::query("SELECT * FROM backends WHERE name = ?")
154            .bind(name)
155            .fetch_optional(&self.pool)
156            .await
157            .map_err(|e| format!("Failed to get backend: {}", e))?;
158
159        match row {
160            Some(row) => Ok(Some(row_to_backend_info(row)?)),
161            None => Ok(None),
162        }
163    }
164
165    /// Update a backend
166    pub async fn update(
167        &self,
168        name: &str,
169        host: Option<String>,
170        port: Option<u16>,
171        protocol: Option<String>,
172        description: Option<String>,
173        namespace: Option<String>,
174    ) -> Result<Option<BackendInfo>, String> {
175        // Get current backend
176        let current = match self.get(name).await? {
177            Some(backend) => backend,
178            None => return Ok(None),
179        };
180
181        let new_host = host.unwrap_or(current.host);
182        let new_port = port.unwrap_or(current.port);
183        let new_protocol = protocol.unwrap_or(current.protocol);
184        let new_description = description.or(current.description);
185        let new_namespace = namespace.or(current.namespace);
186        let now = current_timestamp();
187
188        sqlx::query(
189            r#"
190            UPDATE backends
191            SET host = ?, port = ?, protocol = ?, description = ?, namespace = ?, updated_at = ?
192            WHERE name = ?
193            "#,
194        )
195        .bind(&new_host)
196        .bind(new_port as i64)
197        .bind(&new_protocol)
198        .bind(&new_description)
199        .bind(&new_namespace)
200        .bind(now)
201        .bind(name)
202        .execute(&self.pool)
203        .await
204        .map_err(|e| format!("Failed to update backend: {}", e))?;
205
206        self.get(name).await
207    }
208
209    /// Delete a backend
210    pub async fn delete(&self, name: &str) -> Result<bool, String> {
211        let result = sqlx::query("DELETE FROM backends WHERE name = ?")
212            .bind(name)
213            .execute(&self.pool)
214            .await
215            .map_err(|e| format!("Failed to delete backend: {}", e))?;
216
217        Ok(result.rows_affected() > 0)
218    }
219
220    /// Update last_seen timestamp for health checks
221    pub async fn ping(&self, name: &str) -> Result<bool, String> {
222        let now = current_timestamp();
223
224        let result = sqlx::query("UPDATE backends SET last_seen = ? WHERE name = ?")
225            .bind(now)
226            .bind(name)
227            .execute(&self.pool)
228            .await
229            .map_err(|e| format!("Failed to ping backend: {}", e))?;
230
231        Ok(result.rows_affected() > 0)
232    }
233
234    /// Load backends from TOML config file
235    pub async fn load_config(&self) -> Result<Vec<BackendInfo>, String> {
236        let config_path = match &self.config_path {
237            Some(path) => path,
238            None => return Ok(vec![]),
239        };
240
241        if !config_path.exists() {
242            tracing::info!("Config file not found: {}", config_path.display());
243            return Ok(vec![]);
244        }
245
246        let loaded = load_backends_from_toml(config_path)?;
247        let mut registered = Vec::new();
248
249        for backend in loaded {
250            // Check if backend already exists
251            if self.get(&backend.name).await?.is_some() {
252                tracing::debug!("Backend {} already exists, skipping", backend.name);
253                continue;
254            }
255
256            // Register new backend
257            match self
258                .register(
259                    backend.name.clone(),
260                    backend.host,
261                    backend.port,
262                    backend.protocol,
263                    backend.description,
264                    backend.namespace,
265                    BackendSource::File,
266                )
267                .await
268            {
269                Ok(info) => {
270                    tracing::info!("Loaded backend from config: {}", info.name);
271                    registered.push(info);
272                }
273                Err(e) => {
274                    tracing::error!("Failed to register backend {}: {}", backend.name, e);
275                }
276            }
277        }
278
279        Ok(registered)
280    }
281
282    /// Reload config file (re-load and register any new backends)
283    pub async fn reload_config(&self) -> Result<usize, String> {
284        let loaded = self.load_config().await?;
285        Ok(loaded.len())
286    }
287}
288
289/// Convert database row to BackendInfo
290fn row_to_backend_info(row: sqlx::sqlite::SqliteRow) -> Result<BackendInfo, String> {
291    let source_str: String = row.get("source");
292    let source = BackendSource::from_str(&source_str)
293        .ok_or_else(|| format!("Invalid backend source: {}", source_str))?;
294
295    Ok(BackendInfo {
296        id: row.get("id"),
297        name: row.get("name"),
298        host: row.get("host"),
299        port: row.get::<i64, _>("port") as u16,
300        protocol: row.get("protocol"),
301        description: row.get("description"),
302        namespace: row.get("namespace"),
303        version: row.get("version"),
304        metadata: row.get("metadata"),
305        source,
306        is_active: row.get::<i64, _>("is_active") != 0,
307        registered_at: row.get("registered_at"),
308        last_seen: row.get("last_seen"),
309        created_at: row.get("created_at"),
310        updated_at: row.get("updated_at"),
311    })
312}
313
314/// Load backends from TOML config file
315fn load_backends_from_toml(path: &Path) -> Result<Vec<BackendConfig>, String> {
316    let content = std::fs::read_to_string(path)
317        .map_err(|e| format!("Failed to read config file: {}", e))?;
318
319    let config: RegistryConfig =
320        toml::from_str(&content).map_err(|e| format!("Failed to parse TOML config: {}", e))?;
321
322    Ok(config.backend)
323}
324
325/// Get current Unix timestamp in seconds
326fn current_timestamp() -> i64 {
327    SystemTime::now()
328        .duration_since(UNIX_EPOCH)
329        .unwrap()
330        .as_secs() as i64
331}