Skip to main content

planspec_server/storage/
sqlite.rs

1use anyhow::{Context, Result};
2use chrono::Utc;
3use serde_json::Value;
4use sqlx::{sqlite::SqlitePoolOptions, Pool, Sqlite};
5use uuid::Uuid;
6
7use planspec_core::{WatchEvent, WatchEventType};
8
9use super::types::StoredObject;
10
/// Type alias for the database row tuple to reduce complexity warnings.
///
/// The positions mirror the column order used by the `SELECT` statements that
/// read from the `resources` table in this file.
type DbRow = (
    String, // namespace
    String, // kind
    String, // name
    String, // object_json (decoded with `serde_json::from_str` by readers)
    String, // uid
    i64,    // resource_version
    i64,    // generation
    String, // created_at (parsed as RFC 3339 by readers)
    String, // updated_at (parsed as RFC 3339 by readers)
);
23
/// SQLite-backed storage for PlanSpec resources.
///
/// Holds the `sqlx` connection pool that every query in this type runs against.
#[derive(Clone)]
pub struct Store {
    /// Pool of SQLite connections opened by [`Store::new`].
    pool: Pool<Sqlite>,
}
29
30impl Store {
31    /// Create a new store, initializing the database schema
32    pub async fn new(path: &str) -> Result<Self> {
33        let url = format!("sqlite:{}?mode=rwc", path);
34        let pool = SqlitePoolOptions::new()
35            .max_connections(5)
36            .connect(&url)
37            .await
38            .context("Failed to connect to SQLite database")?;
39
40        // Initialize schema
41        sqlx::query(
42            r#"
43            CREATE TABLE IF NOT EXISTS resources (
44                namespace TEXT NOT NULL,
45                kind TEXT NOT NULL,
46                name TEXT NOT NULL,
47                object_json TEXT NOT NULL,
48                uid TEXT NOT NULL,
49                resource_version INTEGER NOT NULL,
50                generation INTEGER NOT NULL,
51                created_at TEXT NOT NULL,
52                updated_at TEXT NOT NULL,
53                PRIMARY KEY (namespace, kind, name)
54            );
55
56            CREATE INDEX IF NOT EXISTS idx_resources_kind ON resources(kind);
57            CREATE INDEX IF NOT EXISTS idx_resources_ns_kind ON resources(namespace, kind);
58            "#,
59        )
60        .execute(&pool)
61        .await
62        .context("Failed to initialize database schema")?;
63
64        // Create revision counter table
65        sqlx::query(
66            r#"
67            CREATE TABLE IF NOT EXISTS revision (
68                id INTEGER PRIMARY KEY CHECK (id = 1),
69                current INTEGER NOT NULL DEFAULT 0
70            );
71            INSERT OR IGNORE INTO revision (id, current) VALUES (1, 0);
72            "#,
73        )
74        .execute(&pool)
75        .await
76        .context("Failed to initialize revision counter")?;
77
78        // Create namespaces table
79        sqlx::query(
80            r#"
81            CREATE TABLE IF NOT EXISTS namespaces (
82                name TEXT PRIMARY KEY,
83                uid TEXT NOT NULL,
84                resource_version INTEGER NOT NULL,
85                created_at TEXT NOT NULL
86            );
87            "#,
88        )
89        .execute(&pool)
90        .await
91        .context("Failed to initialize namespaces table")?;
92
93        let store = Self { pool };
94
95        // Ensure "default" namespace always exists
96        if !store.namespace_exists("default").await? {
97            store.create_namespace("default").await?;
98        }
99
100        Ok(store)
101    }
102
103    /// Get the next revision number (atomically increments)
104    async fn next_revision(&self) -> Result<i64> {
105        let row: (i64,) = sqlx::query_as(
106            "UPDATE revision SET current = current + 1 WHERE id = 1 RETURNING current",
107        )
108        .fetch_one(&self.pool)
109        .await?;
110        Ok(row.0)
111    }
112
113    /// Create a new resource
114    pub async fn create(
115        &self,
116        namespace: &str,
117        kind: &str,
118        name: &str,
119        mut object: Value,
120    ) -> Result<(StoredObject, WatchEvent)> {
121        let now = Utc::now();
122        let uid = Uuid::new_v4().to_string();
123        let resource_version = self.next_revision().await?;
124        let generation = 1i64;
125
126        // Inject server-managed metadata
127        if let Some(metadata) = object.get_mut("metadata").and_then(|m| m.as_object_mut()) {
128            metadata.insert("uid".to_string(), Value::String(uid.clone()));
129            metadata.insert(
130                "resourceVersion".to_string(),
131                Value::String(resource_version.to_string()),
132            );
133            metadata.insert("generation".to_string(), Value::Number(generation.into()));
134            metadata.insert(
135                "creationTimestamp".to_string(),
136                Value::String(now.to_rfc3339()),
137            );
138        }
139
140        let object_json = serde_json::to_string(&object)?;
141
142        sqlx::query(
143            r#"
144            INSERT INTO resources (namespace, kind, name, object_json, uid, resource_version, generation, created_at, updated_at)
145            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
146            "#,
147        )
148        .bind(namespace)
149        .bind(kind)
150        .bind(name)
151        .bind(&object_json)
152        .bind(&uid)
153        .bind(resource_version)
154        .bind(generation)
155        .bind(now.to_rfc3339())
156        .bind(now.to_rfc3339())
157        .execute(&self.pool)
158        .await
159        .context("Failed to insert resource")?;
160
161        let stored = StoredObject {
162            namespace: namespace.to_string(),
163            kind: kind.to_string(),
164            name: name.to_string(),
165            object: object.clone(),
166            uid,
167            resource_version,
168            generation,
169            created_at: now,
170            updated_at: now,
171        };
172
173        let event = WatchEvent {
174            event_type: WatchEventType::Added,
175            object,
176            rev: resource_version.to_string(),
177        };
178
179        Ok((stored, event))
180    }
181
182    /// Get a resource by key
183    pub async fn get(
184        &self,
185        namespace: &str,
186        kind: &str,
187        name: &str,
188    ) -> Result<Option<StoredObject>> {
189        let row: Option<DbRow> =
190            sqlx::query_as(
191                r#"
192                SELECT namespace, kind, name, object_json, uid, resource_version, generation, created_at, updated_at
193                FROM resources
194                WHERE namespace = ? AND kind = ? AND name = ?
195                "#,
196            )
197            .bind(namespace)
198            .bind(kind)
199            .bind(name)
200            .fetch_optional(&self.pool)
201            .await?;
202
203        match row {
204            Some((ns, k, n, obj_json, uid, rv, gen, created, updated)) => {
205                let object: Value = serde_json::from_str(&obj_json)?;
206                Ok(Some(StoredObject {
207                    namespace: ns,
208                    kind: k,
209                    name: n,
210                    object,
211                    uid,
212                    resource_version: rv,
213                    generation: gen,
214                    created_at: chrono::DateTime::parse_from_rfc3339(&created)?.with_timezone(&Utc),
215                    updated_at: chrono::DateTime::parse_from_rfc3339(&updated)?.with_timezone(&Utc),
216                }))
217            }
218            None => Ok(None),
219        }
220    }
221
222    /// List resources by namespace and kind
223    pub async fn list(&self, namespace: &str, kind: &str) -> Result<Vec<StoredObject>> {
224        let rows: Vec<DbRow> =
225            sqlx::query_as(
226                r#"
227                SELECT namespace, kind, name, object_json, uid, resource_version, generation, created_at, updated_at
228                FROM resources
229                WHERE namespace = ? AND kind = ?
230                ORDER BY name
231                "#,
232            )
233            .bind(namespace)
234            .bind(kind)
235            .fetch_all(&self.pool)
236            .await?;
237
238        rows.into_iter()
239            .map(|(ns, k, n, obj_json, uid, rv, gen, created, updated)| {
240                let object: Value = serde_json::from_str(&obj_json)?;
241                Ok(StoredObject {
242                    namespace: ns,
243                    kind: k,
244                    name: n,
245                    object,
246                    uid,
247                    resource_version: rv,
248                    generation: gen,
249                    created_at: chrono::DateTime::parse_from_rfc3339(&created)?.with_timezone(&Utc),
250                    updated_at: chrono::DateTime::parse_from_rfc3339(&updated)?.with_timezone(&Utc),
251                })
252            })
253            .collect()
254    }
255
256    /// List all resources of a kind across all namespaces
257    pub async fn list_all(&self, kind: &str) -> Result<Vec<StoredObject>> {
258        let rows: Vec<DbRow> =
259            sqlx::query_as(
260                r#"
261                SELECT namespace, kind, name, object_json, uid, resource_version, generation, created_at, updated_at
262                FROM resources
263                WHERE kind = ?
264                ORDER BY namespace, name
265                "#,
266            )
267            .bind(kind)
268            .fetch_all(&self.pool)
269            .await?;
270
271        rows.into_iter()
272            .map(|(ns, k, n, obj_json, uid, rv, gen, created, updated)| {
273                let object: Value = serde_json::from_str(&obj_json)?;
274                Ok(StoredObject {
275                    namespace: ns,
276                    kind: k,
277                    name: n,
278                    object,
279                    uid,
280                    resource_version: rv,
281                    generation: gen,
282                    created_at: chrono::DateTime::parse_from_rfc3339(&created)?.with_timezone(&Utc),
283                    updated_at: chrono::DateTime::parse_from_rfc3339(&updated)?.with_timezone(&Utc),
284                })
285            })
286            .collect()
287    }
288
289    /// Replace a resource (with optimistic concurrency check)
290    pub async fn replace(
291        &self,
292        namespace: &str,
293        kind: &str,
294        name: &str,
295        mut object: Value,
296        expected_resource_version: Option<&str>,
297    ) -> Result<(StoredObject, WatchEvent)> {
298        // Get existing resource
299        let existing = self
300            .get(namespace, kind, name)
301            .await?
302            .ok_or_else(|| anyhow::anyhow!("Resource not found"))?;
303
304        // Check resourceVersion if provided
305        if let Some(expected_rv) = expected_resource_version {
306            let expected: i64 = expected_rv.parse()?;
307            if existing.resource_version != expected {
308                anyhow::bail!(
309                    "Conflict: resourceVersion mismatch (expected {}, got {})",
310                    expected_rv,
311                    existing.resource_version
312                );
313            }
314        }
315
316        let now = Utc::now();
317        let resource_version = self.next_revision().await?;
318
319        // Check if spec changed to determine generation increment
320        let old_spec = existing.object.get("spec");
321        let new_spec = object.get("spec");
322        let generation = if old_spec != new_spec {
323            existing.generation + 1
324        } else {
325            existing.generation
326        };
327
328        // Preserve immutable metadata, update mutable parts
329        if let Some(metadata) = object.get_mut("metadata").and_then(|m| m.as_object_mut()) {
330            metadata.insert("uid".to_string(), Value::String(existing.uid.clone()));
331            metadata.insert(
332                "resourceVersion".to_string(),
333                Value::String(resource_version.to_string()),
334            );
335            metadata.insert("generation".to_string(), Value::Number(generation.into()));
336            metadata.insert(
337                "creationTimestamp".to_string(),
338                Value::String(existing.created_at.to_rfc3339()),
339            );
340        }
341
342        let object_json = serde_json::to_string(&object)?;
343
344        sqlx::query(
345            r#"
346            UPDATE resources
347            SET object_json = ?, resource_version = ?, generation = ?, updated_at = ?
348            WHERE namespace = ? AND kind = ? AND name = ?
349            "#,
350        )
351        .bind(&object_json)
352        .bind(resource_version)
353        .bind(generation)
354        .bind(now.to_rfc3339())
355        .bind(namespace)
356        .bind(kind)
357        .bind(name)
358        .execute(&self.pool)
359        .await
360        .context("Failed to update resource")?;
361
362        let stored = StoredObject {
363            namespace: namespace.to_string(),
364            kind: kind.to_string(),
365            name: name.to_string(),
366            object: object.clone(),
367            uid: existing.uid,
368            resource_version,
369            generation,
370            created_at: existing.created_at,
371            updated_at: now,
372        };
373
374        let event = WatchEvent {
375            event_type: WatchEventType::Modified,
376            object,
377            rev: resource_version.to_string(),
378        };
379
380        Ok((stored, event))
381    }
382
383    /// Delete a resource
384    pub async fn delete(
385        &self,
386        namespace: &str,
387        kind: &str,
388        name: &str,
389    ) -> Result<Option<WatchEvent>> {
390        let existing = self.get(namespace, kind, name).await?;
391
392        if existing.is_none() {
393            return Ok(None);
394        }
395
396        let existing = existing.unwrap();
397        let resource_version = self.next_revision().await?;
398
399        sqlx::query(
400            r#"
401            DELETE FROM resources
402            WHERE namespace = ? AND kind = ? AND name = ?
403            "#,
404        )
405        .bind(namespace)
406        .bind(kind)
407        .bind(name)
408        .execute(&self.pool)
409        .await
410        .context("Failed to delete resource")?;
411
412        Ok(Some(WatchEvent {
413            event_type: WatchEventType::Deleted,
414            object: existing.object,
415            rev: resource_version.to_string(),
416        }))
417    }
418
419    /// Create a namespace (fails if already exists)
420    pub async fn create_namespace(&self, name: &str) -> Result<NamespaceInfo> {
421        let now = Utc::now();
422        let uid = Uuid::new_v4().to_string();
423        let resource_version = self.next_revision().await?;
424
425        sqlx::query(
426            r#"
427            INSERT INTO namespaces (name, uid, resource_version, created_at)
428            VALUES (?, ?, ?, ?)
429            "#,
430        )
431        .bind(name)
432        .bind(&uid)
433        .bind(resource_version)
434        .bind(now.to_rfc3339())
435        .execute(&self.pool)
436        .await
437        .context("Failed to create namespace")?;
438
439        Ok(NamespaceInfo {
440            name: name.to_string(),
441            uid,
442            resource_version,
443            created_at: now,
444        })
445    }
446
447    /// Ensure a namespace exists, creating it if needed (idempotent, race-safe)
448    pub async fn ensure_namespace(&self, name: &str) -> Result<NamespaceInfo> {
449        let now = Utc::now();
450        let uid = Uuid::new_v4().to_string();
451        let resource_version = self.next_revision().await?;
452
453        // Use INSERT OR IGNORE to handle concurrent creation attempts
454        sqlx::query(
455            r#"
456            INSERT OR IGNORE INTO namespaces (name, uid, resource_version, created_at)
457            VALUES (?, ?, ?, ?)
458            "#,
459        )
460        .bind(name)
461        .bind(&uid)
462        .bind(resource_version)
463        .bind(now.to_rfc3339())
464        .execute(&self.pool)
465        .await
466        .context("Failed to ensure namespace")?;
467
468        // Return the namespace (either just created or already existing)
469        self.get_namespace(name)
470            .await?
471            .ok_or_else(|| anyhow::anyhow!("Namespace should exist after ensure"))
472    }
473
474    /// Get a namespace by name
475    pub async fn get_namespace(&self, name: &str) -> Result<Option<NamespaceInfo>> {
476        let row: Option<(String, String, i64, String)> = sqlx::query_as(
477            r#"
478            SELECT name, uid, resource_version, created_at
479            FROM namespaces
480            WHERE name = ?
481            "#,
482        )
483        .bind(name)
484        .fetch_optional(&self.pool)
485        .await?;
486
487        match row {
488            Some((name, uid, rv, created)) => Ok(Some(NamespaceInfo {
489                name,
490                uid,
491                resource_version: rv,
492                created_at: chrono::DateTime::parse_from_rfc3339(&created)?.with_timezone(&Utc),
493            })),
494            None => Ok(None),
495        }
496    }
497
498    /// List all namespaces
499    pub async fn list_namespaces(&self) -> Result<Vec<NamespaceInfo>> {
500        let rows: Vec<(String, String, i64, String)> = sqlx::query_as(
501            r#"
502            SELECT name, uid, resource_version, created_at
503            FROM namespaces
504            ORDER BY name
505            "#,
506        )
507        .fetch_all(&self.pool)
508        .await?;
509
510        rows.into_iter()
511            .map(|(name, uid, rv, created)| {
512                Ok(NamespaceInfo {
513                    name,
514                    uid,
515                    resource_version: rv,
516                    created_at: chrono::DateTime::parse_from_rfc3339(&created)?.with_timezone(&Utc),
517                })
518            })
519            .collect()
520    }
521
522    /// Delete a namespace and all its resources (cascade delete)
523    pub async fn delete_namespace(&self, name: &str) -> Result<Option<i64>> {
524        // Check if namespace exists
525        let existing = self.get_namespace(name).await?;
526        if existing.is_none() {
527            return Ok(None);
528        }
529
530        let resource_version = self.next_revision().await?;
531
532        // Delete all resources in the namespace
533        sqlx::query(
534            r#"
535            DELETE FROM resources
536            WHERE namespace = ?
537            "#,
538        )
539        .bind(name)
540        .execute(&self.pool)
541        .await
542        .context("Failed to delete resources in namespace")?;
543
544        // Delete the namespace
545        sqlx::query(
546            r#"
547            DELETE FROM namespaces
548            WHERE name = ?
549            "#,
550        )
551        .bind(name)
552        .execute(&self.pool)
553        .await
554        .context("Failed to delete namespace")?;
555
556        Ok(Some(resource_version))
557    }
558
559    /// Check if a namespace exists
560    pub async fn namespace_exists(&self, name: &str) -> Result<bool> {
561        let row: Option<(i32,)> = sqlx::query_as(
562            r#"
563            SELECT 1 FROM namespaces WHERE name = ?
564            "#,
565        )
566        .bind(name)
567        .fetch_optional(&self.pool)
568        .await?;
569
570        Ok(row.is_some())
571    }
572}
573
/// Information about a namespace, as stored in the `namespaces` table.
#[derive(Debug, Clone)]
pub struct NamespaceInfo {
    /// Namespace name (the table's primary key).
    pub name: String,
    /// Unique identifier; a random v4 UUID string assigned when the namespace is created.
    pub uid: String,
    /// Value of the global revision counter allocated when the namespace was created.
    pub resource_version: i64,
    /// Creation time in UTC.
    pub created_at: chrono::DateTime<Utc>,
}