Skip to main content

cedros_data/store/
entries.rs

1use std::collections::BTreeMap;
2
3use sqlx::{Postgres, QueryBuilder, Row, Transaction};
4
5use crate::contracts::{infer_contract, merge_contracts, verify_contract_change};
6use crate::defaults::SITE_SCHEMA_NAME;
7use crate::error::{CedrosDataError, Result};
8use crate::models::{
9    Collection, CollectionMode, EntryRecord, QueryEntriesRequest, UpsertEntryRequest,
10};
11use crate::sql::quote_ident;
12
13use super::{
14    find_collection_with, insert_contract_in_transaction, latest_contract_with, CedrosData,
15};
16
/// Largest `limit` a query request may ask for; `validate_query_pagination`
/// rejects anything above it.
const MAX_QUERY_LIMIT: i64 = 1_000;
/// `ORDER BY` fragment appended to entry queries: most recently updated rows
/// first, with `entry_key` ascending as the tie-breaker.
const ENTRY_ORDER_CLAUSE: &str = " ORDER BY updated_at DESC, entry_key ASC";
19
impl CedrosData {
    /// Inserts or updates a single entry inside its own transaction.
    ///
    /// Confirms the site exists, opens a transaction on the pool, delegates to
    /// [`Self::upsert_entry_in_transaction`], and commits only when that call
    /// succeeds.
    ///
    /// # Errors
    /// Propagates any error from the site check, the transaction, or the
    /// upsert itself.
    pub async fn upsert_entry(&self, request: UpsertEntryRequest) -> Result<EntryRecord> {
        self.ensure_site_exists().await?;
        let mut tx = self.pool().begin().await?;
        let record = self.upsert_entry_in_transaction(&mut tx, &request).await?;
        tx.commit().await?;
        Ok(record)
    }

    /// Lists entries of one collection, honoring the request's key filter,
    /// containment filter, limit, and offset.
    ///
    /// Pagination is validated before any database work. The collection's
    /// mode decides whether the shared `entries` table or the collection's
    /// typed table is queried.
    ///
    /// # Errors
    /// Returns `InvalidRequest` for out-of-range pagination; otherwise
    /// propagates lookup and query errors.
    pub async fn query_entries(&self, request: QueryEntriesRequest) -> Result<Vec<EntryRecord>> {
        validate_query_pagination(&request)?;
        self.ensure_site_exists().await?;
        let collection = self.find_collection(&request.collection_name).await?;

        match collection.mode {
            CollectionMode::Jsonb => {
                query_jsonb_entries(self.pool(), &collection.collection_name, &request).await
            }
            CollectionMode::Typed => {
                query_typed_entries(self.pool(), SITE_SCHEMA_NAME, &collection, &request).await
            }
        }
    }

    /// Upserts an entry using a caller-owned transaction.
    ///
    /// Loads the collection named in `request` through `tx`, then hands off to
    /// [`Self::upsert_entry_for_collection_in_transaction`]. The caller is
    /// responsible for committing.
    pub(super) async fn upsert_entry_in_transaction(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        request: &UpsertEntryRequest,
    ) -> Result<EntryRecord> {
        let collection = find_collection_with(&mut **tx, &request.collection_name).await?;
        self.upsert_entry_for_collection_in_transaction(tx, &collection, request)
            .await
    }

    /// Upserts an entry into an already-loaded `collection` within `tx`.
    ///
    /// Steps, in order: check that `request` targets `collection`, require an
    /// object payload, verify (and possibly record) the contract, then write
    /// according to the collection's write plan.
    ///
    /// # Errors
    /// Returns `InvalidRequest` when the collection names differ or the
    /// payload is not a JSON object; propagates contract and database errors.
    pub(super) async fn upsert_entry_for_collection_in_transaction(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        collection: &Collection,
        request: &UpsertEntryRequest,
    ) -> Result<EntryRecord> {
        // Guard against a caller pairing a request with the wrong collection.
        if collection.collection_name != request.collection_name {
            return Err(CedrosDataError::InvalidRequest(format!(
                "entry request collection {} does not match loaded collection {}",
                request.collection_name, collection.collection_name
            )));
        }
        ensure_payload_object(&request.payload)?;

        self.verify_upsert_contract_in_transaction(tx, collection, &request.payload)
            .await?;

        match entry_write_plan(&collection.mode) {
            EntryWritePlan::JsonbOnly => {
                upsert_jsonb_entry_with(&mut **tx, &collection.collection_name, request).await
            }
            EntryWritePlan::TypedThenJsonbMirror => {
                // The typed table is written first; its record (typed columns
                // merged into the payload) is what the caller receives.
                let record = upsert_typed_entry_in_transaction(
                    self,
                    tx,
                    SITE_SCHEMA_NAME,
                    collection,
                    request,
                )
                .await?;
                // Mirror the raw payload into `entries`; the mirror's returned
                // record is discarded.
                upsert_jsonb_entry_with(&mut **tx, &collection.collection_name, request).await?;
                Ok(record)
            }
        }
    }

    /// Checks `payload` against the collection's contract and, for
    /// collections without a strict contract, records the merged contract
    /// when it changed.
    ///
    /// # Errors
    /// Returns `ContractVerificationFailed` listing the breaking changes when
    /// verification does not pass; propagates inference and database errors.
    async fn verify_upsert_contract_in_transaction(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        collection: &Collection,
        payload: &serde_json::Value,
    ) -> Result<()> {
        // Reserved collections (pages, navigation, site_settings) hold
        // heterogeneous entry shapes — contract evolution doesn't apply.
        if super::site::is_reserved_collection(&collection.collection_name) {
            return Ok(());
        }

        let incoming = infer_contract(std::slice::from_ref(payload))?;
        // The stored contract is only fetched when no strict contract exists;
        // a strict contract is always the reference.
        let latest = match collection.strict_contract {
            Some(_) => None,
            None => latest_contract_with(&mut **tx, &collection.collection_name).await?,
        };

        let reference = collection.strict_contract.as_ref().or(latest.as_ref());
        let report = verify_contract_change(reference, &incoming);
        if !report.passes {
            return Err(CedrosDataError::ContractVerificationFailed(format!(
                "{} => {}",
                collection.collection_name,
                report.breaking_changes.join(", ")
            )));
        }

        // Without a strict contract, fold the incoming shape into the latest
        // contract and persist it only if the result differs.
        if collection.strict_contract.is_none() {
            let merged = merge_contracts(latest.as_ref(), &incoming);
            if should_record_contract(latest.as_ref(), &merged)? {
                insert_contract_in_transaction(tx, &collection.collection_name, &merged).await?;
            }
        }

        Ok(())
    }
}
128
129fn ensure_payload_object(payload: &serde_json::Value) -> Result<()> {
130    if payload.is_object() {
131        return Ok(());
132    }
133    Err(CedrosDataError::InvalidRequest(
134        "entry payload must be a JSON object".to_string(),
135    ))
136}
137
138fn should_record_contract(
139    latest: Option<&crate::models::ContractSchema>,
140    merged: &crate::models::ContractSchema,
141) -> Result<bool> {
142    let Some(latest) = latest else {
143        return Ok(true);
144    };
145    Ok(serde_json::to_value(latest)? != serde_json::to_value(merged)?)
146}
147
148async fn upsert_jsonb_entry_with<'e, E>(
149    executor: E,
150    collection_name: &str,
151    request: &UpsertEntryRequest,
152) -> Result<EntryRecord>
153where
154    E: sqlx::Executor<'e, Database = Postgres>,
155{
156    let row = sqlx::query(
157        "INSERT INTO entries (collection_name, entry_key, payload, updated_at)
158         VALUES ($1, $2, $3, NOW())
159         ON CONFLICT (collection_name, entry_key)
160         DO UPDATE SET payload = EXCLUDED.payload,
161                       updated_at = NOW()
162         RETURNING entry_key, payload, updated_at",
163    )
164    .bind(collection_name)
165    .bind(&request.entry_key)
166    .bind(&request.payload)
167    .fetch_one(executor)
168    .await?;
169
170    Ok(EntryRecord {
171        entry_key: row.get("entry_key"),
172        payload: row.get("payload"),
173        updated_at: row.get("updated_at"),
174    })
175}
176
177async fn upsert_typed_entry_in_transaction(
178    store: &CedrosData,
179    tx: &mut Transaction<'_, Postgres>,
180    schema_name: &str,
181    collection: &Collection,
182    request: &UpsertEntryRequest,
183) -> Result<EntryRecord> {
184    let table_name = collection.table_name.as_ref().ok_or_else(|| {
185        CedrosDataError::InvalidRequest("typed collection missing table_name".into())
186    })?;
187
188    let table_columns = store
189        .typed_table_columns_in_transaction(tx, schema_name, table_name)
190        .await?;
191    validate_typed_payload(&table_columns, &request.payload)?;
192
193    let typed_columns = table_columns
194        .keys()
195        .filter(|name| !is_system_column(name))
196        .cloned()
197        .collect::<Vec<String>>();
198
199    let statement = typed_upsert_statement(schema_name, table_name, &typed_columns)?;
200    let row = sqlx::query(&statement)
201        .bind(&request.entry_key)
202        .bind(&request.payload)
203        .fetch_one(&mut **tx)
204        .await?;
205
206    Ok(EntryRecord {
207        entry_key: row.get("entry_key"),
208        payload: merged_typed_payload(row.get("payload"), row.get("typed_row")),
209        updated_at: row.get("updated_at"),
210    })
211}
212
213async fn query_jsonb_entries(
214    pool: &sqlx::PgPool,
215    collection_name: &str,
216    request: &QueryEntriesRequest,
217) -> Result<Vec<EntryRecord>> {
218    let mut builder = QueryBuilder::<sqlx::Postgres>::new(
219        "SELECT entry_key, payload, updated_at FROM entries WHERE collection_name = ",
220    );
221    builder.push_bind(collection_name);
222
223    apply_query_filters(&mut builder, request, "payload");
224    builder.push(entry_order_clause());
225    builder.push(" LIMIT ");
226    builder.push_bind(request.limit);
227    builder.push(" OFFSET ");
228    builder.push_bind(request.offset);
229
230    fetch_entry_records(builder, pool).await
231}
232
233async fn query_typed_entries(
234    pool: &sqlx::PgPool,
235    schema_name: &str,
236    collection: &Collection,
237    request: &QueryEntriesRequest,
238) -> Result<Vec<EntryRecord>> {
239    let table_name = collection.table_name.as_ref().ok_or_else(|| {
240        CedrosDataError::InvalidRequest("typed collection missing table_name".into())
241    })?;
242    let payload_projection = typed_payload_projection("typed_entries");
243    let mut builder = QueryBuilder::<sqlx::Postgres>::new(format!(
244        "SELECT entry_key, payload, to_jsonb(typed_entries) AS typed_row, updated_at FROM {}.{} AS typed_entries WHERE TRUE",
245        quote_ident(schema_name)?,
246        quote_ident(table_name)?
247    ));
248
249    apply_query_filters(&mut builder, request, &payload_projection);
250    builder.push(entry_order_clause());
251    builder.push(" LIMIT ");
252    builder.push_bind(request.limit);
253    builder.push(" OFFSET ");
254    builder.push_bind(request.offset);
255
256    fetch_typed_entry_records(builder, pool).await
257}
258
259fn apply_query_filters(
260    builder: &mut QueryBuilder<'_, sqlx::Postgres>,
261    request: &QueryEntriesRequest,
262    payload_expression: &str,
263) {
264    if !request.entry_keys.is_empty() {
265        builder.push(" AND entry_key = ANY(");
266        builder.push_bind(request.entry_keys.clone());
267        builder.push(")");
268    }
269    if let Some(contains) = &request.contains {
270        builder.push(" AND ");
271        builder.push(payload_expression);
272        builder.push(" @> ");
273        builder.push_bind(contains.clone());
274    }
275}
276
/// Returns the shared `ORDER BY` fragment (`ENTRY_ORDER_CLAUSE`) that entry
/// queries append before their `LIMIT`/`OFFSET`.
pub(super) fn entry_order_clause() -> &'static str {
    ENTRY_ORDER_CLAUSE
}
280
281fn validate_query_pagination(request: &QueryEntriesRequest) -> Result<()> {
282    if request.limit < 0 {
283        return Err(CedrosDataError::InvalidRequest(
284            "query limit cannot be negative".to_string(),
285        ));
286    }
287    if request.limit > MAX_QUERY_LIMIT {
288        return Err(CedrosDataError::InvalidRequest(format!(
289            "query limit cannot exceed {MAX_QUERY_LIMIT}"
290        )));
291    }
292    if request.offset < 0 {
293        return Err(CedrosDataError::InvalidRequest(
294            "query offset cannot be negative".to_string(),
295        ));
296    }
297    Ok(())
298}
299
300async fn fetch_entry_records(
301    mut builder: QueryBuilder<'_, sqlx::Postgres>,
302    pool: &sqlx::PgPool,
303) -> Result<Vec<EntryRecord>> {
304    let rows = builder.build().fetch_all(pool).await?;
305    Ok(rows
306        .into_iter()
307        .map(|row| EntryRecord {
308            entry_key: row.get("entry_key"),
309            payload: row.get("payload"),
310            updated_at: row.get("updated_at"),
311        })
312        .collect())
313}
314
315async fn fetch_typed_entry_records(
316    mut builder: QueryBuilder<'_, sqlx::Postgres>,
317    pool: &sqlx::PgPool,
318) -> Result<Vec<EntryRecord>> {
319    let rows = builder.build().fetch_all(pool).await?;
320    Ok(rows
321        .into_iter()
322        .map(|row| EntryRecord {
323            entry_key: row.get("entry_key"),
324            payload: merged_typed_payload(row.get("payload"), row.get("typed_row")),
325            updated_at: row.get("updated_at"),
326        })
327        .collect())
328}
329
330pub(super) fn merged_typed_payload(
331    mirror_payload: serde_json::Value,
332    typed_row: serde_json::Value,
333) -> serde_json::Value {
334    let serde_json::Value::Object(mut merged_payload) = mirror_payload else {
335        return mirror_payload;
336    };
337    let serde_json::Value::Object(mut typed_row_object) = typed_row else {
338        return serde_json::Value::Object(merged_payload);
339    };
340
341    for key in ["entry_key", "payload", "created_at", "updated_at"] {
342        typed_row_object.remove(key);
343    }
344    merged_payload.extend(typed_row_object);
345    serde_json::Value::Object(merged_payload)
346}
347
348pub(super) async fn typed_table_columns_with<'e, E>(
349    executor: E,
350    schema_name: &str,
351    table_name: &str,
352) -> Result<BTreeMap<String, bool>>
353where
354    E: sqlx::Executor<'e, Database = Postgres>,
355{
356    let rows = sqlx::query(
357        "SELECT column_name, is_nullable
358         FROM information_schema.columns
359         WHERE table_schema = $1 AND table_name = $2",
360    )
361    .bind(schema_name)
362    .bind(table_name)
363    .fetch_all(executor)
364    .await?;
365
366    let mut columns = BTreeMap::new();
367    for row in rows {
368        let column_name = row.get::<String, _>("column_name");
369        let is_nullable = row.get::<String, _>("is_nullable") == "YES";
370        columns.insert(column_name, is_nullable);
371    }
372
373    ensure_typed_system_columns(&columns)?;
374
375    Ok(columns)
376}
377
/// Builds the SQL expression that represents a typed row's full payload:
/// the stored `payload` column concatenated (`||`) with the row rendered as
/// JSONB minus the four system keys.
///
/// `alias` is inserted verbatim, so it must already be a safe SQL identifier.
fn typed_payload_projection(alias: &str) -> String {
    const STRIPPED_KEYS: [&str; 4] = ["entry_key", "payload", "created_at", "updated_at"];

    let mut projection = format!("{alias}.payload || (to_jsonb({alias})");
    for key in STRIPPED_KEYS {
        projection.push_str(" - '");
        projection.push_str(key);
        projection.push('\'');
    }
    projection.push(')');
    projection
}
383
384fn ensure_typed_system_columns(columns: &BTreeMap<String, bool>) -> Result<()> {
385    let required_columns = ["entry_key", "payload", "updated_at"];
386    let missing = required_columns
387        .into_iter()
388        .filter(|column| !columns.contains_key(*column))
389        .collect::<Vec<_>>();
390
391    if missing.is_empty() {
392        return Ok(());
393    }
394
395    Err(CedrosDataError::InvalidRequest(format!(
396        "typed tables must contain {} columns",
397        missing.join(", ")
398    )))
399}
400
401fn validate_typed_payload(
402    columns: &BTreeMap<String, bool>,
403    payload: &serde_json::Value,
404) -> Result<()> {
405    let object = payload
406        .as_object()
407        .ok_or_else(|| CedrosDataError::InvalidRequest("payload must be object".to_string()))?;
408
409    for key in object.keys() {
410        if columns.contains_key(key) || is_system_column(key) {
411            continue;
412        }
413        return Err(CedrosDataError::InvalidRequest(format!(
414            "typed payload has unknown field: {key}"
415        )));
416    }
417
418    for (column, nullable) in columns {
419        if *nullable || is_system_column(column) {
420            continue;
421        }
422        let Some(value) = object.get(column) else {
423            return Err(CedrosDataError::InvalidRequest(format!(
424                "typed payload missing required column: {column}"
425            )));
426        };
427        if value.is_null() {
428            return Err(CedrosDataError::InvalidRequest(format!(
429                "typed payload column cannot be null: {column}"
430            )));
431        }
432    }
433
434    Ok(())
435}
436
/// Reports whether `column` is one of the bookkeeping columns
/// (`entry_key`, `payload`, `created_at`, `updated_at`). The comparison is
/// exact and case-sensitive.
fn is_system_column(column: &str) -> bool {
    const SYSTEM_COLUMNS: [&str; 4] = ["entry_key", "payload", "created_at", "updated_at"];
    SYSTEM_COLUMNS.contains(&column)
}
443
/// Which storage targets an entry upsert writes to, derived from the
/// collection's mode by `entry_write_plan`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EntryWritePlan {
    /// Write only the shared `entries` table.
    JsonbOnly,
    /// Write the collection's typed table first, then mirror the payload into
    /// the shared `entries` table.
    TypedThenJsonbMirror,
}
449
450fn entry_write_plan(mode: &CollectionMode) -> EntryWritePlan {
451    match mode {
452        CollectionMode::Jsonb => EntryWritePlan::JsonbOnly,
453        CollectionMode::Typed => EntryWritePlan::TypedThenJsonbMirror,
454    }
455}
456
457fn typed_upsert_statement(
458    schema_name: &str,
459    table_name: &str,
460    columns: &[String],
461) -> Result<String> {
462    let mut insert_columns = vec![quote_ident("entry_key")?, quote_ident("payload")?];
463    let mut select_columns = vec!["$1".to_string(), "$2::jsonb".to_string()];
464    let mut conflict_updates = vec![format!(
465        "{} = EXCLUDED.{}",
466        quote_ident("payload")?,
467        quote_ident("payload")?
468    )];
469
470    for column in columns {
471        let quoted = quote_ident(column)?;
472        insert_columns.push(quoted.clone());
473        select_columns.push(format!("record.{quoted}"));
474        conflict_updates.push(format!("{quoted} = EXCLUDED.{quoted}"));
475    }
476
477    conflict_updates.push(format!("{} = NOW()", quote_ident("updated_at")?));
478
479    Ok(format!(
480        "INSERT INTO {}.{} AS typed_entries ({})
481         SELECT {}
482         FROM jsonb_populate_record(NULL::{}.{}, $2::jsonb) AS record
483         ON CONFLICT ({}) DO UPDATE SET {}
484         RETURNING {}, {}, to_jsonb(typed_entries) AS typed_row, {}",
485        quote_ident(schema_name)?,
486        quote_ident(table_name)?,
487        insert_columns.join(", "),
488        select_columns.join(", "),
489        quote_ident(schema_name)?,
490        quote_ident(table_name)?,
491        quote_ident("entry_key")?,
492        conflict_updates.join(", "),
493        quote_ident("entry_key")?,
494        quote_ident("payload")?,
495        quote_ident("updated_at")?,
496    ))
497}