Skip to main content

cedros_data/store/
entries.rs

1use std::collections::BTreeMap;
2
3use sqlx::{Postgres, QueryBuilder, Row, Transaction};
4
5use crate::contracts::{infer_contract, merge_contracts, verify_contract_change};
6use crate::defaults::SITE_SCHEMA_NAME;
7use crate::error::{CedrosDataError, Result};
8use crate::models::{
9    Collection, CollectionMode, EntryRecord, QueryEntriesRequest, UpsertEntryRequest,
10};
11use crate::sql::quote_ident;
12
13use super::{
14    find_collection_with, insert_contract_in_transaction, latest_contract_with, CedrosData,
15};
16
/// Largest `limit` value that `validate_query_pagination` accepts for a query.
const MAX_QUERY_LIMIT: i64 = 1_000;

/// `ORDER BY` fragment handed out by `entry_order_clause` and appended to the
/// entry queries built in this module.
const ENTRY_ORDER_CLAUSE: &str = " ORDER BY updated_at DESC, entry_key ASC";
19
20impl CedrosData {
21    pub async fn upsert_entry(&self, request: UpsertEntryRequest) -> Result<EntryRecord> {
22        self.ensure_site_exists().await?;
23        let mut tx = self.pool().begin().await?;
24        let record = self.upsert_entry_in_transaction(&mut tx, &request).await?;
25        tx.commit().await?;
26        Ok(record)
27    }
28
29    pub async fn query_entries(&self, request: QueryEntriesRequest) -> Result<Vec<EntryRecord>> {
30        validate_query_pagination(&request)?;
31        self.ensure_site_exists().await?;
32        let collection = self.find_collection(&request.collection_name).await?;
33
34        match collection.mode {
35            CollectionMode::Jsonb => {
36                query_jsonb_entries(self.pool(), &collection.collection_name, &request).await
37            }
38            CollectionMode::Typed => {
39                query_typed_entries(self.pool(), SITE_SCHEMA_NAME, &collection, &request).await
40            }
41        }
42    }
43
44    pub(super) async fn upsert_entry_in_transaction(
45        &self,
46        tx: &mut Transaction<'_, Postgres>,
47        request: &UpsertEntryRequest,
48    ) -> Result<EntryRecord> {
49        let collection = find_collection_with(&mut **tx, &request.collection_name).await?;
50        self.upsert_entry_for_collection_in_transaction(tx, &collection, request)
51            .await
52    }
53
54    pub(super) async fn upsert_entry_for_collection_in_transaction(
55        &self,
56        tx: &mut Transaction<'_, Postgres>,
57        collection: &Collection,
58        request: &UpsertEntryRequest,
59    ) -> Result<EntryRecord> {
60        if collection.collection_name != request.collection_name {
61            return Err(CedrosDataError::InvalidRequest(format!(
62                "entry request collection {} does not match loaded collection {}",
63                request.collection_name, collection.collection_name
64            )));
65        }
66        ensure_payload_object(&request.payload)?;
67
68        self.verify_upsert_contract_in_transaction(tx, collection, &request.payload)
69            .await?;
70
71        match entry_write_plan(&collection.mode) {
72            EntryWritePlan::JsonbOnly => {
73                upsert_jsonb_entry_with(&mut **tx, &collection.collection_name, request).await
74            }
75            EntryWritePlan::TypedThenJsonbMirror => {
76                let record = upsert_typed_entry_in_transaction(
77                    self,
78                    tx,
79                    SITE_SCHEMA_NAME,
80                    collection,
81                    request,
82                )
83                .await?;
84                upsert_jsonb_entry_with(&mut **tx, &collection.collection_name, request).await?;
85                Ok(record)
86            }
87        }
88    }
89
90    async fn verify_upsert_contract_in_transaction(
91        &self,
92        tx: &mut Transaction<'_, Postgres>,
93        collection: &Collection,
94        payload: &serde_json::Value,
95    ) -> Result<()> {
96        let incoming = infer_contract(std::slice::from_ref(payload))?;
97        let latest = match collection.strict_contract {
98            Some(_) => None,
99            None => latest_contract_with(&mut **tx, &collection.collection_name).await?,
100        };
101
102        let reference = collection.strict_contract.as_ref().or(latest.as_ref());
103        let report = verify_contract_change(reference, &incoming);
104        if !report.passes {
105            return Err(CedrosDataError::ContractVerificationFailed(format!(
106                "{} => {}",
107                collection.collection_name,
108                report.breaking_changes.join(", ")
109            )));
110        }
111
112        if collection.strict_contract.is_none() {
113            let merged = merge_contracts(latest.as_ref(), &incoming);
114            if should_record_contract(latest.as_ref(), &merged)? {
115                insert_contract_in_transaction(tx, &collection.collection_name, &merged).await?;
116            }
117        }
118
119        Ok(())
120    }
121}
122
123fn ensure_payload_object(payload: &serde_json::Value) -> Result<()> {
124    if payload.is_object() {
125        return Ok(());
126    }
127    Err(CedrosDataError::InvalidRequest(
128        "entry payload must be a JSON object".to_string(),
129    ))
130}
131
132fn should_record_contract(
133    latest: Option<&crate::models::ContractSchema>,
134    merged: &crate::models::ContractSchema,
135) -> Result<bool> {
136    let Some(latest) = latest else {
137        return Ok(true);
138    };
139    Ok(serde_json::to_value(latest)? != serde_json::to_value(merged)?)
140}
141
142async fn upsert_jsonb_entry_with<'e, E>(
143    executor: E,
144    collection_name: &str,
145    request: &UpsertEntryRequest,
146) -> Result<EntryRecord>
147where
148    E: sqlx::Executor<'e, Database = Postgres>,
149{
150    let row = sqlx::query(
151        "INSERT INTO entries (collection_name, entry_key, payload, updated_at)
152         VALUES ($1, $2, $3, NOW())
153         ON CONFLICT (collection_name, entry_key)
154         DO UPDATE SET payload = EXCLUDED.payload,
155                       updated_at = NOW()
156         RETURNING entry_key, payload, updated_at",
157    )
158    .bind(collection_name)
159    .bind(&request.entry_key)
160    .bind(&request.payload)
161    .fetch_one(executor)
162    .await?;
163
164    Ok(EntryRecord {
165        entry_key: row.get("entry_key"),
166        payload: row.get("payload"),
167        updated_at: row.get("updated_at"),
168    })
169}
170
171async fn upsert_typed_entry_in_transaction(
172    store: &CedrosData,
173    tx: &mut Transaction<'_, Postgres>,
174    schema_name: &str,
175    collection: &Collection,
176    request: &UpsertEntryRequest,
177) -> Result<EntryRecord> {
178    let table_name = collection.table_name.as_ref().ok_or_else(|| {
179        CedrosDataError::InvalidRequest("typed collection missing table_name".into())
180    })?;
181
182    let table_columns = store
183        .typed_table_columns_in_transaction(tx, schema_name, table_name)
184        .await?;
185    validate_typed_payload(&table_columns, &request.payload)?;
186
187    let typed_columns = table_columns
188        .keys()
189        .filter(|name| !is_system_column(name))
190        .cloned()
191        .collect::<Vec<String>>();
192
193    let statement = typed_upsert_statement(schema_name, table_name, &typed_columns)?;
194    let row = sqlx::query(&statement)
195        .bind(&request.entry_key)
196        .bind(&request.payload)
197        .fetch_one(&mut **tx)
198        .await?;
199
200    Ok(EntryRecord {
201        entry_key: row.get("entry_key"),
202        payload: merged_typed_payload(row.get("payload"), row.get("typed_row")),
203        updated_at: row.get("updated_at"),
204    })
205}
206
207async fn query_jsonb_entries(
208    pool: &sqlx::PgPool,
209    collection_name: &str,
210    request: &QueryEntriesRequest,
211) -> Result<Vec<EntryRecord>> {
212    let mut builder = QueryBuilder::<sqlx::Postgres>::new(
213        "SELECT entry_key, payload, updated_at FROM entries WHERE collection_name = ",
214    );
215    builder.push_bind(collection_name);
216
217    apply_query_filters(&mut builder, request, "payload");
218    builder.push(entry_order_clause());
219    builder.push(" LIMIT ");
220    builder.push_bind(request.limit);
221    builder.push(" OFFSET ");
222    builder.push_bind(request.offset);
223
224    fetch_entry_records(builder, pool).await
225}
226
227async fn query_typed_entries(
228    pool: &sqlx::PgPool,
229    schema_name: &str,
230    collection: &Collection,
231    request: &QueryEntriesRequest,
232) -> Result<Vec<EntryRecord>> {
233    let table_name = collection.table_name.as_ref().ok_or_else(|| {
234        CedrosDataError::InvalidRequest("typed collection missing table_name".into())
235    })?;
236    let payload_projection = typed_payload_projection("typed_entries");
237    let mut builder = QueryBuilder::<sqlx::Postgres>::new(format!(
238        "SELECT entry_key, payload, to_jsonb(typed_entries) AS typed_row, updated_at FROM {}.{} AS typed_entries WHERE TRUE",
239        quote_ident(schema_name)?,
240        quote_ident(table_name)?
241    ));
242
243    apply_query_filters(&mut builder, request, &payload_projection);
244    builder.push(entry_order_clause());
245    builder.push(" LIMIT ");
246    builder.push_bind(request.limit);
247    builder.push(" OFFSET ");
248    builder.push_bind(request.offset);
249
250    fetch_typed_entry_records(builder, pool).await
251}
252
253fn apply_query_filters(
254    builder: &mut QueryBuilder<'_, sqlx::Postgres>,
255    request: &QueryEntriesRequest,
256    payload_expression: &str,
257) {
258    if !request.entry_keys.is_empty() {
259        builder.push(" AND entry_key = ANY(");
260        builder.push_bind(request.entry_keys.clone());
261        builder.push(")");
262    }
263    if let Some(contains) = &request.contains {
264        builder.push(" AND ");
265        builder.push(payload_expression);
266        builder.push(" @> ");
267        builder.push_bind(contains.clone());
268    }
269}
270
271pub(super) fn entry_order_clause() -> &'static str {
272    ENTRY_ORDER_CLAUSE
273}
274
275fn validate_query_pagination(request: &QueryEntriesRequest) -> Result<()> {
276    if request.limit < 0 {
277        return Err(CedrosDataError::InvalidRequest(
278            "query limit cannot be negative".to_string(),
279        ));
280    }
281    if request.limit > MAX_QUERY_LIMIT {
282        return Err(CedrosDataError::InvalidRequest(format!(
283            "query limit cannot exceed {MAX_QUERY_LIMIT}"
284        )));
285    }
286    if request.offset < 0 {
287        return Err(CedrosDataError::InvalidRequest(
288            "query offset cannot be negative".to_string(),
289        ));
290    }
291    Ok(())
292}
293
294async fn fetch_entry_records(
295    mut builder: QueryBuilder<'_, sqlx::Postgres>,
296    pool: &sqlx::PgPool,
297) -> Result<Vec<EntryRecord>> {
298    let rows = builder.build().fetch_all(pool).await?;
299    Ok(rows
300        .into_iter()
301        .map(|row| EntryRecord {
302            entry_key: row.get("entry_key"),
303            payload: row.get("payload"),
304            updated_at: row.get("updated_at"),
305        })
306        .collect())
307}
308
309async fn fetch_typed_entry_records(
310    mut builder: QueryBuilder<'_, sqlx::Postgres>,
311    pool: &sqlx::PgPool,
312) -> Result<Vec<EntryRecord>> {
313    let rows = builder.build().fetch_all(pool).await?;
314    Ok(rows
315        .into_iter()
316        .map(|row| EntryRecord {
317            entry_key: row.get("entry_key"),
318            payload: merged_typed_payload(row.get("payload"), row.get("typed_row")),
319            updated_at: row.get("updated_at"),
320        })
321        .collect())
322}
323
324pub(super) fn merged_typed_payload(
325    mirror_payload: serde_json::Value,
326    typed_row: serde_json::Value,
327) -> serde_json::Value {
328    let serde_json::Value::Object(mut merged_payload) = mirror_payload else {
329        return mirror_payload;
330    };
331    let serde_json::Value::Object(mut typed_row_object) = typed_row else {
332        return serde_json::Value::Object(merged_payload);
333    };
334
335    for key in ["entry_key", "payload", "created_at", "updated_at"] {
336        typed_row_object.remove(key);
337    }
338    merged_payload.extend(typed_row_object);
339    serde_json::Value::Object(merged_payload)
340}
341
342pub(super) async fn typed_table_columns_with<'e, E>(
343    executor: E,
344    schema_name: &str,
345    table_name: &str,
346) -> Result<BTreeMap<String, bool>>
347where
348    E: sqlx::Executor<'e, Database = Postgres>,
349{
350    let rows = sqlx::query(
351        "SELECT column_name, is_nullable
352         FROM information_schema.columns
353         WHERE table_schema = $1 AND table_name = $2",
354    )
355    .bind(schema_name)
356    .bind(table_name)
357    .fetch_all(executor)
358    .await?;
359
360    let mut columns = BTreeMap::new();
361    for row in rows {
362        let column_name = row.get::<String, _>("column_name");
363        let is_nullable = row.get::<String, _>("is_nullable") == "YES";
364        columns.insert(column_name, is_nullable);
365    }
366
367    ensure_typed_system_columns(&columns)?;
368
369    Ok(columns)
370}
371
/// Builds the SQL expression for the merged payload of a typed row.
///
/// The expression concatenates `<alias>.payload` with the JSONB form of the
/// whole row minus the `entry_key`, `payload`, `created_at` and `updated_at`
/// keys. `alias` is interpolated verbatim.
fn typed_payload_projection(alias: &str) -> String {
    format!(
        "{alias}.payload || (to_jsonb({alias}) - 'entry_key' - 'payload' - 'created_at' - 'updated_at')",
        alias = alias
    )
}
377
378fn ensure_typed_system_columns(columns: &BTreeMap<String, bool>) -> Result<()> {
379    let required_columns = ["entry_key", "payload", "updated_at"];
380    let missing = required_columns
381        .into_iter()
382        .filter(|column| !columns.contains_key(*column))
383        .collect::<Vec<_>>();
384
385    if missing.is_empty() {
386        return Ok(());
387    }
388
389    Err(CedrosDataError::InvalidRequest(format!(
390        "typed tables must contain {} columns",
391        missing.join(", ")
392    )))
393}
394
395fn validate_typed_payload(
396    columns: &BTreeMap<String, bool>,
397    payload: &serde_json::Value,
398) -> Result<()> {
399    let object = payload
400        .as_object()
401        .ok_or_else(|| CedrosDataError::InvalidRequest("payload must be object".to_string()))?;
402
403    for key in object.keys() {
404        if columns.contains_key(key) || is_system_column(key) {
405            continue;
406        }
407        return Err(CedrosDataError::InvalidRequest(format!(
408            "typed payload has unknown field: {key}"
409        )));
410    }
411
412    for (column, nullable) in columns {
413        if *nullable || is_system_column(column) {
414            continue;
415        }
416        let Some(value) = object.get(column) else {
417            return Err(CedrosDataError::InvalidRequest(format!(
418                "typed payload missing required column: {column}"
419            )));
420        };
421        if value.is_null() {
422            return Err(CedrosDataError::InvalidRequest(format!(
423                "typed payload column cannot be null: {column}"
424            )));
425        }
426    }
427
428    Ok(())
429}
430
/// Reports whether `column` is one of the system columns: `entry_key`,
/// `payload`, `created_at` or `updated_at`. The comparison is exact.
fn is_system_column(column: &str) -> bool {
    const SYSTEM_COLUMNS: [&str; 4] = ["entry_key", "payload", "created_at", "updated_at"];
    SYSTEM_COLUMNS.contains(&column)
}
437
/// The sequence of writes an upsert performs, chosen by `entry_write_plan`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum EntryWritePlan {
    /// Write only to the `entries` table.
    JsonbOnly,
    /// Write to the typed table first, then to the `entries` mirror.
    TypedThenJsonbMirror,
}
443
444fn entry_write_plan(mode: &CollectionMode) -> EntryWritePlan {
445    match mode {
446        CollectionMode::Jsonb => EntryWritePlan::JsonbOnly,
447        CollectionMode::Typed => EntryWritePlan::TypedThenJsonbMirror,
448    }
449}
450
451fn typed_upsert_statement(
452    schema_name: &str,
453    table_name: &str,
454    columns: &[String],
455) -> Result<String> {
456    let mut insert_columns = vec![quote_ident("entry_key")?, quote_ident("payload")?];
457    let mut select_columns = vec!["$1".to_string(), "$2::jsonb".to_string()];
458    let mut conflict_updates = vec![format!(
459        "{} = EXCLUDED.{}",
460        quote_ident("payload")?,
461        quote_ident("payload")?
462    )];
463
464    for column in columns {
465        let quoted = quote_ident(column)?;
466        insert_columns.push(quoted.clone());
467        select_columns.push(format!("record.{quoted}"));
468        conflict_updates.push(format!("{quoted} = EXCLUDED.{quoted}"));
469    }
470
471    conflict_updates.push(format!("{} = NOW()", quote_ident("updated_at")?));
472
473    Ok(format!(
474        "INSERT INTO {}.{} AS typed_entries ({})
475         SELECT {}
476         FROM jsonb_populate_record(NULL::{}.{}, $2::jsonb) AS record
477         ON CONFLICT ({}) DO UPDATE SET {}
478         RETURNING {}, {}, to_jsonb(typed_entries) AS typed_row, {}",
479        quote_ident(schema_name)?,
480        quote_ident(table_name)?,
481        insert_columns.join(", "),
482        select_columns.join(", "),
483        quote_ident(schema_name)?,
484        quote_ident(table_name)?,
485        quote_ident("entry_key")?,
486        conflict_updates.join(", "),
487        quote_ident("entry_key")?,
488        quote_ident("payload")?,
489        quote_ident("updated_at")?,
490    ))
491}