Skip to main content

cloudiful_config/
sql.rs

1use std::io;
2
3use postgres::Client;
4use rusqlite::{params, Connection, OptionalExtension};
5
6use crate::format;
7use crate::ConfigSource;
8
9pub const DEFAULT_CONFIG_TABLE: &str = "app_configs";
10
11pub struct SqliteConfigStore<'conn> {
12    conn: &'conn Connection,
13    table_name: String,
14    app_name: String,
15}
16
17impl<'conn> SqliteConfigStore<'conn> {
18    pub fn new(conn: &'conn Connection, app_name: &str, table_name: Option<&str>) -> Self {
19        Self {
20            conn,
21            table_name: table_name.unwrap_or(DEFAULT_CONFIG_TABLE).to_string(),
22            app_name: app_name.to_string(),
23        }
24    }
25
26    fn ensure_schema(&self) -> io::Result<()> {
27        self.validate_identifier(&self.table_name)?;
28        self.conn
29            .execute(
30                &format!(
31                    "CREATE TABLE IF NOT EXISTS {} (
32                        app_name TEXT PRIMARY KEY,
33                        config_json TEXT NOT NULL,
34                        updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
35                    )",
36                    self.table_name
37                ),
38                [],
39            )
40            .map_err(to_io_error_sqlite)?;
41        self.detect_conflict()?;
42        Ok(())
43    }
44
45    fn detect_conflict(&self) -> io::Result<()> {
46        let pragma = format!("PRAGMA table_info({})", self.table_name);
47        let mut stmt = self.conn.prepare(&pragma).map_err(to_io_error_sqlite)?;
48        let mut rows = stmt.query([]).map_err(to_io_error_sqlite)?;
49
50        let mut has_app_name = false;
51        let mut has_config_json = false;
52
53        while let Some(row) = rows.next().map_err(to_io_error_sqlite)? {
54            let name: String = row.get(1).map_err(to_io_error_sqlite)?;
55            let pk: i64 = row.get(5).map_err(to_io_error_sqlite)?;
56            if name == "app_name" && pk == 1 {
57                has_app_name = true;
58            }
59            if name == "config_json" {
60                has_config_json = true;
61            }
62        }
63
64        if has_app_name && has_config_json {
65            Ok(())
66        } else {
67            Err(io::Error::new(
68                io::ErrorKind::AlreadyExists,
69                format!(
70                    "refusing to use sqlite table {} because it does not match the expected config schema",
71                    self.table_name
72                ),
73            ))
74        }
75    }
76
77    fn validate_identifier(&self, ident: &str) -> io::Result<()> {
78        if ident.is_empty()
79            || !ident
80                .chars()
81                .all(|ch| ch.is_ascii_alphanumeric() || ch == '_')
82        {
83            return Err(io::Error::new(
84                io::ErrorKind::InvalidInput,
85                format!("invalid sqlite config table name {ident}"),
86            ));
87        }
88        Ok(())
89    }
90}
91
92impl ConfigSource for SqliteConfigStore<'_> {
93    fn source_name(&self) -> String {
94        format!("sqlite:{}:{}", self.table_name, self.app_name)
95    }
96
97    fn read_value(&mut self) -> io::Result<Option<serde_json::Value>> {
98        self.ensure_schema()?;
99        let query = format!(
100            "SELECT config_json FROM {} WHERE app_name = ?1",
101            self.table_name
102        );
103        let mut stmt = self.conn.prepare(&query).map_err(to_io_error_sqlite)?;
104        let value: Option<String> = stmt
105            .query_row([&self.app_name], |row| row.get(0))
106            .optional()
107            .map_err(to_io_error_sqlite)?;
108
109        match value {
110            Some(raw) => {
111                format::parse_config_value(&raw, format::ConfigFormat::Json, &self.source_name())
112                    .map(Some)
113            }
114            None => Ok(None),
115        }
116    }
117
118    fn write_config<T>(&mut self, config: &T) -> io::Result<()>
119    where
120        T: serde::Serialize,
121    {
122        self.ensure_schema()?;
123        let raw = format::serialize_config(config, format::ConfigFormat::Json, &self.source_name())?;
124        let query = format!(
125            "INSERT INTO {} (app_name, config_json) VALUES (?1, ?2)
126             ON CONFLICT(app_name) DO UPDATE
127             SET config_json = excluded.config_json, updated_at = CURRENT_TIMESTAMP",
128            self.table_name
129        );
130        self.conn
131            .execute(&query, params![self.app_name, raw])
132            .map_err(to_io_error_sqlite)?;
133        Ok(())
134    }
135}
136
137pub struct PostgresConfigStore<'client> {
138    client: &'client mut Client,
139    table_name: String,
140    app_name: String,
141}
142
143impl<'client> PostgresConfigStore<'client> {
144    pub fn new(client: &'client mut Client, app_name: &str, table_name: Option<&str>) -> Self {
145        Self {
146            client,
147            table_name: table_name.unwrap_or(DEFAULT_CONFIG_TABLE).to_string(),
148            app_name: app_name.to_string(),
149        }
150    }
151
152    fn ensure_schema(&mut self) -> io::Result<()> {
153        self.validate_identifier(&self.table_name)?;
154        self.client
155            .batch_execute(&format!(
156                "CREATE TABLE IF NOT EXISTS {} (
157                    app_name TEXT PRIMARY KEY,
158                    config_json TEXT NOT NULL,
159                    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
160                )",
161                self.table_name
162            ))
163            .map_err(to_io_error_postgres)?;
164        self.detect_conflict()?;
165        Ok(())
166    }
167
168    fn detect_conflict(&mut self) -> io::Result<()> {
169        let row = self
170            .client
171            .query_one(
172                "SELECT
173                    EXISTS (
174                        SELECT 1
175                        FROM information_schema.columns
176                        WHERE table_name = $1 AND column_name = 'app_name'
177                    ),
178                    EXISTS (
179                        SELECT 1
180                        FROM information_schema.columns
181                        WHERE table_name = $1 AND column_name = 'config_json'
182                    )",
183                &[&self.table_name],
184            )
185            .map_err(to_io_error_postgres)?;
186
187        let has_app_name: bool = row.get(0);
188        let has_config_json: bool = row.get(1);
189
190        if has_app_name && has_config_json {
191            Ok(())
192        } else {
193            Err(io::Error::new(
194                io::ErrorKind::AlreadyExists,
195                format!(
196                    "refusing to use postgres table {} because it does not match the expected config schema",
197                    self.table_name
198                ),
199            ))
200        }
201    }
202
203    fn validate_identifier(&self, ident: &str) -> io::Result<()> {
204        if ident.is_empty()
205            || !ident
206                .chars()
207                .all(|ch| ch.is_ascii_alphanumeric() || ch == '_')
208        {
209            return Err(io::Error::new(
210                io::ErrorKind::InvalidInput,
211                format!("invalid postgres config table name {ident}"),
212            ));
213        }
214        Ok(())
215    }
216}
217
218impl ConfigSource for PostgresConfigStore<'_> {
219    fn source_name(&self) -> String {
220        format!("postgres:{}:{}", self.table_name, self.app_name)
221    }
222
223    fn read_value(&mut self) -> io::Result<Option<serde_json::Value>> {
224        self.ensure_schema()?;
225        let query = format!(
226            "SELECT config_json FROM {} WHERE app_name = $1",
227            self.table_name
228        );
229        let row = self
230            .client
231            .query_opt(&query, &[&self.app_name])
232            .map_err(to_io_error_postgres)?;
233
234        match row {
235            Some(row) => {
236                let raw: String = row.get(0);
237                format::parse_config_value(&raw, format::ConfigFormat::Json, &self.source_name())
238                    .map(Some)
239            }
240            None => Ok(None),
241        }
242    }
243
244    fn write_config<T>(&mut self, config: &T) -> io::Result<()>
245    where
246        T: serde::Serialize,
247    {
248        self.ensure_schema()?;
249        let raw = format::serialize_config(config, format::ConfigFormat::Json, &self.source_name())?;
250        let query = format!(
251            "INSERT INTO {} (app_name, config_json) VALUES ($1, $2)
252             ON CONFLICT (app_name) DO UPDATE
253             SET config_json = EXCLUDED.config_json, updated_at = NOW()",
254            self.table_name
255        );
256        self.client
257            .execute(&query, &[&self.app_name, &raw])
258            .map_err(to_io_error_postgres)?;
259        Ok(())
260    }
261}
262
263fn to_io_error_sqlite(err: rusqlite::Error) -> io::Error {
264    io::Error::other(err)
265}
266
267fn to_io_error_postgres(err: postgres::Error) -> io::Error {
268    io::Error::other(err)
269}
270
271pub fn sqlite_store<'conn>(conn: &'conn Connection, app_name: &str) -> SqliteConfigStore<'conn> {
272    SqliteConfigStore::new(conn, app_name, None)
273}
274
275pub fn sqlite_store_with_table<'conn>(
276    conn: &'conn Connection,
277    app_name: &str,
278    table_name: &str,
279) -> SqliteConfigStore<'conn> {
280    SqliteConfigStore::new(conn, app_name, Some(table_name))
281}
282
283pub fn postgres_store<'client>(
284    client: &'client mut Client,
285    app_name: &str,
286) -> PostgresConfigStore<'client> {
287    PostgresConfigStore::new(client, app_name, None)
288}
289
290pub fn postgres_store_with_table<'client>(
291    client: &'client mut Client,
292    app_name: &str,
293    table_name: &str,
294) -> PostgresConfigStore<'client> {
295    PostgresConfigStore::new(client, app_name, Some(table_name))
296}