1use std::collections::BTreeMap;
2
3use sqlx::{Postgres, QueryBuilder, Row, Transaction};
4
5use crate::contracts::{infer_contract, merge_contracts, verify_contract_change};
6use crate::defaults::SITE_SCHEMA_NAME;
7use crate::error::{CedrosDataError, Result};
8use crate::models::{
9 Collection, CollectionMode, EntryRecord, QueryEntriesRequest, UpsertEntryRequest,
10};
11use crate::sql::quote_ident;
12
13use super::{
14 find_collection_with, insert_contract_in_transaction, latest_contract_with, CedrosData,
15};
16
/// Upper bound accepted for `QueryEntriesRequest.limit`; enforced by
/// `validate_query_pagination` to guard against unbounded result sets.
const MAX_QUERY_LIMIT: i64 = 1_000;
/// Shared ORDER BY fragment: newest entries first, ties broken by key so
/// pagination is deterministic across both JSONB and typed queries.
const ENTRY_ORDER_CLAUSE: &str = " ORDER BY updated_at DESC, entry_key ASC";
19
impl CedrosData {
    /// Upserts one entry in its own transaction, committing only on success.
    ///
    /// Ensures the site schema exists before opening the transaction; any
    /// failure (validation, contract check, write) rolls everything back.
    pub async fn upsert_entry(&self, request: UpsertEntryRequest) -> Result<EntryRecord> {
        self.ensure_site_exists().await?;
        let mut tx = self.pool().begin().await?;
        let record = self.upsert_entry_in_transaction(&mut tx, &request).await?;
        tx.commit().await?;
        Ok(record)
    }

    /// Queries entries for a collection, dispatching on its storage mode.
    ///
    /// Pagination bounds are validated up front so both branches share the
    /// same limit/offset rules.
    pub async fn query_entries(&self, request: QueryEntriesRequest) -> Result<Vec<EntryRecord>> {
        validate_query_pagination(&request)?;
        self.ensure_site_exists().await?;
        let collection = self.find_collection(&request.collection_name).await?;

        match collection.mode {
            CollectionMode::Jsonb => {
                query_jsonb_entries(self.pool(), &collection.collection_name, &request).await
            }
            CollectionMode::Typed => {
                query_typed_entries(self.pool(), SITE_SCHEMA_NAME, &collection, &request).await
            }
        }
    }

    /// Loads the collection named in `request`, then performs the upsert
    /// inside the caller's transaction.
    pub(super) async fn upsert_entry_in_transaction(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        request: &UpsertEntryRequest,
    ) -> Result<EntryRecord> {
        let collection = find_collection_with(&mut **tx, &request.collection_name).await?;
        self.upsert_entry_for_collection_in_transaction(tx, &collection, request)
            .await
    }

    /// Upserts an entry for an already-loaded collection.
    ///
    /// Order matters here: the payload shape and contract are verified
    /// before any row is written, so a verification failure leaves the
    /// transaction with no entry writes. Typed collections write the typed
    /// table first and then mirror the payload into the shared `entries`
    /// table.
    pub(super) async fn upsert_entry_for_collection_in_transaction(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        collection: &Collection,
        request: &UpsertEntryRequest,
    ) -> Result<EntryRecord> {
        // Guard against callers pairing a request with the wrong collection.
        if collection.collection_name != request.collection_name {
            return Err(CedrosDataError::InvalidRequest(format!(
                "entry request collection {} does not match loaded collection {}",
                request.collection_name, collection.collection_name
            )));
        }
        ensure_payload_object(&request.payload)?;

        self.verify_upsert_contract_in_transaction(tx, collection, &request.payload)
            .await?;

        match entry_write_plan(&collection.mode) {
            EntryWritePlan::JsonbOnly => {
                upsert_jsonb_entry_with(&mut **tx, &collection.collection_name, request).await
            }
            EntryWritePlan::TypedThenJsonbMirror => {
                // The typed row is the source of truth for the returned
                // record; the JSONB mirror is written afterwards so both
                // storage paths stay queryable.
                let record = upsert_typed_entry_in_transaction(
                    self,
                    tx,
                    SITE_SCHEMA_NAME,
                    collection,
                    request,
                )
                .await?;
                upsert_jsonb_entry_with(&mut **tx, &collection.collection_name, request).await?;
                Ok(record)
            }
        }
    }

    /// Verifies `payload` against the collection's contract and, for
    /// non-strict collections, records a merged contract when it changed.
    ///
    /// Reserved (internal) collections skip contract handling entirely.
    async fn verify_upsert_contract_in_transaction(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        collection: &Collection,
        payload: &serde_json::Value,
    ) -> Result<()> {
        if super::site::is_reserved_collection(&collection.collection_name) {
            return Ok(());
        }

        let incoming = infer_contract(std::slice::from_ref(payload))?;
        // A strict contract is authoritative, so the latest stored contract
        // is only fetched when no strict contract is configured.
        let latest = match collection.strict_contract {
            Some(_) => None,
            None => latest_contract_with(&mut **tx, &collection.collection_name).await?,
        };

        let reference = collection.strict_contract.as_ref().or(latest.as_ref());
        let report = verify_contract_change(reference, &incoming);
        if !report.passes {
            return Err(CedrosDataError::ContractVerificationFailed(format!(
                "{} => {}",
                collection.collection_name,
                report.breaking_changes.join(", ")
            )));
        }

        // Non-strict collections may evolve: persist the merged contract
        // only when it actually differs from the latest recorded one.
        if collection.strict_contract.is_none() {
            let merged = merge_contracts(latest.as_ref(), &incoming);
            if should_record_contract(latest.as_ref(), &merged)? {
                insert_contract_in_transaction(tx, &collection.collection_name, &merged).await?;
            }
        }

        Ok(())
    }
}
128
129fn ensure_payload_object(payload: &serde_json::Value) -> Result<()> {
130 if payload.is_object() {
131 return Ok(());
132 }
133 Err(CedrosDataError::InvalidRequest(
134 "entry payload must be a JSON object".to_string(),
135 ))
136}
137
138fn should_record_contract(
139 latest: Option<&crate::models::ContractSchema>,
140 merged: &crate::models::ContractSchema,
141) -> Result<bool> {
142 let Some(latest) = latest else {
143 return Ok(true);
144 };
145 Ok(serde_json::to_value(latest)? != serde_json::to_value(merged)?)
146}
147
148async fn upsert_jsonb_entry_with<'e, E>(
149 executor: E,
150 collection_name: &str,
151 request: &UpsertEntryRequest,
152) -> Result<EntryRecord>
153where
154 E: sqlx::Executor<'e, Database = Postgres>,
155{
156 let row = sqlx::query(
157 "INSERT INTO entries (collection_name, entry_key, payload, updated_at)
158 VALUES ($1, $2, $3, NOW())
159 ON CONFLICT (collection_name, entry_key)
160 DO UPDATE SET payload = EXCLUDED.payload,
161 updated_at = NOW()
162 RETURNING entry_key, payload, updated_at",
163 )
164 .bind(collection_name)
165 .bind(&request.entry_key)
166 .bind(&request.payload)
167 .fetch_one(executor)
168 .await?;
169
170 Ok(EntryRecord {
171 entry_key: row.get("entry_key"),
172 payload: row.get("payload"),
173 updated_at: row.get("updated_at"),
174 })
175}
176
177async fn upsert_typed_entry_in_transaction(
178 store: &CedrosData,
179 tx: &mut Transaction<'_, Postgres>,
180 schema_name: &str,
181 collection: &Collection,
182 request: &UpsertEntryRequest,
183) -> Result<EntryRecord> {
184 let table_name = collection.table_name.as_ref().ok_or_else(|| {
185 CedrosDataError::InvalidRequest("typed collection missing table_name".into())
186 })?;
187
188 let table_columns = store
189 .typed_table_columns_in_transaction(tx, schema_name, table_name)
190 .await?;
191 validate_typed_payload(&table_columns, &request.payload)?;
192
193 let typed_columns = table_columns
194 .keys()
195 .filter(|name| !is_system_column(name))
196 .cloned()
197 .collect::<Vec<String>>();
198
199 let statement = typed_upsert_statement(schema_name, table_name, &typed_columns)?;
200 let row = sqlx::query(&statement)
201 .bind(&request.entry_key)
202 .bind(&request.payload)
203 .fetch_one(&mut **tx)
204 .await?;
205
206 Ok(EntryRecord {
207 entry_key: row.get("entry_key"),
208 payload: merged_typed_payload(row.get("payload"), row.get("typed_row")),
209 updated_at: row.get("updated_at"),
210 })
211}
212
213async fn query_jsonb_entries(
214 pool: &sqlx::PgPool,
215 collection_name: &str,
216 request: &QueryEntriesRequest,
217) -> Result<Vec<EntryRecord>> {
218 let mut builder = QueryBuilder::<sqlx::Postgres>::new(
219 "SELECT entry_key, payload, updated_at FROM entries WHERE collection_name = ",
220 );
221 builder.push_bind(collection_name);
222
223 apply_query_filters(&mut builder, request, "payload");
224 builder.push(entry_order_clause());
225 builder.push(" LIMIT ");
226 builder.push_bind(request.limit);
227 builder.push(" OFFSET ");
228 builder.push_bind(request.offset);
229
230 fetch_entry_records(builder, pool).await
231}
232
233async fn query_typed_entries(
234 pool: &sqlx::PgPool,
235 schema_name: &str,
236 collection: &Collection,
237 request: &QueryEntriesRequest,
238) -> Result<Vec<EntryRecord>> {
239 let table_name = collection.table_name.as_ref().ok_or_else(|| {
240 CedrosDataError::InvalidRequest("typed collection missing table_name".into())
241 })?;
242 let payload_projection = typed_payload_projection("typed_entries");
243 let mut builder = QueryBuilder::<sqlx::Postgres>::new(format!(
244 "SELECT entry_key, payload, to_jsonb(typed_entries) AS typed_row, updated_at FROM {}.{} AS typed_entries WHERE TRUE",
245 quote_ident(schema_name)?,
246 quote_ident(table_name)?
247 ));
248
249 apply_query_filters(&mut builder, request, &payload_projection);
250 builder.push(entry_order_clause());
251 builder.push(" LIMIT ");
252 builder.push_bind(request.limit);
253 builder.push(" OFFSET ");
254 builder.push_bind(request.offset);
255
256 fetch_typed_entry_records(builder, pool).await
257}
258
259fn apply_query_filters(
260 builder: &mut QueryBuilder<'_, sqlx::Postgres>,
261 request: &QueryEntriesRequest,
262 payload_expression: &str,
263) {
264 if !request.entry_keys.is_empty() {
265 builder.push(" AND entry_key = ANY(");
266 builder.push_bind(request.entry_keys.clone());
267 builder.push(")");
268 }
269 if let Some(contains) = &request.contains {
270 builder.push(" AND ");
271 builder.push(payload_expression);
272 builder.push(" @> ");
273 builder.push_bind(contains.clone());
274 }
275}
276
/// Shared ORDER BY fragment so JSONB and typed queries paginate identically.
pub(super) fn entry_order_clause() -> &'static str {
    ENTRY_ORDER_CLAUSE
}
280
281fn validate_query_pagination(request: &QueryEntriesRequest) -> Result<()> {
282 if request.limit < 0 {
283 return Err(CedrosDataError::InvalidRequest(
284 "query limit cannot be negative".to_string(),
285 ));
286 }
287 if request.limit > MAX_QUERY_LIMIT {
288 return Err(CedrosDataError::InvalidRequest(format!(
289 "query limit cannot exceed {MAX_QUERY_LIMIT}"
290 )));
291 }
292 if request.offset < 0 {
293 return Err(CedrosDataError::InvalidRequest(
294 "query offset cannot be negative".to_string(),
295 ));
296 }
297 Ok(())
298}
299
300async fn fetch_entry_records(
301 mut builder: QueryBuilder<'_, sqlx::Postgres>,
302 pool: &sqlx::PgPool,
303) -> Result<Vec<EntryRecord>> {
304 let rows = builder.build().fetch_all(pool).await?;
305 Ok(rows
306 .into_iter()
307 .map(|row| EntryRecord {
308 entry_key: row.get("entry_key"),
309 payload: row.get("payload"),
310 updated_at: row.get("updated_at"),
311 })
312 .collect())
313}
314
315async fn fetch_typed_entry_records(
316 mut builder: QueryBuilder<'_, sqlx::Postgres>,
317 pool: &sqlx::PgPool,
318) -> Result<Vec<EntryRecord>> {
319 let rows = builder.build().fetch_all(pool).await?;
320 Ok(rows
321 .into_iter()
322 .map(|row| EntryRecord {
323 entry_key: row.get("entry_key"),
324 payload: merged_typed_payload(row.get("payload"), row.get("typed_row")),
325 updated_at: row.get("updated_at"),
326 })
327 .collect())
328}
329
330pub(super) fn merged_typed_payload(
331 mirror_payload: serde_json::Value,
332 typed_row: serde_json::Value,
333) -> serde_json::Value {
334 let serde_json::Value::Object(mut merged_payload) = mirror_payload else {
335 return mirror_payload;
336 };
337 let serde_json::Value::Object(mut typed_row_object) = typed_row else {
338 return serde_json::Value::Object(merged_payload);
339 };
340
341 for key in ["entry_key", "payload", "created_at", "updated_at"] {
342 typed_row_object.remove(key);
343 }
344 merged_payload.extend(typed_row_object);
345 serde_json::Value::Object(merged_payload)
346}
347
348pub(super) async fn typed_table_columns_with<'e, E>(
349 executor: E,
350 schema_name: &str,
351 table_name: &str,
352) -> Result<BTreeMap<String, bool>>
353where
354 E: sqlx::Executor<'e, Database = Postgres>,
355{
356 let rows = sqlx::query(
357 "SELECT column_name, is_nullable
358 FROM information_schema.columns
359 WHERE table_schema = $1 AND table_name = $2",
360 )
361 .bind(schema_name)
362 .bind(table_name)
363 .fetch_all(executor)
364 .await?;
365
366 let mut columns = BTreeMap::new();
367 for row in rows {
368 let column_name = row.get::<String, _>("column_name");
369 let is_nullable = row.get::<String, _>("is_nullable") == "YES";
370 columns.insert(column_name, is_nullable);
371 }
372
373 ensure_typed_system_columns(&columns)?;
374
375 Ok(columns)
376}
377
/// SQL expression projecting an entry's effective payload for a typed table:
/// the mirrored payload overlaid with the row's data columns (system columns
/// removed from the row JSONB first).
fn typed_payload_projection(alias: &str) -> String {
    let stripped_row =
        format!("(to_jsonb({alias}) - 'entry_key' - 'payload' - 'created_at' - 'updated_at')");
    format!("{alias}.payload || {stripped_row}")
}
383
384fn ensure_typed_system_columns(columns: &BTreeMap<String, bool>) -> Result<()> {
385 let required_columns = ["entry_key", "payload", "updated_at"];
386 let missing = required_columns
387 .into_iter()
388 .filter(|column| !columns.contains_key(*column))
389 .collect::<Vec<_>>();
390
391 if missing.is_empty() {
392 return Ok(());
393 }
394
395 Err(CedrosDataError::InvalidRequest(format!(
396 "typed tables must contain {} columns",
397 missing.join(", ")
398 )))
399}
400
401fn validate_typed_payload(
402 columns: &BTreeMap<String, bool>,
403 payload: &serde_json::Value,
404) -> Result<()> {
405 let object = payload
406 .as_object()
407 .ok_or_else(|| CedrosDataError::InvalidRequest("payload must be object".to_string()))?;
408
409 for key in object.keys() {
410 if columns.contains_key(key) || is_system_column(key) {
411 continue;
412 }
413 return Err(CedrosDataError::InvalidRequest(format!(
414 "typed payload has unknown field: {key}"
415 )));
416 }
417
418 for (column, nullable) in columns {
419 if *nullable || is_system_column(column) {
420 continue;
421 }
422 let Some(value) = object.get(column) else {
423 return Err(CedrosDataError::InvalidRequest(format!(
424 "typed payload missing required column: {column}"
425 )));
426 };
427 if value.is_null() {
428 return Err(CedrosDataError::InvalidRequest(format!(
429 "typed payload column cannot be null: {column}"
430 )));
431 }
432 }
433
434 Ok(())
435}
436
/// Columns managed by the store itself rather than by collection payloads.
fn is_system_column(column: &str) -> bool {
    ["entry_key", "payload", "created_at", "updated_at"].contains(&column)
}
443
/// How an upsert is persisted for a collection's storage mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EntryWritePlan {
    /// Write only the shared JSONB `entries` table.
    JsonbOnly,
    /// Write the typed table first, then mirror into the JSONB `entries` table.
    TypedThenJsonbMirror,
}
449
/// Maps a collection's storage mode to its write plan (exhaustive so a new
/// `CollectionMode` variant forces a decision here).
fn entry_write_plan(mode: &CollectionMode) -> EntryWritePlan {
    match mode {
        CollectionMode::Jsonb => EntryWritePlan::JsonbOnly,
        CollectionMode::Typed => EntryWritePlan::TypedThenJsonbMirror,
    }
}
456
/// Builds the dynamic upsert SQL for a typed collection table.
///
/// `$1` binds the entry key and `$2` the JSON payload; typed data columns
/// are decoded from the payload via `jsonb_populate_record`. Every
/// identifier passes through `quote_ident`, so only quoted/validated names
/// reach the statement text.
fn typed_upsert_statement(
    schema_name: &str,
    table_name: &str,
    columns: &[String],
) -> Result<String> {
    // System columns first: entry_key from $1, the raw payload mirror from $2.
    let mut insert_columns = vec![quote_ident("entry_key")?, quote_ident("payload")?];
    let mut select_columns = vec!["$1".to_string(), "$2::jsonb".to_string()];
    let mut conflict_updates = vec![format!(
        "{} = EXCLUDED.{}",
        quote_ident("payload")?,
        quote_ident("payload")?
    )];

    // Each typed data column is selected from the populated record and
    // refreshed on conflict.
    for column in columns {
        let quoted = quote_ident(column)?;
        insert_columns.push(quoted.clone());
        select_columns.push(format!("record.{quoted}"));
        conflict_updates.push(format!("{quoted} = EXCLUDED.{quoted}"));
    }

    conflict_updates.push(format!("{} = NOW()", quote_ident("updated_at")?));

    // RETURNING exposes the whole row as `typed_row` JSONB so the caller can
    // merge typed values back into the mirrored payload.
    Ok(format!(
        "INSERT INTO {}.{} AS typed_entries ({})
         SELECT {}
         FROM jsonb_populate_record(NULL::{}.{}, $2::jsonb) AS record
         ON CONFLICT ({}) DO UPDATE SET {}
         RETURNING {}, {}, to_jsonb(typed_entries) AS typed_row, {}",
        quote_ident(schema_name)?,
        quote_ident(table_name)?,
        insert_columns.join(", "),
        select_columns.join(", "),
        quote_ident(schema_name)?,
        quote_ident(table_name)?,
        quote_ident("entry_key")?,
        conflict_updates.join(", "),
        quote_ident("entry_key")?,
        quote_ident("payload")?,
        quote_ident("updated_at")?,
    ))
}