1use sqlx::{Postgres, Transaction};
2
3use crate::defaults::SITE_SCHEMA_NAME;
4use crate::error::{CedrosDataError, Result};
5use crate::models::{
6 Collection, CollectionMode, RegisterCollectionRequest, RegisterSiteRequest, Site,
7};
8use crate::sql::{generated_name, quote_ident, validate_identifier};
9
10use super::{
11 find_collection_with, insert_contract_in_transaction, load_site_if_configured_with,
12 map_collection_row, map_site_row, CedrosData,
13};
14
15impl CedrosData {
16 pub async fn register_site(&self, request: RegisterSiteRequest) -> Result<Site> {
17 if request.display_name.trim().is_empty() {
18 return Err(CedrosDataError::InvalidRequest(
19 "display_name cannot be empty".to_string(),
20 ));
21 }
22
23 let mut tx = self.pool().begin().await?;
24 let site = upsert_site_record(&mut tx, &request).await?;
25 create_site_schema(&mut tx).await?;
26 self.bootstrap_defaults_in_transaction(&mut tx).await?;
27 tx.commit().await?;
28 Ok(site)
29 }
30
31 pub async fn register_collection(
32 &self,
33 request: RegisterCollectionRequest,
34 ) -> Result<Collection> {
35 self.ensure_site_exists().await?;
36 let mut tx = self.pool().begin().await?;
37 let collection = self
38 .register_collection_in_transaction(&mut tx, request, true)
39 .await?;
40 tx.commit().await?;
41 Ok(collection)
42 }
43
44 pub(super) async fn register_collection_in_transaction(
45 &self,
46 tx: &mut Transaction<'_, Postgres>,
47 request: RegisterCollectionRequest,
48 persist_contract_history: bool,
49 ) -> Result<Collection> {
50 if request.collection_name.trim().is_empty() {
51 return Err(CedrosDataError::InvalidRequest(
52 "collection_name cannot be empty".to_string(),
53 ));
54 }
55
56 let existing = optional_existing_collection(
57 find_collection_with(&mut **tx, &request.collection_name).await,
58 )?;
59 validate_reserved_collection_request(&request)?;
60
61 let table_name = resolved_table_name(existing.as_ref(), &request);
62 validate_existing_collection_storage(existing.as_ref(), &request, table_name.as_deref())?;
63
64 if let Some(name) = &table_name {
65 validate_identifier(name)?;
66 }
67
68 let row = sqlx::query(
69 "INSERT INTO collections (collection_name, mode, table_name, strict_contract)
70 VALUES ($1, $2, $3, $4)
71 ON CONFLICT (collection_name)
72 DO UPDATE SET
73 mode = EXCLUDED.mode,
74 table_name = EXCLUDED.table_name,
75 strict_contract = EXCLUDED.strict_contract,
76 updated_at = NOW()
77 RETURNING collection_name, mode, table_name, strict_contract",
78 )
79 .bind(&request.collection_name)
80 .bind(request.mode.as_str())
81 .bind(&table_name)
82 .bind(
83 request
84 .strict_contract
85 .as_ref()
86 .map(serde_json::to_value)
87 .transpose()?,
88 )
89 .fetch_one(&mut **tx)
90 .await?;
91
92 let collection = map_collection_row(row)?;
93 if let CollectionMode::Typed = collection.mode {
94 create_typed_table(tx, collection.table_name.as_deref()).await?;
95 let table_name = collection.table_name.as_deref().ok_or_else(|| {
96 CedrosDataError::InvalidRequest("typed collections require table_name".to_string())
97 })?;
98 self.invalidate_typed_table_columns_cache(SITE_SCHEMA_NAME, table_name)
99 .await;
100 self.validate_typed_table_compatibility_in_transaction(
101 tx,
102 SITE_SCHEMA_NAME,
103 table_name,
104 )
105 .await?;
106 }
107
108 if persist_contract_history {
109 if let Some(contract) = collection.strict_contract.as_ref() {
110 insert_contract_in_transaction(tx, &collection.collection_name, contract).await?;
111 }
112 }
113
114 Ok(collection)
115 }
116
117 pub async fn list_collections(&self) -> Result<Vec<Collection>> {
118 self.ensure_site_exists().await?;
119 let rows = sqlx::query(
120 "SELECT collection_name, mode, table_name, strict_contract
121 FROM collections
122 ORDER BY collection_name",
123 )
124 .fetch_all(self.pool())
125 .await?;
126
127 rows.into_iter().map(map_collection_row).collect()
128 }
129}
130
131fn optional_existing_collection(result: Result<Collection>) -> Result<Option<Collection>> {
132 match result {
133 Ok(collection) => Ok(Some(collection)),
134 Err(CedrosDataError::CollectionNotFound(_)) => Ok(None),
135 Err(error) => Err(error),
136 }
137}
138
139pub(super) async fn upsert_site_record(
140 tx: &mut Transaction<'_, Postgres>,
141 request: &RegisterSiteRequest,
142) -> Result<Site> {
143 if load_site_if_configured_with(&mut **tx).await?.is_none() {
144 create_site_schema(tx).await?;
145 }
146
147 let row = sqlx::query(
148 "INSERT INTO site (id, display_name, metadata)
149 VALUES (1, $1, $2)
150 ON CONFLICT (id)
151 DO UPDATE SET
152 display_name = EXCLUDED.display_name,
153 metadata = EXCLUDED.metadata,
154 updated_at = NOW()
155 RETURNING display_name, metadata",
156 )
157 .bind(&request.display_name)
158 .bind(&request.metadata)
159 .fetch_one(&mut **tx)
160 .await?;
161
162 map_site_row(row)
163}
164
165pub(super) async fn create_site_schema(tx: &mut Transaction<'_, Postgres>) -> Result<()> {
166 let statement = format!(
167 "CREATE SCHEMA IF NOT EXISTS {}",
168 quote_ident(SITE_SCHEMA_NAME)?
169 );
170 sqlx::query(&statement).execute(&mut **tx).await?;
171 Ok(())
172}
173
174pub(super) async fn create_typed_table(
175 tx: &mut Transaction<'_, Postgres>,
176 table_name: Option<&str>,
177) -> Result<()> {
178 let table_name = table_name.ok_or_else(|| {
179 CedrosDataError::InvalidRequest("typed collections require table_name".to_string())
180 })?;
181
182 let statement = format!(
183 "CREATE TABLE IF NOT EXISTS {}.{} (
184 entry_key TEXT PRIMARY KEY,
185 payload JSONB NOT NULL DEFAULT '{{}}'::jsonb,
186 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
187 updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
188 )",
189 quote_ident(SITE_SCHEMA_NAME)?,
190 quote_ident(table_name)?
191 );
192
193 sqlx::query(&statement).execute(&mut **tx).await?;
194 for index_statement in typed_table_index_statements(table_name)? {
195 sqlx::query(&index_statement).execute(&mut **tx).await?;
196 }
197 Ok(())
198}
199
200fn typed_table_index_statements(table_name: &str) -> Result<Vec<String>> {
201 let quoted_schema = quote_ident(SITE_SCHEMA_NAME)?;
202 let quoted_table = quote_ident(table_name)?;
203 let payload_index_name = generated_name("idx", table_name, &["payload".to_string()]);
204 let pagination_index_name = generated_name(
205 "idx",
206 table_name,
207 &["updated_at".to_string(), "entry_key".to_string()],
208 );
209
210 Ok(vec![
211 format!(
212 "CREATE INDEX IF NOT EXISTS {} ON {}.{} USING GIN (payload)",
213 quote_ident(&payload_index_name)?,
214 quoted_schema,
215 quoted_table
216 ),
217 format!(
218 "CREATE INDEX IF NOT EXISTS {} ON {}.{} (updated_at DESC, entry_key ASC)",
219 quote_ident(&pagination_index_name)?,
220 quoted_schema,
221 quoted_table
222 ),
223 ])
224}
225
226fn validate_reserved_collection_request(request: &RegisterCollectionRequest) -> Result<()> {
227 if !is_reserved_collection(&request.collection_name) {
228 return Ok(());
229 }
230 if request.mode != CollectionMode::Jsonb
231 || request.table_name.is_some()
232 || request.strict_contract.is_some()
233 {
234 return Err(CedrosDataError::InvalidRequest(format!(
235 "reserved collection {} must remain a jsonb collection without table_name or strict_contract",
236 request.collection_name
237 )));
238 }
239 Ok(())
240}
241
242pub(super) fn is_reserved_collection(collection_name: &str) -> bool {
243 matches!(collection_name, "pages" | "navigation" | "site_settings")
244}
245
246fn resolved_table_name(
247 existing: Option<&Collection>,
248 request: &RegisterCollectionRequest,
249) -> Option<String> {
250 match request.mode {
251 CollectionMode::Jsonb => None,
252 CollectionMode::Typed => Some(
253 request
254 .table_name
255 .clone()
256 .or_else(|| existing.and_then(existing_typed_table_name))
257 .unwrap_or_else(|| request.collection_name.clone()),
258 ),
259 }
260}
261
262fn existing_typed_table_name(collection: &Collection) -> Option<String> {
263 match collection.mode {
264 CollectionMode::Typed => collection.table_name.clone(),
265 CollectionMode::Jsonb => None,
266 }
267}
268
269fn validate_existing_collection_storage(
270 existing: Option<&Collection>,
271 request: &RegisterCollectionRequest,
272 table_name: Option<&str>,
273) -> Result<()> {
274 let Some(existing) = existing else {
275 return Ok(());
276 };
277
278 if existing.mode != request.mode {
279 return Err(CedrosDataError::InvalidRequest(format!(
280 "collection {} already exists as {}; changing mode to {} requires an explicit migration",
281 existing.collection_name,
282 existing.mode.as_str(),
283 request.mode.as_str(),
284 )));
285 }
286
287 if request.mode == CollectionMode::Typed {
288 let existing_table_name = existing.table_name.as_deref().ok_or_else(|| {
289 CedrosDataError::InvalidRequest(format!(
290 "typed collection {} is missing table_name",
291 existing.collection_name
292 ))
293 })?;
294 let requested_table_name = table_name.ok_or_else(|| {
295 CedrosDataError::InvalidRequest(format!(
296 "typed collection {} requires table_name",
297 request.collection_name
298 ))
299 })?;
300
301 if existing_table_name != requested_table_name {
302 return Err(CedrosDataError::InvalidRequest(format!(
303 "collection {} already uses typed table {}; changing it to {} requires an explicit migration",
304 existing.collection_name, existing_table_name, requested_table_name
305 )));
306 }
307 }
308
309 Ok(())
310}
311
312#[cfg(test)]
313mod tests {
314 use crate::error::CedrosDataError;
315 use crate::models::{
316 Collection, CollectionMode, ContractField, ContractSchema, RegisterCollectionRequest,
317 ValueType,
318 };
319
320 use super::{
321 optional_existing_collection, resolved_table_name, typed_table_index_statements,
322 validate_existing_collection_storage, validate_reserved_collection_request,
323 };
324
325 #[test]
326 fn reserved_collections_reject_storage_reconfiguration() {
327 let typed_request = RegisterCollectionRequest {
328 collection_name: "pages".to_string(),
329 mode: CollectionMode::Typed,
330 table_name: Some("pages_rows".to_string()),
331 strict_contract: None,
332 };
333 let strict_contract_request = RegisterCollectionRequest {
334 collection_name: "navigation".to_string(),
335 mode: CollectionMode::Jsonb,
336 table_name: None,
337 strict_contract: Some(ContractSchema {
338 fields: vec![ContractField {
339 path: "title".to_string(),
340 required: true,
341 types: vec![ValueType::String],
342 }],
343 }),
344 };
345
346 assert!(validate_reserved_collection_request(&typed_request).is_err());
347 assert!(validate_reserved_collection_request(&strict_contract_request).is_err());
348 }
349
350 #[test]
351 fn optional_existing_collection_treats_collection_not_found_as_absent() {
352 let result = optional_existing_collection(Err(CedrosDataError::CollectionNotFound(
353 "posts".to_string(),
354 )))
355 .expect("collection not found should map to None");
356
357 assert!(result.is_none());
358 }
359
360 #[test]
361 fn optional_existing_collection_preserves_other_errors() {
362 let error = optional_existing_collection(Err(CedrosDataError::InvalidRequest(
363 "bad row".to_string(),
364 )))
365 .expect_err("non-not-found errors must propagate");
366
367 assert!(matches!(error, CedrosDataError::InvalidRequest(message) if message == "bad row"));
368 }
369
370 #[test]
371 fn reserved_collections_accept_canonical_jsonb_configuration() {
372 let request = RegisterCollectionRequest {
373 collection_name: "site_settings".to_string(),
374 mode: CollectionMode::Jsonb,
375 table_name: None,
376 strict_contract: None,
377 };
378
379 assert!(validate_reserved_collection_request(&request).is_ok());
380 }
381
382 #[test]
383 fn existing_collection_cannot_change_storage_mode() {
384 let existing = sample_collection(CollectionMode::Jsonb, None);
385 let request = RegisterCollectionRequest {
386 collection_name: "articles".to_string(),
387 mode: CollectionMode::Typed,
388 table_name: Some("articles_rows".to_string()),
389 strict_contract: None,
390 };
391
392 let error =
393 validate_existing_collection_storage(Some(&existing), &request, Some("articles_rows"))
394 .expect_err("mode change should be rejected");
395
396 assert!(error
397 .to_string()
398 .contains("changing mode to typed requires an explicit migration"));
399 }
400
401 #[test]
402 fn existing_typed_collection_cannot_change_table_name() {
403 let existing = sample_collection(CollectionMode::Typed, Some("articles_rows"));
404 let request = RegisterCollectionRequest {
405 collection_name: "articles".to_string(),
406 mode: CollectionMode::Typed,
407 table_name: Some("articles_archive".to_string()),
408 strict_contract: None,
409 };
410
411 let error = validate_existing_collection_storage(
412 Some(&existing),
413 &request,
414 Some("articles_archive"),
415 )
416 .expect_err("table rename should be rejected");
417
418 assert!(error
419 .to_string()
420 .contains("changing it to articles_archive requires an explicit migration"));
421 }
422
423 #[test]
424 fn existing_typed_collection_reuses_current_table_name_when_omitted() {
425 let existing = sample_collection(CollectionMode::Typed, Some("articles_rows"));
426 let request = RegisterCollectionRequest {
427 collection_name: "articles".to_string(),
428 mode: CollectionMode::Typed,
429 table_name: None,
430 strict_contract: None,
431 };
432
433 let table_name = resolved_table_name(Some(&existing), &request);
434 assert_eq!(table_name.as_deref(), Some("articles_rows"));
435 assert!(validate_existing_collection_storage(
436 Some(&existing),
437 &request,
438 table_name.as_deref()
439 )
440 .is_ok());
441 }
442
443 #[test]
444 fn typed_table_index_statements_cover_payload_and_pagination_queries() {
445 let statements = typed_table_index_statements("articles_rows")
446 .expect("typed table index SQL should be generated");
447
448 assert_eq!(statements.len(), 2);
449 assert!(statements[0].contains("USING GIN (payload)"));
450 assert!(statements[1].contains("(updated_at DESC, entry_key ASC)"));
451 assert!(statements
452 .iter()
453 .all(|statement| statement.contains("\"site_data\".\"articles_rows\"")));
454 }
455
456 fn sample_collection(mode: CollectionMode, table_name: Option<&str>) -> Collection {
457 Collection {
458 collection_name: "articles".to_string(),
459 mode,
460 table_name: table_name.map(str::to_string),
461 strict_contract: None,
462 }
463 }
464}