1use anyhow::{Context, Result};
2use chrono::Utc;
3use serde_json::Value;
4use sqlx::{sqlite::SqlitePoolOptions, Pool, Sqlite};
5use uuid::Uuid;
6
7use planspec_core::{WatchEvent, WatchEventType};
8
9use super::types::StoredObject;
10
11type DbRow = (
13 String,
14 String,
15 String,
16 String,
17 String,
18 i64,
19 i64,
20 String,
21 String,
22);
23
24#[derive(Clone)]
26pub struct Store {
27 pool: Pool<Sqlite>,
28}
29
30impl Store {
31 pub async fn new(path: &str) -> Result<Self> {
33 let url = format!("sqlite:{}?mode=rwc", path);
34 let pool = SqlitePoolOptions::new()
35 .max_connections(5)
36 .connect(&url)
37 .await
38 .context("Failed to connect to SQLite database")?;
39
40 sqlx::query(
42 r#"
43 CREATE TABLE IF NOT EXISTS resources (
44 namespace TEXT NOT NULL,
45 kind TEXT NOT NULL,
46 name TEXT NOT NULL,
47 object_json TEXT NOT NULL,
48 uid TEXT NOT NULL,
49 resource_version INTEGER NOT NULL,
50 generation INTEGER NOT NULL,
51 created_at TEXT NOT NULL,
52 updated_at TEXT NOT NULL,
53 PRIMARY KEY (namespace, kind, name)
54 );
55
56 CREATE INDEX IF NOT EXISTS idx_resources_kind ON resources(kind);
57 CREATE INDEX IF NOT EXISTS idx_resources_ns_kind ON resources(namespace, kind);
58 "#,
59 )
60 .execute(&pool)
61 .await
62 .context("Failed to initialize database schema")?;
63
64 sqlx::query(
66 r#"
67 CREATE TABLE IF NOT EXISTS revision (
68 id INTEGER PRIMARY KEY CHECK (id = 1),
69 current INTEGER NOT NULL DEFAULT 0
70 );
71 INSERT OR IGNORE INTO revision (id, current) VALUES (1, 0);
72 "#,
73 )
74 .execute(&pool)
75 .await
76 .context("Failed to initialize revision counter")?;
77
78 sqlx::query(
80 r#"
81 CREATE TABLE IF NOT EXISTS namespaces (
82 name TEXT PRIMARY KEY,
83 uid TEXT NOT NULL,
84 resource_version INTEGER NOT NULL,
85 created_at TEXT NOT NULL
86 );
87 "#,
88 )
89 .execute(&pool)
90 .await
91 .context("Failed to initialize namespaces table")?;
92
93 let store = Self { pool };
94
95 if !store.namespace_exists("default").await? {
97 store.create_namespace("default").await?;
98 }
99
100 Ok(store)
101 }
102
103 async fn next_revision(&self) -> Result<i64> {
105 let row: (i64,) = sqlx::query_as(
106 "UPDATE revision SET current = current + 1 WHERE id = 1 RETURNING current",
107 )
108 .fetch_one(&self.pool)
109 .await?;
110 Ok(row.0)
111 }
112
113 pub async fn create(
115 &self,
116 namespace: &str,
117 kind: &str,
118 name: &str,
119 mut object: Value,
120 ) -> Result<(StoredObject, WatchEvent)> {
121 let now = Utc::now();
122 let uid = Uuid::new_v4().to_string();
123 let resource_version = self.next_revision().await?;
124 let generation = 1i64;
125
126 if let Some(metadata) = object.get_mut("metadata").and_then(|m| m.as_object_mut()) {
128 metadata.insert("uid".to_string(), Value::String(uid.clone()));
129 metadata.insert(
130 "resourceVersion".to_string(),
131 Value::String(resource_version.to_string()),
132 );
133 metadata.insert("generation".to_string(), Value::Number(generation.into()));
134 metadata.insert(
135 "creationTimestamp".to_string(),
136 Value::String(now.to_rfc3339()),
137 );
138 }
139
140 let object_json = serde_json::to_string(&object)?;
141
142 sqlx::query(
143 r#"
144 INSERT INTO resources (namespace, kind, name, object_json, uid, resource_version, generation, created_at, updated_at)
145 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
146 "#,
147 )
148 .bind(namespace)
149 .bind(kind)
150 .bind(name)
151 .bind(&object_json)
152 .bind(&uid)
153 .bind(resource_version)
154 .bind(generation)
155 .bind(now.to_rfc3339())
156 .bind(now.to_rfc3339())
157 .execute(&self.pool)
158 .await
159 .context("Failed to insert resource")?;
160
161 let stored = StoredObject {
162 namespace: namespace.to_string(),
163 kind: kind.to_string(),
164 name: name.to_string(),
165 object: object.clone(),
166 uid,
167 resource_version,
168 generation,
169 created_at: now,
170 updated_at: now,
171 };
172
173 let event = WatchEvent {
174 event_type: WatchEventType::Added,
175 object,
176 rev: resource_version.to_string(),
177 };
178
179 Ok((stored, event))
180 }
181
182 pub async fn get(
184 &self,
185 namespace: &str,
186 kind: &str,
187 name: &str,
188 ) -> Result<Option<StoredObject>> {
189 let row: Option<DbRow> =
190 sqlx::query_as(
191 r#"
192 SELECT namespace, kind, name, object_json, uid, resource_version, generation, created_at, updated_at
193 FROM resources
194 WHERE namespace = ? AND kind = ? AND name = ?
195 "#,
196 )
197 .bind(namespace)
198 .bind(kind)
199 .bind(name)
200 .fetch_optional(&self.pool)
201 .await?;
202
203 match row {
204 Some((ns, k, n, obj_json, uid, rv, gen, created, updated)) => {
205 let object: Value = serde_json::from_str(&obj_json)?;
206 Ok(Some(StoredObject {
207 namespace: ns,
208 kind: k,
209 name: n,
210 object,
211 uid,
212 resource_version: rv,
213 generation: gen,
214 created_at: chrono::DateTime::parse_from_rfc3339(&created)?.with_timezone(&Utc),
215 updated_at: chrono::DateTime::parse_from_rfc3339(&updated)?.with_timezone(&Utc),
216 }))
217 }
218 None => Ok(None),
219 }
220 }
221
222 pub async fn list(&self, namespace: &str, kind: &str) -> Result<Vec<StoredObject>> {
224 let rows: Vec<DbRow> =
225 sqlx::query_as(
226 r#"
227 SELECT namespace, kind, name, object_json, uid, resource_version, generation, created_at, updated_at
228 FROM resources
229 WHERE namespace = ? AND kind = ?
230 ORDER BY name
231 "#,
232 )
233 .bind(namespace)
234 .bind(kind)
235 .fetch_all(&self.pool)
236 .await?;
237
238 rows.into_iter()
239 .map(|(ns, k, n, obj_json, uid, rv, gen, created, updated)| {
240 let object: Value = serde_json::from_str(&obj_json)?;
241 Ok(StoredObject {
242 namespace: ns,
243 kind: k,
244 name: n,
245 object,
246 uid,
247 resource_version: rv,
248 generation: gen,
249 created_at: chrono::DateTime::parse_from_rfc3339(&created)?.with_timezone(&Utc),
250 updated_at: chrono::DateTime::parse_from_rfc3339(&updated)?.with_timezone(&Utc),
251 })
252 })
253 .collect()
254 }
255
256 pub async fn list_all(&self, kind: &str) -> Result<Vec<StoredObject>> {
258 let rows: Vec<DbRow> =
259 sqlx::query_as(
260 r#"
261 SELECT namespace, kind, name, object_json, uid, resource_version, generation, created_at, updated_at
262 FROM resources
263 WHERE kind = ?
264 ORDER BY namespace, name
265 "#,
266 )
267 .bind(kind)
268 .fetch_all(&self.pool)
269 .await?;
270
271 rows.into_iter()
272 .map(|(ns, k, n, obj_json, uid, rv, gen, created, updated)| {
273 let object: Value = serde_json::from_str(&obj_json)?;
274 Ok(StoredObject {
275 namespace: ns,
276 kind: k,
277 name: n,
278 object,
279 uid,
280 resource_version: rv,
281 generation: gen,
282 created_at: chrono::DateTime::parse_from_rfc3339(&created)?.with_timezone(&Utc),
283 updated_at: chrono::DateTime::parse_from_rfc3339(&updated)?.with_timezone(&Utc),
284 })
285 })
286 .collect()
287 }
288
289 pub async fn replace(
291 &self,
292 namespace: &str,
293 kind: &str,
294 name: &str,
295 mut object: Value,
296 expected_resource_version: Option<&str>,
297 ) -> Result<(StoredObject, WatchEvent)> {
298 let existing = self
300 .get(namespace, kind, name)
301 .await?
302 .ok_or_else(|| anyhow::anyhow!("Resource not found"))?;
303
304 if let Some(expected_rv) = expected_resource_version {
306 let expected: i64 = expected_rv.parse()?;
307 if existing.resource_version != expected {
308 anyhow::bail!(
309 "Conflict: resourceVersion mismatch (expected {}, got {})",
310 expected_rv,
311 existing.resource_version
312 );
313 }
314 }
315
316 let now = Utc::now();
317 let resource_version = self.next_revision().await?;
318
319 let old_spec = existing.object.get("spec");
321 let new_spec = object.get("spec");
322 let generation = if old_spec != new_spec {
323 existing.generation + 1
324 } else {
325 existing.generation
326 };
327
328 if let Some(metadata) = object.get_mut("metadata").and_then(|m| m.as_object_mut()) {
330 metadata.insert("uid".to_string(), Value::String(existing.uid.clone()));
331 metadata.insert(
332 "resourceVersion".to_string(),
333 Value::String(resource_version.to_string()),
334 );
335 metadata.insert("generation".to_string(), Value::Number(generation.into()));
336 metadata.insert(
337 "creationTimestamp".to_string(),
338 Value::String(existing.created_at.to_rfc3339()),
339 );
340 }
341
342 let object_json = serde_json::to_string(&object)?;
343
344 sqlx::query(
345 r#"
346 UPDATE resources
347 SET object_json = ?, resource_version = ?, generation = ?, updated_at = ?
348 WHERE namespace = ? AND kind = ? AND name = ?
349 "#,
350 )
351 .bind(&object_json)
352 .bind(resource_version)
353 .bind(generation)
354 .bind(now.to_rfc3339())
355 .bind(namespace)
356 .bind(kind)
357 .bind(name)
358 .execute(&self.pool)
359 .await
360 .context("Failed to update resource")?;
361
362 let stored = StoredObject {
363 namespace: namespace.to_string(),
364 kind: kind.to_string(),
365 name: name.to_string(),
366 object: object.clone(),
367 uid: existing.uid,
368 resource_version,
369 generation,
370 created_at: existing.created_at,
371 updated_at: now,
372 };
373
374 let event = WatchEvent {
375 event_type: WatchEventType::Modified,
376 object,
377 rev: resource_version.to_string(),
378 };
379
380 Ok((stored, event))
381 }
382
383 pub async fn delete(
385 &self,
386 namespace: &str,
387 kind: &str,
388 name: &str,
389 ) -> Result<Option<WatchEvent>> {
390 let existing = self.get(namespace, kind, name).await?;
391
392 if existing.is_none() {
393 return Ok(None);
394 }
395
396 let existing = existing.unwrap();
397 let resource_version = self.next_revision().await?;
398
399 sqlx::query(
400 r#"
401 DELETE FROM resources
402 WHERE namespace = ? AND kind = ? AND name = ?
403 "#,
404 )
405 .bind(namespace)
406 .bind(kind)
407 .bind(name)
408 .execute(&self.pool)
409 .await
410 .context("Failed to delete resource")?;
411
412 Ok(Some(WatchEvent {
413 event_type: WatchEventType::Deleted,
414 object: existing.object,
415 rev: resource_version.to_string(),
416 }))
417 }
418
419 pub async fn create_namespace(&self, name: &str) -> Result<NamespaceInfo> {
421 let now = Utc::now();
422 let uid = Uuid::new_v4().to_string();
423 let resource_version = self.next_revision().await?;
424
425 sqlx::query(
426 r#"
427 INSERT INTO namespaces (name, uid, resource_version, created_at)
428 VALUES (?, ?, ?, ?)
429 "#,
430 )
431 .bind(name)
432 .bind(&uid)
433 .bind(resource_version)
434 .bind(now.to_rfc3339())
435 .execute(&self.pool)
436 .await
437 .context("Failed to create namespace")?;
438
439 Ok(NamespaceInfo {
440 name: name.to_string(),
441 uid,
442 resource_version,
443 created_at: now,
444 })
445 }
446
447 pub async fn ensure_namespace(&self, name: &str) -> Result<NamespaceInfo> {
449 let now = Utc::now();
450 let uid = Uuid::new_v4().to_string();
451 let resource_version = self.next_revision().await?;
452
453 sqlx::query(
455 r#"
456 INSERT OR IGNORE INTO namespaces (name, uid, resource_version, created_at)
457 VALUES (?, ?, ?, ?)
458 "#,
459 )
460 .bind(name)
461 .bind(&uid)
462 .bind(resource_version)
463 .bind(now.to_rfc3339())
464 .execute(&self.pool)
465 .await
466 .context("Failed to ensure namespace")?;
467
468 self.get_namespace(name)
470 .await?
471 .ok_or_else(|| anyhow::anyhow!("Namespace should exist after ensure"))
472 }
473
474 pub async fn get_namespace(&self, name: &str) -> Result<Option<NamespaceInfo>> {
476 let row: Option<(String, String, i64, String)> = sqlx::query_as(
477 r#"
478 SELECT name, uid, resource_version, created_at
479 FROM namespaces
480 WHERE name = ?
481 "#,
482 )
483 .bind(name)
484 .fetch_optional(&self.pool)
485 .await?;
486
487 match row {
488 Some((name, uid, rv, created)) => Ok(Some(NamespaceInfo {
489 name,
490 uid,
491 resource_version: rv,
492 created_at: chrono::DateTime::parse_from_rfc3339(&created)?.with_timezone(&Utc),
493 })),
494 None => Ok(None),
495 }
496 }
497
498 pub async fn list_namespaces(&self) -> Result<Vec<NamespaceInfo>> {
500 let rows: Vec<(String, String, i64, String)> = sqlx::query_as(
501 r#"
502 SELECT name, uid, resource_version, created_at
503 FROM namespaces
504 ORDER BY name
505 "#,
506 )
507 .fetch_all(&self.pool)
508 .await?;
509
510 rows.into_iter()
511 .map(|(name, uid, rv, created)| {
512 Ok(NamespaceInfo {
513 name,
514 uid,
515 resource_version: rv,
516 created_at: chrono::DateTime::parse_from_rfc3339(&created)?.with_timezone(&Utc),
517 })
518 })
519 .collect()
520 }
521
522 pub async fn delete_namespace(&self, name: &str) -> Result<Option<i64>> {
524 let existing = self.get_namespace(name).await?;
526 if existing.is_none() {
527 return Ok(None);
528 }
529
530 let resource_version = self.next_revision().await?;
531
532 sqlx::query(
534 r#"
535 DELETE FROM resources
536 WHERE namespace = ?
537 "#,
538 )
539 .bind(name)
540 .execute(&self.pool)
541 .await
542 .context("Failed to delete resources in namespace")?;
543
544 sqlx::query(
546 r#"
547 DELETE FROM namespaces
548 WHERE name = ?
549 "#,
550 )
551 .bind(name)
552 .execute(&self.pool)
553 .await
554 .context("Failed to delete namespace")?;
555
556 Ok(Some(resource_version))
557 }
558
559 pub async fn namespace_exists(&self, name: &str) -> Result<bool> {
561 let row: Option<(i32,)> = sqlx::query_as(
562 r#"
563 SELECT 1 FROM namespaces WHERE name = ?
564 "#,
565 )
566 .bind(name)
567 .fetch_optional(&self.pool)
568 .await?;
569
570 Ok(row.is_some())
571 }
572}
573
574#[derive(Debug, Clone)]
576pub struct NamespaceInfo {
577 pub name: String,
578 pub uid: String,
579 pub resource_version: i64,
580 pub created_at: chrono::DateTime<Utc>,
581}