Skip to main content

cfgd_core/state/
sources.rs

1use rusqlite::params;
2
3use super::StateStore;
4use super::types::{ConfigSourceRecord, SourceConfigHash};
5use crate::errors::{Result, StateError};
6
7impl StateStore {
8    /// Upsert a config source record.
9    pub fn upsert_config_source(
10        &self,
11        name: &str,
12        origin_url: &str,
13        origin_branch: &str,
14        last_commit: Option<&str>,
15        source_version: Option<&str>,
16        pinned_version: Option<&str>,
17    ) -> Result<i64> {
18        let timestamp = crate::utc_now_iso8601();
19        self.conn
20            .execute(
21                "INSERT INTO config_sources (name, origin_url, origin_branch, last_fetched, last_commit, source_version, pinned_version)
22                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
23                 ON CONFLICT(name) DO UPDATE SET
24                    origin_url = excluded.origin_url,
25                    origin_branch = excluded.origin_branch,
26                    last_fetched = excluded.last_fetched,
27                    last_commit = excluded.last_commit,
28                    source_version = excluded.source_version,
29                    pinned_version = excluded.pinned_version",
30                params![name, origin_url, origin_branch, timestamp, last_commit, source_version, pinned_version],
31            )
32            ?;
33        Ok(self.conn.last_insert_rowid())
34    }
35
36    /// Get all config sources.
37    pub fn config_sources(&self) -> Result<Vec<ConfigSourceRecord>> {
38        let mut stmt = self
39            .conn
40            .prepare(
41                "SELECT id, name, origin_url, origin_branch, last_fetched, last_commit, source_version, pinned_version, status
42                 FROM config_sources ORDER BY name",
43            )
44            ?;
45
46        let sources = stmt
47            .query_map([], |row| {
48                Ok(ConfigSourceRecord {
49                    id: row.get(0)?,
50                    name: row.get(1)?,
51                    origin_url: row.get(2)?,
52                    origin_branch: row.get(3)?,
53                    last_fetched: row.get(4)?,
54                    last_commit: row.get(5)?,
55                    source_version: row.get(6)?,
56                    pinned_version: row.get(7)?,
57                    status: row.get(8)?,
58                })
59            })?
60            .collect::<std::result::Result<Vec<_>, _>>()?;
61
62        Ok(sources)
63    }
64
65    /// Get a config source by name.
66    pub fn config_source_by_name(&self, name: &str) -> Result<Option<ConfigSourceRecord>> {
67        let result = self.conn.query_row(
68            "SELECT id, name, origin_url, origin_branch, last_fetched, last_commit, source_version, pinned_version, status
69             FROM config_sources WHERE name = ?1",
70            params![name],
71            |row| {
72                Ok(ConfigSourceRecord {
73                    id: row.get(0)?,
74                    name: row.get(1)?,
75                    origin_url: row.get(2)?,
76                    origin_branch: row.get(3)?,
77                    last_fetched: row.get(4)?,
78                    last_commit: row.get(5)?,
79                    source_version: row.get(6)?,
80                    pinned_version: row.get(7)?,
81                    status: row.get(8)?,
82                })
83            },
84        );
85
86        match result {
87            Ok(record) => Ok(Some(record)),
88            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
89            Err(e) => Err(StateError::Database(e.to_string()).into()),
90        }
91    }
92
93    /// Remove a config source from state.
94    pub fn remove_config_source(&self, name: &str) -> Result<()> {
95        self.conn
96            .execute("DELETE FROM config_sources WHERE name = ?1", params![name])?;
97        Ok(())
98    }
99
100    /// Update the status of a config source.
101    pub fn update_config_source_status(&self, name: &str, status: &str) -> Result<()> {
102        self.conn.execute(
103            "UPDATE config_sources SET status = ?1 WHERE name = ?2",
104            params![status, name],
105        )?;
106        Ok(())
107    }
108
109    /// Record a source apply (links a source's commit to an apply).
110    pub fn record_source_apply(
111        &self,
112        source_name: &str,
113        apply_id: i64,
114        source_commit: &str,
115    ) -> Result<()> {
116        let source = self.config_source_by_name(source_name)?;
117        if let Some(src) = source {
118            self.conn.execute(
119                "INSERT INTO source_applies (source_id, apply_id, source_commit)
120                     VALUES (?1, ?2, ?3)",
121                params![src.id, apply_id, source_commit],
122            )?;
123        }
124        Ok(())
125    }
126
127    /// Record a composition conflict.
128    pub fn record_source_conflict(
129        &self,
130        source_name: &str,
131        resource_type: &str,
132        resource_id: &str,
133        resolution: &str,
134        detail: Option<&str>,
135    ) -> Result<()> {
136        let timestamp = crate::utc_now_iso8601();
137        self.conn
138            .execute(
139                "INSERT INTO source_conflicts (timestamp, source_name, resource_type, resource_id, resolution, detail)
140                 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
141                params![timestamp, source_name, resource_type, resource_id, resolution, detail],
142            )
143            ?;
144        Ok(())
145    }
146
147    /// Get the stored config hash for a source.
148    pub fn source_config_hash(&self, source: &str) -> Result<Option<SourceConfigHash>> {
149        let result = self.conn.query_row(
150            "SELECT source, config_hash, merged_at FROM source_config_hashes WHERE source = ?1",
151            params![source],
152            |row| {
153                Ok(SourceConfigHash {
154                    source: row.get(0)?,
155                    config_hash: row.get(1)?,
156                    merged_at: row.get(2)?,
157                })
158            },
159        );
160
161        match result {
162            Ok(record) => Ok(Some(record)),
163            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
164            Err(e) => Err(StateError::Database(e.to_string()).into()),
165        }
166    }
167
168    /// Upsert a source config hash.
169    pub fn set_source_config_hash(&self, source: &str, config_hash: &str) -> Result<()> {
170        let timestamp = crate::utc_now_iso8601();
171        self.conn.execute(
172            "INSERT INTO source_config_hashes (source, config_hash, merged_at)
173                 VALUES (?1, ?2, ?3)
174                 ON CONFLICT(source) DO UPDATE SET
175                    config_hash = excluded.config_hash,
176                    merged_at = excluded.merged_at",
177            params![source, config_hash, timestamp],
178        )?;
179        Ok(())
180    }
181
182    /// Remove the config hash for a source.
183    pub fn remove_source_config_hash(&self, source: &str) -> Result<()> {
184        self.conn.execute(
185            "DELETE FROM source_config_hashes WHERE source = ?1",
186            params![source],
187        )?;
188        Ok(())
189    }
190}