Skip to main content

syspulse_core/
registry.rs

1use std::path::Path;
2
3use chrono::Utc;
4use rusqlite::{params, Connection};
5
6use crate::daemon::{DaemonInstance, DaemonSpec, HealthStatus};
7use crate::error::{Result, SyspulseError};
8use crate::lifecycle::LifecycleState;
9
10pub struct Registry {
11    conn: Connection,
12}
13
14impl Registry {
15    pub fn new(path: &Path) -> Result<Self> {
16        let conn = Connection::open(path)
17            .map_err(|e| SyspulseError::Database(format!("Failed to open database: {}", e)))?;
18
19        conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")
20            .map_err(|e| SyspulseError::Database(format!("Failed to set pragmas: {}", e)))?;
21
22        let registry = Self { conn };
23        registry.run_migrations()?;
24        Ok(registry)
25    }
26
27    fn run_migrations(&self) -> Result<()> {
28        self.conn
29            .execute_batch(
30                "CREATE TABLE IF NOT EXISTS daemon_specs (
31                    name TEXT PRIMARY KEY,
32                    spec_json TEXT NOT NULL,
33                    created_at TEXT NOT NULL,
34                    updated_at TEXT NOT NULL
35                );
36
37                CREATE TABLE IF NOT EXISTS daemon_states (
38                    name TEXT PRIMARY KEY,
39                    instance_id TEXT NOT NULL,
40                    state TEXT NOT NULL,
41                    pid INTEGER,
42                    started_at TEXT,
43                    stopped_at TEXT,
44                    exit_code INTEGER,
45                    restart_count INTEGER DEFAULT 0,
46                    health_status TEXT DEFAULT 'unknown',
47                    stdout_log TEXT,
48                    stderr_log TEXT,
49                    FOREIGN KEY (name) REFERENCES daemon_specs(name)
50                );",
51            )
52            .map_err(|e| SyspulseError::Database(format!("Migration failed: {}", e)))?;
53        Ok(())
54    }
55
56    pub fn register(&self, spec: &DaemonSpec) -> Result<()> {
57        let now = Utc::now().to_rfc3339();
58        let json = serde_json::to_string(spec)?;
59
60        self.conn
61            .execute(
62                "INSERT INTO daemon_specs (name, spec_json, created_at, updated_at) VALUES (?1, ?2, ?3, ?4)",
63                params![spec.name, json, now, now],
64            )
65            .map_err(|e| match e {
66                rusqlite::Error::SqliteFailure(err, _)
67                    if err.code == rusqlite::ErrorCode::ConstraintViolation =>
68                {
69                    SyspulseError::DaemonAlreadyExists(spec.name.clone())
70                }
71                _ => SyspulseError::Database(format!("Failed to register daemon: {}", e)),
72            })?;
73
74        Ok(())
75    }
76
77    pub fn unregister(&self, name: &str) -> Result<()> {
78        // Delete state first due to foreign key constraint
79        self.conn
80            .execute("DELETE FROM daemon_states WHERE name = ?1", params![name])
81            .map_err(|e| SyspulseError::Database(format!("Failed to delete state: {}", e)))?;
82
83        let changes = self
84            .conn
85            .execute("DELETE FROM daemon_specs WHERE name = ?1", params![name])
86            .map_err(|e| SyspulseError::Database(format!("Failed to delete spec: {}", e)))?;
87
88        if changes == 0 {
89            return Err(SyspulseError::DaemonNotFound(name.to_string()));
90        }
91
92        Ok(())
93    }
94
95    pub fn get_spec(&self, name: &str) -> Result<DaemonSpec> {
96        let json: String = self
97            .conn
98            .query_row(
99                "SELECT spec_json FROM daemon_specs WHERE name = ?1",
100                params![name],
101                |row| row.get(0),
102            )
103            .map_err(|e| match e {
104                rusqlite::Error::QueryReturnedNoRows => {
105                    SyspulseError::DaemonNotFound(name.to_string())
106                }
107                _ => SyspulseError::Database(format!("Failed to get spec: {}", e)),
108            })?;
109
110        let spec: DaemonSpec = serde_json::from_str(&json)?;
111        Ok(spec)
112    }
113
114    pub fn list_specs(&self) -> Result<Vec<DaemonSpec>> {
115        let mut stmt = self
116            .conn
117            .prepare("SELECT spec_json FROM daemon_specs ORDER BY name")
118            .map_err(|e| SyspulseError::Database(format!("Failed to prepare query: {}", e)))?;
119
120        let specs = stmt
121            .query_map([], |row| {
122                let json: String = row.get(0)?;
123                Ok(json)
124            })
125            .map_err(|e| SyspulseError::Database(format!("Failed to list specs: {}", e)))?
126            .filter_map(|r| {
127                r.ok()
128                    .and_then(|json| serde_json::from_str::<DaemonSpec>(&json).ok())
129            })
130            .collect();
131
132        Ok(specs)
133    }
134
135    pub fn update_state(&self, instance: &DaemonInstance) -> Result<()> {
136        let state_str = instance.state.to_string();
137        let health_str = match instance.health_status {
138            HealthStatus::Unknown => "unknown",
139            HealthStatus::Healthy => "healthy",
140            HealthStatus::Unhealthy => "unhealthy",
141            HealthStatus::NotConfigured => "not_configured",
142        };
143        let started_at = instance.started_at.map(|t| t.to_rfc3339());
144        let stopped_at = instance.stopped_at.map(|t| t.to_rfc3339());
145        let stdout_log = instance
146            .stdout_log
147            .as_ref()
148            .map(|p| p.to_string_lossy().to_string());
149        let stderr_log = instance
150            .stderr_log
151            .as_ref()
152            .map(|p| p.to_string_lossy().to_string());
153
154        self.conn
155            .execute(
156                "INSERT INTO daemon_states (name, instance_id, state, pid, started_at, stopped_at, exit_code, restart_count, health_status, stdout_log, stderr_log)
157                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
158                 ON CONFLICT(name) DO UPDATE SET
159                     instance_id = excluded.instance_id,
160                     state = excluded.state,
161                     pid = excluded.pid,
162                     started_at = excluded.started_at,
163                     stopped_at = excluded.stopped_at,
164                     exit_code = excluded.exit_code,
165                     restart_count = excluded.restart_count,
166                     health_status = excluded.health_status,
167                     stdout_log = excluded.stdout_log,
168                     stderr_log = excluded.stderr_log",
169                params![
170                    instance.spec_name,
171                    instance.id,
172                    state_str,
173                    instance.pid,
174                    started_at,
175                    stopped_at,
176                    instance.exit_code,
177                    instance.restart_count,
178                    health_str,
179                    stdout_log,
180                    stderr_log,
181                ],
182            )
183            .map_err(|e| SyspulseError::Database(format!("Failed to update state: {}", e)))?;
184
185        Ok(())
186    }
187
188    pub fn get_state(&self, name: &str) -> Result<DaemonInstance> {
189        self.conn
190            .query_row(
191                "SELECT instance_id, state, pid, started_at, stopped_at, exit_code, restart_count, health_status, stdout_log, stderr_log
192                 FROM daemon_states WHERE name = ?1",
193                params![name],
194                |row| {
195                    Ok(StateRow {
196                        name: name.to_string(),
197                        instance_id: row.get(0)?,
198                        state: row.get::<_, String>(1)?,
199                        pid: row.get(2)?,
200                        started_at: row.get::<_, Option<String>>(3)?,
201                        stopped_at: row.get::<_, Option<String>>(4)?,
202                        exit_code: row.get(5)?,
203                        restart_count: row.get::<_, Option<u32>>(6)?,
204                        health_status: row.get::<_, Option<String>>(7)?,
205                        stdout_log: row.get::<_, Option<String>>(8)?,
206                        stderr_log: row.get::<_, Option<String>>(9)?,
207                    })
208                },
209            )
210            .map_err(|e| match e {
211                rusqlite::Error::QueryReturnedNoRows => {
212                    SyspulseError::DaemonNotFound(name.to_string())
213                }
214                _ => SyspulseError::Database(format!("Failed to get state: {}", e)),
215            })
216            .map(|r| r.into_instance())
217    }
218
219    pub fn list_states(&self) -> Result<Vec<DaemonInstance>> {
220        let mut stmt = self
221            .conn
222            .prepare(
223                "SELECT name, instance_id, state, pid, started_at, stopped_at, exit_code, restart_count, health_status, stdout_log, stderr_log
224                 FROM daemon_states ORDER BY name",
225            )
226            .map_err(|e| SyspulseError::Database(format!("Failed to prepare query: {}", e)))?;
227
228        let states = stmt
229            .query_map([], |row| {
230                Ok(StateRow {
231                    name: row.get(0)?,
232                    instance_id: row.get(1)?,
233                    state: row.get::<_, String>(2)?,
234                    pid: row.get(3)?,
235                    started_at: row.get::<_, Option<String>>(4)?,
236                    stopped_at: row.get::<_, Option<String>>(5)?,
237                    exit_code: row.get(6)?,
238                    restart_count: row.get::<_, Option<u32>>(7)?,
239                    health_status: row.get::<_, Option<String>>(8)?,
240                    stdout_log: row.get::<_, Option<String>>(9)?,
241                    stderr_log: row.get::<_, Option<String>>(10)?,
242                })
243            })
244            .map_err(|e| SyspulseError::Database(format!("Failed to list states: {}", e)))?
245            .filter_map(|r| r.ok())
246            .map(|r| r.into_instance())
247            .collect();
248
249        Ok(states)
250    }
251}
252
/// Raw `daemon_states` row, held as plain values until it is converted into
/// a `DaemonInstance`.
struct StateRow {
    /// Daemon name (the table's primary key).
    name: String,
    /// Identifier of the recorded instance.
    instance_id: String,
    /// Lifecycle state as stored text.
    state: String,
    /// Process id, if one was recorded.
    pid: Option<u32>,
    /// Start time as stored text, if any.
    started_at: Option<String>,
    /// Stop time as stored text, if any.
    stopped_at: Option<String>,
    /// Exit code, if one was recorded.
    exit_code: Option<i32>,
    /// Restart counter; the column is nullable, hence the `Option`.
    restart_count: Option<u32>,
    /// Health status as stored text, if any.
    health_status: Option<String>,
    /// Stdout log path as stored text, if any.
    stdout_log: Option<String>,
    /// Stderr log path as stored text, if any.
    stderr_log: Option<String>,
}
266
267impl StateRow {
268    fn into_instance(self) -> DaemonInstance {
269        let state = match self.state.as_str() {
270            "stopped" => LifecycleState::Stopped,
271            "starting" => LifecycleState::Starting,
272            "running" => LifecycleState::Running,
273            "stopping" => LifecycleState::Stopping,
274            "failed" => LifecycleState::Failed,
275            "scheduled" => LifecycleState::Scheduled,
276            _ => LifecycleState::Stopped,
277        };
278
279        let health_status = match self.health_status.as_deref() {
280            Some("healthy") => HealthStatus::Healthy,
281            Some("unhealthy") => HealthStatus::Unhealthy,
282            Some("not_configured") => HealthStatus::NotConfigured,
283            _ => HealthStatus::Unknown,
284        };
285
286        let parse_dt = |s: Option<String>| {
287            s.and_then(|v| chrono::DateTime::parse_from_rfc3339(&v).ok())
288                .map(|dt| dt.with_timezone(&Utc))
289        };
290
291        DaemonInstance {
292            id: self.instance_id,
293            spec_name: self.name,
294            state,
295            pid: self.pid,
296            started_at: parse_dt(self.started_at),
297            stopped_at: parse_dt(self.stopped_at),
298            exit_code: self.exit_code,
299            restart_count: self.restart_count.unwrap_or(0),
300            health_status,
301            stdout_log: self.stdout_log.map(std::path::PathBuf::from),
302            stderr_log: self.stderr_log.map(std::path::PathBuf::from),
303        }
304    }
305}