1use std::path::Path;
2
3use chrono::Utc;
4use rusqlite::{params, Connection};
5
6use crate::daemon::{DaemonInstance, DaemonSpec, HealthStatus};
7use crate::error::{Result, SyspulseError};
8use crate::lifecycle::LifecycleState;
9
10pub struct Registry {
11 conn: Connection,
12}
13
14impl Registry {
15 pub fn new(path: &Path) -> Result<Self> {
16 let conn = Connection::open(path)
17 .map_err(|e| SyspulseError::Database(format!("Failed to open database: {}", e)))?;
18
19 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")
20 .map_err(|e| SyspulseError::Database(format!("Failed to set pragmas: {}", e)))?;
21
22 let registry = Self { conn };
23 registry.run_migrations()?;
24 Ok(registry)
25 }
26
27 fn run_migrations(&self) -> Result<()> {
28 self.conn
29 .execute_batch(
30 "CREATE TABLE IF NOT EXISTS daemon_specs (
31 name TEXT PRIMARY KEY,
32 spec_json TEXT NOT NULL,
33 created_at TEXT NOT NULL,
34 updated_at TEXT NOT NULL
35 );
36
37 CREATE TABLE IF NOT EXISTS daemon_states (
38 name TEXT PRIMARY KEY,
39 instance_id TEXT NOT NULL,
40 state TEXT NOT NULL,
41 pid INTEGER,
42 started_at TEXT,
43 stopped_at TEXT,
44 exit_code INTEGER,
45 restart_count INTEGER DEFAULT 0,
46 health_status TEXT DEFAULT 'unknown',
47 stdout_log TEXT,
48 stderr_log TEXT,
49 FOREIGN KEY (name) REFERENCES daemon_specs(name)
50 );",
51 )
52 .map_err(|e| SyspulseError::Database(format!("Migration failed: {}", e)))?;
53 Ok(())
54 }
55
56 pub fn register(&self, spec: &DaemonSpec) -> Result<()> {
57 let now = Utc::now().to_rfc3339();
58 let json = serde_json::to_string(spec)?;
59
60 self.conn
61 .execute(
62 "INSERT INTO daemon_specs (name, spec_json, created_at, updated_at) VALUES (?1, ?2, ?3, ?4)",
63 params![spec.name, json, now, now],
64 )
65 .map_err(|e| match e {
66 rusqlite::Error::SqliteFailure(err, _)
67 if err.code == rusqlite::ErrorCode::ConstraintViolation =>
68 {
69 SyspulseError::DaemonAlreadyExists(spec.name.clone())
70 }
71 _ => SyspulseError::Database(format!("Failed to register daemon: {}", e)),
72 })?;
73
74 Ok(())
75 }
76
77 pub fn unregister(&self, name: &str) -> Result<()> {
78 self.conn
80 .execute("DELETE FROM daemon_states WHERE name = ?1", params![name])
81 .map_err(|e| SyspulseError::Database(format!("Failed to delete state: {}", e)))?;
82
83 let changes = self
84 .conn
85 .execute("DELETE FROM daemon_specs WHERE name = ?1", params![name])
86 .map_err(|e| SyspulseError::Database(format!("Failed to delete spec: {}", e)))?;
87
88 if changes == 0 {
89 return Err(SyspulseError::DaemonNotFound(name.to_string()));
90 }
91
92 Ok(())
93 }
94
95 pub fn get_spec(&self, name: &str) -> Result<DaemonSpec> {
96 let json: String = self
97 .conn
98 .query_row(
99 "SELECT spec_json FROM daemon_specs WHERE name = ?1",
100 params![name],
101 |row| row.get(0),
102 )
103 .map_err(|e| match e {
104 rusqlite::Error::QueryReturnedNoRows => {
105 SyspulseError::DaemonNotFound(name.to_string())
106 }
107 _ => SyspulseError::Database(format!("Failed to get spec: {}", e)),
108 })?;
109
110 let spec: DaemonSpec = serde_json::from_str(&json)?;
111 Ok(spec)
112 }
113
114 pub fn list_specs(&self) -> Result<Vec<DaemonSpec>> {
115 let mut stmt = self
116 .conn
117 .prepare("SELECT spec_json FROM daemon_specs ORDER BY name")
118 .map_err(|e| SyspulseError::Database(format!("Failed to prepare query: {}", e)))?;
119
120 let specs = stmt
121 .query_map([], |row| {
122 let json: String = row.get(0)?;
123 Ok(json)
124 })
125 .map_err(|e| SyspulseError::Database(format!("Failed to list specs: {}", e)))?
126 .filter_map(|r| {
127 r.ok()
128 .and_then(|json| serde_json::from_str::<DaemonSpec>(&json).ok())
129 })
130 .collect();
131
132 Ok(specs)
133 }
134
135 pub fn update_state(&self, instance: &DaemonInstance) -> Result<()> {
136 let state_str = instance.state.to_string();
137 let health_str = match instance.health_status {
138 HealthStatus::Unknown => "unknown",
139 HealthStatus::Healthy => "healthy",
140 HealthStatus::Unhealthy => "unhealthy",
141 HealthStatus::NotConfigured => "not_configured",
142 };
143 let started_at = instance.started_at.map(|t| t.to_rfc3339());
144 let stopped_at = instance.stopped_at.map(|t| t.to_rfc3339());
145 let stdout_log = instance
146 .stdout_log
147 .as_ref()
148 .map(|p| p.to_string_lossy().to_string());
149 let stderr_log = instance
150 .stderr_log
151 .as_ref()
152 .map(|p| p.to_string_lossy().to_string());
153
154 self.conn
155 .execute(
156 "INSERT INTO daemon_states (name, instance_id, state, pid, started_at, stopped_at, exit_code, restart_count, health_status, stdout_log, stderr_log)
157 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
158 ON CONFLICT(name) DO UPDATE SET
159 instance_id = excluded.instance_id,
160 state = excluded.state,
161 pid = excluded.pid,
162 started_at = excluded.started_at,
163 stopped_at = excluded.stopped_at,
164 exit_code = excluded.exit_code,
165 restart_count = excluded.restart_count,
166 health_status = excluded.health_status,
167 stdout_log = excluded.stdout_log,
168 stderr_log = excluded.stderr_log",
169 params![
170 instance.spec_name,
171 instance.id,
172 state_str,
173 instance.pid,
174 started_at,
175 stopped_at,
176 instance.exit_code,
177 instance.restart_count,
178 health_str,
179 stdout_log,
180 stderr_log,
181 ],
182 )
183 .map_err(|e| SyspulseError::Database(format!("Failed to update state: {}", e)))?;
184
185 Ok(())
186 }
187
188 pub fn get_state(&self, name: &str) -> Result<DaemonInstance> {
189 self.conn
190 .query_row(
191 "SELECT instance_id, state, pid, started_at, stopped_at, exit_code, restart_count, health_status, stdout_log, stderr_log
192 FROM daemon_states WHERE name = ?1",
193 params![name],
194 |row| {
195 Ok(StateRow {
196 name: name.to_string(),
197 instance_id: row.get(0)?,
198 state: row.get::<_, String>(1)?,
199 pid: row.get(2)?,
200 started_at: row.get::<_, Option<String>>(3)?,
201 stopped_at: row.get::<_, Option<String>>(4)?,
202 exit_code: row.get(5)?,
203 restart_count: row.get::<_, Option<u32>>(6)?,
204 health_status: row.get::<_, Option<String>>(7)?,
205 stdout_log: row.get::<_, Option<String>>(8)?,
206 stderr_log: row.get::<_, Option<String>>(9)?,
207 })
208 },
209 )
210 .map_err(|e| match e {
211 rusqlite::Error::QueryReturnedNoRows => {
212 SyspulseError::DaemonNotFound(name.to_string())
213 }
214 _ => SyspulseError::Database(format!("Failed to get state: {}", e)),
215 })
216 .map(|r| r.into_instance())
217 }
218
219 pub fn list_states(&self) -> Result<Vec<DaemonInstance>> {
220 let mut stmt = self
221 .conn
222 .prepare(
223 "SELECT name, instance_id, state, pid, started_at, stopped_at, exit_code, restart_count, health_status, stdout_log, stderr_log
224 FROM daemon_states ORDER BY name",
225 )
226 .map_err(|e| SyspulseError::Database(format!("Failed to prepare query: {}", e)))?;
227
228 let states = stmt
229 .query_map([], |row| {
230 Ok(StateRow {
231 name: row.get(0)?,
232 instance_id: row.get(1)?,
233 state: row.get::<_, String>(2)?,
234 pid: row.get(3)?,
235 started_at: row.get::<_, Option<String>>(4)?,
236 stopped_at: row.get::<_, Option<String>>(5)?,
237 exit_code: row.get(6)?,
238 restart_count: row.get::<_, Option<u32>>(7)?,
239 health_status: row.get::<_, Option<String>>(8)?,
240 stdout_log: row.get::<_, Option<String>>(9)?,
241 stderr_log: row.get::<_, Option<String>>(10)?,
242 })
243 })
244 .map_err(|e| SyspulseError::Database(format!("Failed to list states: {}", e)))?
245 .filter_map(|r| r.ok())
246 .map(|r| r.into_instance())
247 .collect();
248
249 Ok(states)
250 }
251}
252
253struct StateRow {
254 name: String,
255 instance_id: String,
256 state: String,
257 pid: Option<u32>,
258 started_at: Option<String>,
259 stopped_at: Option<String>,
260 exit_code: Option<i32>,
261 restart_count: Option<u32>,
262 health_status: Option<String>,
263 stdout_log: Option<String>,
264 stderr_log: Option<String>,
265}
266
267impl StateRow {
268 fn into_instance(self) -> DaemonInstance {
269 let state = match self.state.as_str() {
270 "stopped" => LifecycleState::Stopped,
271 "starting" => LifecycleState::Starting,
272 "running" => LifecycleState::Running,
273 "stopping" => LifecycleState::Stopping,
274 "failed" => LifecycleState::Failed,
275 "scheduled" => LifecycleState::Scheduled,
276 _ => LifecycleState::Stopped,
277 };
278
279 let health_status = match self.health_status.as_deref() {
280 Some("healthy") => HealthStatus::Healthy,
281 Some("unhealthy") => HealthStatus::Unhealthy,
282 Some("not_configured") => HealthStatus::NotConfigured,
283 _ => HealthStatus::Unknown,
284 };
285
286 let parse_dt = |s: Option<String>| {
287 s.and_then(|v| chrono::DateTime::parse_from_rfc3339(&v).ok())
288 .map(|dt| dt.with_timezone(&Utc))
289 };
290
291 DaemonInstance {
292 id: self.instance_id,
293 spec_name: self.name,
294 state,
295 pid: self.pid,
296 started_at: parse_dt(self.started_at),
297 stopped_at: parse_dt(self.stopped_at),
298 exit_code: self.exit_code,
299 restart_count: self.restart_count.unwrap_or(0),
300 health_status,
301 stdout_log: self.stdout_log.map(std::path::PathBuf::from),
302 stderr_log: self.stderr_log.map(std::path::PathBuf::from),
303 }
304 }
305}