1use std::collections::BTreeMap;
2
3use sqlx::{Postgres, QueryBuilder, Row, Transaction};
4
5use crate::contracts::{infer_contract, merge_contracts, verify_contract_change};
6use crate::defaults::SITE_SCHEMA_NAME;
7use crate::error::{CedrosDataError, Result};
8use crate::models::{
9 Collection, CollectionMode, EntryRecord, QueryEntriesRequest, UpsertEntryRequest,
10};
11use crate::sql::quote_ident;
12
13use super::{
14 find_collection_with, insert_contract_in_transaction, latest_contract_with, CedrosData,
15};
16
// Hard upper bound enforced on `QueryEntriesRequest::limit` by
// `validate_query_pagination`, keeping result sets bounded.
const MAX_QUERY_LIMIT: i64 = 1_000;
// Shared ORDER BY fragment so the JSONB and typed query paths page
// identically (newest first, entry_key as a deterministic tie-breaker).
const ENTRY_ORDER_CLAUSE: &str = " ORDER BY updated_at DESC, entry_key ASC";
19
impl CedrosData {
    /// Inserts or updates a single entry, wrapping the work in its own
    /// transaction that is committed only on success.
    pub async fn upsert_entry(&self, request: UpsertEntryRequest) -> Result<EntryRecord> {
        self.ensure_site_exists().await?;
        let mut tx = self.pool().begin().await?;
        let record = self.upsert_entry_in_transaction(&mut tx, &request).await?;
        tx.commit().await?;
        Ok(record)
    }

    /// Queries entries for a collection, dispatching to the JSONB or typed
    /// implementation based on the collection's storage mode.
    ///
    /// Pagination bounds are validated before any database work happens.
    pub async fn query_entries(&self, request: QueryEntriesRequest) -> Result<Vec<EntryRecord>> {
        validate_query_pagination(&request)?;
        self.ensure_site_exists().await?;
        let collection = self.find_collection(&request.collection_name).await?;

        match collection.mode {
            CollectionMode::Jsonb => {
                query_jsonb_entries(self.pool(), &collection.collection_name, &request).await
            }
            CollectionMode::Typed => {
                query_typed_entries(self.pool(), SITE_SCHEMA_NAME, &collection, &request).await
            }
        }
    }

    /// Upserts an entry inside an existing transaction, loading the target
    /// collection first.
    pub(super) async fn upsert_entry_in_transaction(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        request: &UpsertEntryRequest,
    ) -> Result<EntryRecord> {
        let collection = find_collection_with(&mut **tx, &request.collection_name).await?;
        self.upsert_entry_for_collection_in_transaction(tx, &collection, request)
            .await
    }

    /// Upserts an entry for an already-loaded collection inside an existing
    /// transaction.
    ///
    /// Steps: check that the request matches the loaded collection, require
    /// an object payload, verify (and possibly record) the data contract,
    /// then write according to the collection's mode — JSONB only, or the
    /// typed table followed by a JSONB mirror write.
    pub(super) async fn upsert_entry_for_collection_in_transaction(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        collection: &Collection,
        request: &UpsertEntryRequest,
    ) -> Result<EntryRecord> {
        // Guard against callers pairing a request with the wrong collection.
        if collection.collection_name != request.collection_name {
            return Err(CedrosDataError::InvalidRequest(format!(
                "entry request collection {} does not match loaded collection {}",
                request.collection_name, collection.collection_name
            )));
        }
        ensure_payload_object(&request.payload)?;

        self.verify_upsert_contract_in_transaction(tx, collection, &request.payload)
            .await?;

        match entry_write_plan(&collection.mode) {
            EntryWritePlan::JsonbOnly => {
                upsert_jsonb_entry_with(&mut **tx, &collection.collection_name, request).await
            }
            EntryWritePlan::TypedThenJsonbMirror => {
                // Typed write first so the returned record carries the typed
                // columns; the mirror write keeps the shared `entries` table
                // in sync for JSONB-path reads.
                let record = upsert_typed_entry_in_transaction(
                    self,
                    tx,
                    SITE_SCHEMA_NAME,
                    collection,
                    request,
                )
                .await?;
                upsert_jsonb_entry_with(&mut **tx, &collection.collection_name, request).await?;
                Ok(record)
            }
        }
    }

    /// Verifies the incoming payload against the collection's contract and,
    /// for non-strict collections, records the merged contract when it
    /// changed.
    ///
    /// A strict contract, when present, is always the reference; otherwise
    /// the latest stored contract (if any) is used.
    async fn verify_upsert_contract_in_transaction(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        collection: &Collection,
        payload: &serde_json::Value,
    ) -> Result<()> {
        let incoming = infer_contract(std::slice::from_ref(payload))?;
        // Skip loading the stored contract when a strict contract exists:
        // the strict contract takes precedence as the reference below.
        let latest = match collection.strict_contract {
            Some(_) => None,
            None => latest_contract_with(&mut **tx, &collection.collection_name).await?,
        };

        let reference = collection.strict_contract.as_ref().or(latest.as_ref());
        let report = verify_contract_change(reference, &incoming);
        if !report.passes {
            return Err(CedrosDataError::ContractVerificationFailed(format!(
                "{} => {}",
                collection.collection_name,
                report.breaking_changes.join(", ")
            )));
        }

        // Evolving (non-strict) collections persist a new contract snapshot
        // whenever the merged schema differs from the latest recorded one.
        if collection.strict_contract.is_none() {
            let merged = merge_contracts(latest.as_ref(), &incoming);
            if should_record_contract(latest.as_ref(), &merged)? {
                insert_contract_in_transaction(tx, &collection.collection_name, &merged).await?;
            }
        }

        Ok(())
    }
}
122
123fn ensure_payload_object(payload: &serde_json::Value) -> Result<()> {
124 if payload.is_object() {
125 return Ok(());
126 }
127 Err(CedrosDataError::InvalidRequest(
128 "entry payload must be a JSON object".to_string(),
129 ))
130}
131
132fn should_record_contract(
133 latest: Option<&crate::models::ContractSchema>,
134 merged: &crate::models::ContractSchema,
135) -> Result<bool> {
136 let Some(latest) = latest else {
137 return Ok(true);
138 };
139 Ok(serde_json::to_value(latest)? != serde_json::to_value(merged)?)
140}
141
142async fn upsert_jsonb_entry_with<'e, E>(
143 executor: E,
144 collection_name: &str,
145 request: &UpsertEntryRequest,
146) -> Result<EntryRecord>
147where
148 E: sqlx::Executor<'e, Database = Postgres>,
149{
150 let row = sqlx::query(
151 "INSERT INTO entries (collection_name, entry_key, payload, updated_at)
152 VALUES ($1, $2, $3, NOW())
153 ON CONFLICT (collection_name, entry_key)
154 DO UPDATE SET payload = EXCLUDED.payload,
155 updated_at = NOW()
156 RETURNING entry_key, payload, updated_at",
157 )
158 .bind(collection_name)
159 .bind(&request.entry_key)
160 .bind(&request.payload)
161 .fetch_one(executor)
162 .await?;
163
164 Ok(EntryRecord {
165 entry_key: row.get("entry_key"),
166 payload: row.get("payload"),
167 updated_at: row.get("updated_at"),
168 })
169}
170
171async fn upsert_typed_entry_in_transaction(
172 store: &CedrosData,
173 tx: &mut Transaction<'_, Postgres>,
174 schema_name: &str,
175 collection: &Collection,
176 request: &UpsertEntryRequest,
177) -> Result<EntryRecord> {
178 let table_name = collection.table_name.as_ref().ok_or_else(|| {
179 CedrosDataError::InvalidRequest("typed collection missing table_name".into())
180 })?;
181
182 let table_columns = store
183 .typed_table_columns_in_transaction(tx, schema_name, table_name)
184 .await?;
185 validate_typed_payload(&table_columns, &request.payload)?;
186
187 let typed_columns = table_columns
188 .keys()
189 .filter(|name| !is_system_column(name))
190 .cloned()
191 .collect::<Vec<String>>();
192
193 let statement = typed_upsert_statement(schema_name, table_name, &typed_columns)?;
194 let row = sqlx::query(&statement)
195 .bind(&request.entry_key)
196 .bind(&request.payload)
197 .fetch_one(&mut **tx)
198 .await?;
199
200 Ok(EntryRecord {
201 entry_key: row.get("entry_key"),
202 payload: merged_typed_payload(row.get("payload"), row.get("typed_row")),
203 updated_at: row.get("updated_at"),
204 })
205}
206
207async fn query_jsonb_entries(
208 pool: &sqlx::PgPool,
209 collection_name: &str,
210 request: &QueryEntriesRequest,
211) -> Result<Vec<EntryRecord>> {
212 let mut builder = QueryBuilder::<sqlx::Postgres>::new(
213 "SELECT entry_key, payload, updated_at FROM entries WHERE collection_name = ",
214 );
215 builder.push_bind(collection_name);
216
217 apply_query_filters(&mut builder, request, "payload");
218 builder.push(entry_order_clause());
219 builder.push(" LIMIT ");
220 builder.push_bind(request.limit);
221 builder.push(" OFFSET ");
222 builder.push_bind(request.offset);
223
224 fetch_entry_records(builder, pool).await
225}
226
227async fn query_typed_entries(
228 pool: &sqlx::PgPool,
229 schema_name: &str,
230 collection: &Collection,
231 request: &QueryEntriesRequest,
232) -> Result<Vec<EntryRecord>> {
233 let table_name = collection.table_name.as_ref().ok_or_else(|| {
234 CedrosDataError::InvalidRequest("typed collection missing table_name".into())
235 })?;
236 let payload_projection = typed_payload_projection("typed_entries");
237 let mut builder = QueryBuilder::<sqlx::Postgres>::new(format!(
238 "SELECT entry_key, payload, to_jsonb(typed_entries) AS typed_row, updated_at FROM {}.{} AS typed_entries WHERE TRUE",
239 quote_ident(schema_name)?,
240 quote_ident(table_name)?
241 ));
242
243 apply_query_filters(&mut builder, request, &payload_projection);
244 builder.push(entry_order_clause());
245 builder.push(" LIMIT ");
246 builder.push_bind(request.limit);
247 builder.push(" OFFSET ");
248 builder.push_bind(request.offset);
249
250 fetch_typed_entry_records(builder, pool).await
251}
252
253fn apply_query_filters(
254 builder: &mut QueryBuilder<'_, sqlx::Postgres>,
255 request: &QueryEntriesRequest,
256 payload_expression: &str,
257) {
258 if !request.entry_keys.is_empty() {
259 builder.push(" AND entry_key = ANY(");
260 builder.push_bind(request.entry_keys.clone());
261 builder.push(")");
262 }
263 if let Some(contains) = &request.contains {
264 builder.push(" AND ");
265 builder.push(payload_expression);
266 builder.push(" @> ");
267 builder.push_bind(contains.clone());
268 }
269}
270
/// Shared ORDER BY fragment so every entry query path pages identically.
pub(super) fn entry_order_clause() -> &'static str {
    ENTRY_ORDER_CLAUSE
}
274
275fn validate_query_pagination(request: &QueryEntriesRequest) -> Result<()> {
276 if request.limit < 0 {
277 return Err(CedrosDataError::InvalidRequest(
278 "query limit cannot be negative".to_string(),
279 ));
280 }
281 if request.limit > MAX_QUERY_LIMIT {
282 return Err(CedrosDataError::InvalidRequest(format!(
283 "query limit cannot exceed {MAX_QUERY_LIMIT}"
284 )));
285 }
286 if request.offset < 0 {
287 return Err(CedrosDataError::InvalidRequest(
288 "query offset cannot be negative".to_string(),
289 ));
290 }
291 Ok(())
292}
293
294async fn fetch_entry_records(
295 mut builder: QueryBuilder<'_, sqlx::Postgres>,
296 pool: &sqlx::PgPool,
297) -> Result<Vec<EntryRecord>> {
298 let rows = builder.build().fetch_all(pool).await?;
299 Ok(rows
300 .into_iter()
301 .map(|row| EntryRecord {
302 entry_key: row.get("entry_key"),
303 payload: row.get("payload"),
304 updated_at: row.get("updated_at"),
305 })
306 .collect())
307}
308
309async fn fetch_typed_entry_records(
310 mut builder: QueryBuilder<'_, sqlx::Postgres>,
311 pool: &sqlx::PgPool,
312) -> Result<Vec<EntryRecord>> {
313 let rows = builder.build().fetch_all(pool).await?;
314 Ok(rows
315 .into_iter()
316 .map(|row| EntryRecord {
317 entry_key: row.get("entry_key"),
318 payload: merged_typed_payload(row.get("payload"), row.get("typed_row")),
319 updated_at: row.get("updated_at"),
320 })
321 .collect())
322}
323
324pub(super) fn merged_typed_payload(
325 mirror_payload: serde_json::Value,
326 typed_row: serde_json::Value,
327) -> serde_json::Value {
328 let serde_json::Value::Object(mut merged_payload) = mirror_payload else {
329 return mirror_payload;
330 };
331 let serde_json::Value::Object(mut typed_row_object) = typed_row else {
332 return serde_json::Value::Object(merged_payload);
333 };
334
335 for key in ["entry_key", "payload", "created_at", "updated_at"] {
336 typed_row_object.remove(key);
337 }
338 merged_payload.extend(typed_row_object);
339 serde_json::Value::Object(merged_payload)
340}
341
342pub(super) async fn typed_table_columns_with<'e, E>(
343 executor: E,
344 schema_name: &str,
345 table_name: &str,
346) -> Result<BTreeMap<String, bool>>
347where
348 E: sqlx::Executor<'e, Database = Postgres>,
349{
350 let rows = sqlx::query(
351 "SELECT column_name, is_nullable
352 FROM information_schema.columns
353 WHERE table_schema = $1 AND table_name = $2",
354 )
355 .bind(schema_name)
356 .bind(table_name)
357 .fetch_all(executor)
358 .await?;
359
360 let mut columns = BTreeMap::new();
361 for row in rows {
362 let column_name = row.get::<String, _>("column_name");
363 let is_nullable = row.get::<String, _>("is_nullable") == "YES";
364 columns.insert(column_name, is_nullable);
365 }
366
367 ensure_typed_system_columns(&columns)?;
368
369 Ok(columns)
370}
371
/// SQL expression producing the merged payload for a typed table row:
/// the mirrored `payload` with the row's non-system columns layered on top.
fn typed_payload_projection(alias: &str) -> String {
    let dropped_keys = "- 'entry_key' - 'payload' - 'created_at' - 'updated_at'";
    format!("{alias}.payload || (to_jsonb({alias}) {dropped_keys})")
}
377
378fn ensure_typed_system_columns(columns: &BTreeMap<String, bool>) -> Result<()> {
379 let required_columns = ["entry_key", "payload", "updated_at"];
380 let missing = required_columns
381 .into_iter()
382 .filter(|column| !columns.contains_key(*column))
383 .collect::<Vec<_>>();
384
385 if missing.is_empty() {
386 return Ok(());
387 }
388
389 Err(CedrosDataError::InvalidRequest(format!(
390 "typed tables must contain {} columns",
391 missing.join(", ")
392 )))
393}
394
395fn validate_typed_payload(
396 columns: &BTreeMap<String, bool>,
397 payload: &serde_json::Value,
398) -> Result<()> {
399 let object = payload
400 .as_object()
401 .ok_or_else(|| CedrosDataError::InvalidRequest("payload must be object".to_string()))?;
402
403 for key in object.keys() {
404 if columns.contains_key(key) || is_system_column(key) {
405 continue;
406 }
407 return Err(CedrosDataError::InvalidRequest(format!(
408 "typed payload has unknown field: {key}"
409 )));
410 }
411
412 for (column, nullable) in columns {
413 if *nullable || is_system_column(column) {
414 continue;
415 }
416 let Some(value) = object.get(column) else {
417 return Err(CedrosDataError::InvalidRequest(format!(
418 "typed payload missing required column: {column}"
419 )));
420 };
421 if value.is_null() {
422 return Err(CedrosDataError::InvalidRequest(format!(
423 "typed payload column cannot be null: {column}"
424 )));
425 }
426 }
427
428 Ok(())
429}
430
/// True for the bookkeeping columns every entry table carries; these are
/// never treated as user-defined typed columns.
fn is_system_column(column: &str) -> bool {
    ["entry_key", "payload", "created_at", "updated_at"].contains(&column)
}
437
/// How an upsert is physically written for a given collection mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EntryWritePlan {
    /// Write only the shared JSONB `entries` table.
    JsonbOnly,
    /// Write the typed table first, then mirror the payload into `entries`.
    TypedThenJsonbMirror,
}

/// Maps a collection's storage mode onto its write plan.
fn entry_write_plan(mode: &CollectionMode) -> EntryWritePlan {
    match mode {
        CollectionMode::Jsonb => EntryWritePlan::JsonbOnly,
        CollectionMode::Typed => EntryWritePlan::TypedThenJsonbMirror,
    }
}
450
/// Builds the `INSERT … ON CONFLICT` statement for a typed table upsert.
///
/// Placeholders: `$1` = entry_key, `$2` = the payload JSON. The payload is
/// stored verbatim in the `payload` column and simultaneously decomposed
/// into the user-defined `columns` via `jsonb_populate_record`. Every
/// identifier is passed through `quote_ident` before being interpolated.
fn typed_upsert_statement(
    schema_name: &str,
    table_name: &str,
    columns: &[String],
) -> Result<String> {
    // System columns are written explicitly; user columns are appended below.
    let mut insert_columns = vec![quote_ident("entry_key")?, quote_ident("payload")?];
    let mut select_columns = vec!["$1".to_string(), "$2::jsonb".to_string()];
    let mut conflict_updates = vec![format!(
        "{} = EXCLUDED.{}",
        quote_ident("payload")?,
        quote_ident("payload")?
    )];

    // Each user-defined column is selected from the populated record and
    // overwritten from EXCLUDED on conflict.
    for column in columns {
        let quoted = quote_ident(column)?;
        insert_columns.push(quoted.clone());
        select_columns.push(format!("record.{quoted}"));
        conflict_updates.push(format!("{quoted} = EXCLUDED.{quoted}"));
    }

    conflict_updates.push(format!("{} = NOW()", quote_ident("updated_at")?));

    // RETURNING exposes to_jsonb(typed_entries) as typed_row so callers can
    // merge typed columns back into the payload.
    Ok(format!(
        "INSERT INTO {}.{} AS typed_entries ({})
         SELECT {}
         FROM jsonb_populate_record(NULL::{}.{}, $2::jsonb) AS record
         ON CONFLICT ({}) DO UPDATE SET {}
         RETURNING {}, {}, to_jsonb(typed_entries) AS typed_row, {}",
        quote_ident(schema_name)?,
        quote_ident(table_name)?,
        insert_columns.join(", "),
        select_columns.join(", "),
        quote_ident(schema_name)?,
        quote_ident(table_name)?,
        quote_ident("entry_key")?,
        conflict_updates.join(", "),
        quote_ident("entry_key")?,
        quote_ident("payload")?,
        quote_ident("updated_at")?,
    ))
}