Skip to main content

cedros_data/store/
site.rs

1use sqlx::{Postgres, Transaction};
2
3use crate::defaults::SITE_SCHEMA_NAME;
4use crate::error::{CedrosDataError, Result};
5use crate::models::{
6    Collection, CollectionMode, RegisterCollectionRequest, RegisterSiteRequest, Site,
7};
8use crate::sql::{generated_name, quote_ident, validate_identifier};
9
10use super::{
11    find_collection_with, insert_contract_in_transaction, load_site_if_configured_with,
12    map_collection_row, map_site_row, CedrosData,
13};
14
15impl CedrosData {
16    pub async fn register_site(&self, request: RegisterSiteRequest) -> Result<Site> {
17        if request.display_name.trim().is_empty() {
18            return Err(CedrosDataError::InvalidRequest(
19                "display_name cannot be empty".to_string(),
20            ));
21        }
22
23        let mut tx = self.pool().begin().await?;
24        let site = upsert_site_record(&mut tx, &request).await?;
25        create_site_schema(&mut tx).await?;
26        self.bootstrap_defaults_in_transaction(&mut tx).await?;
27        tx.commit().await?;
28        Ok(site)
29    }
30
31    pub async fn register_collection(
32        &self,
33        request: RegisterCollectionRequest,
34    ) -> Result<Collection> {
35        self.ensure_site_exists().await?;
36        let mut tx = self.pool().begin().await?;
37        let collection = self
38            .register_collection_in_transaction(&mut tx, request, true)
39            .await?;
40        tx.commit().await?;
41        Ok(collection)
42    }
43
44    pub(super) async fn register_collection_in_transaction(
45        &self,
46        tx: &mut Transaction<'_, Postgres>,
47        request: RegisterCollectionRequest,
48        persist_contract_history: bool,
49    ) -> Result<Collection> {
50        if request.collection_name.trim().is_empty() {
51            return Err(CedrosDataError::InvalidRequest(
52                "collection_name cannot be empty".to_string(),
53            ));
54        }
55
56        let existing = optional_existing_collection(
57            find_collection_with(&mut **tx, &request.collection_name).await,
58        )?;
59        validate_reserved_collection_request(&request)?;
60
61        let table_name = resolved_table_name(existing.as_ref(), &request);
62        validate_existing_collection_storage(existing.as_ref(), &request, table_name.as_deref())?;
63
64        if let Some(name) = &table_name {
65            validate_identifier(name)?;
66        }
67
68        let row = sqlx::query(
69            "INSERT INTO collections (collection_name, mode, table_name, strict_contract)
70             VALUES ($1, $2, $3, $4)
71             ON CONFLICT (collection_name)
72             DO UPDATE SET
73                 mode = EXCLUDED.mode,
74                 table_name = EXCLUDED.table_name,
75                 strict_contract = EXCLUDED.strict_contract,
76                 updated_at = NOW()
77             RETURNING collection_name, mode, table_name, strict_contract",
78        )
79        .bind(&request.collection_name)
80        .bind(request.mode.as_str())
81        .bind(&table_name)
82        .bind(
83            request
84                .strict_contract
85                .as_ref()
86                .map(serde_json::to_value)
87                .transpose()?,
88        )
89        .fetch_one(&mut **tx)
90        .await?;
91
92        let collection = map_collection_row(row)?;
93        if let CollectionMode::Typed = collection.mode {
94            create_typed_table(tx, collection.table_name.as_deref()).await?;
95            let table_name = collection.table_name.as_deref().ok_or_else(|| {
96                CedrosDataError::InvalidRequest("typed collections require table_name".to_string())
97            })?;
98            self.invalidate_typed_table_columns_cache(SITE_SCHEMA_NAME, table_name)
99                .await;
100            self.validate_typed_table_compatibility_in_transaction(
101                tx,
102                SITE_SCHEMA_NAME,
103                table_name,
104            )
105            .await?;
106        }
107
108        if persist_contract_history {
109            if let Some(contract) = collection.strict_contract.as_ref() {
110                insert_contract_in_transaction(tx, &collection.collection_name, contract).await?;
111            }
112        }
113
114        Ok(collection)
115    }
116
117    pub async fn list_collections(&self) -> Result<Vec<Collection>> {
118        self.ensure_site_exists().await?;
119        let rows = sqlx::query(
120            "SELECT collection_name, mode, table_name, strict_contract
121             FROM collections
122             ORDER BY collection_name",
123        )
124        .fetch_all(self.pool())
125        .await?;
126
127        rows.into_iter().map(map_collection_row).collect()
128    }
129}
130
131fn optional_existing_collection(result: Result<Collection>) -> Result<Option<Collection>> {
132    match result {
133        Ok(collection) => Ok(Some(collection)),
134        Err(CedrosDataError::CollectionNotFound(_)) => Ok(None),
135        Err(error) => Err(error),
136    }
137}
138
139pub(super) async fn upsert_site_record(
140    tx: &mut Transaction<'_, Postgres>,
141    request: &RegisterSiteRequest,
142) -> Result<Site> {
143    if load_site_if_configured_with(&mut **tx).await?.is_none() {
144        create_site_schema(tx).await?;
145    }
146
147    let row = sqlx::query(
148        "INSERT INTO site (id, display_name, metadata)
149         VALUES (1, $1, $2)
150         ON CONFLICT (id)
151         DO UPDATE SET
152             display_name = EXCLUDED.display_name,
153             metadata = EXCLUDED.metadata,
154             updated_at = NOW()
155         RETURNING display_name, metadata",
156    )
157    .bind(&request.display_name)
158    .bind(&request.metadata)
159    .fetch_one(&mut **tx)
160    .await?;
161
162    map_site_row(row)
163}
164
165pub(super) async fn create_site_schema(tx: &mut Transaction<'_, Postgres>) -> Result<()> {
166    let statement = format!(
167        "CREATE SCHEMA IF NOT EXISTS {}",
168        quote_ident(SITE_SCHEMA_NAME)?
169    );
170    sqlx::query(&statement).execute(&mut **tx).await?;
171    Ok(())
172}
173
174pub(super) async fn create_typed_table(
175    tx: &mut Transaction<'_, Postgres>,
176    table_name: Option<&str>,
177) -> Result<()> {
178    let table_name = table_name.ok_or_else(|| {
179        CedrosDataError::InvalidRequest("typed collections require table_name".to_string())
180    })?;
181
182    let statement = format!(
183        "CREATE TABLE IF NOT EXISTS {}.{} (
184            entry_key TEXT PRIMARY KEY,
185            payload JSONB NOT NULL DEFAULT '{{}}'::jsonb,
186            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
187            updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
188         )",
189        quote_ident(SITE_SCHEMA_NAME)?,
190        quote_ident(table_name)?
191    );
192
193    sqlx::query(&statement).execute(&mut **tx).await?;
194    for index_statement in typed_table_index_statements(table_name)? {
195        sqlx::query(&index_statement).execute(&mut **tx).await?;
196    }
197    Ok(())
198}
199
200fn typed_table_index_statements(table_name: &str) -> Result<Vec<String>> {
201    let quoted_schema = quote_ident(SITE_SCHEMA_NAME)?;
202    let quoted_table = quote_ident(table_name)?;
203    let payload_index_name = generated_name("idx", table_name, &["payload".to_string()]);
204    let pagination_index_name = generated_name(
205        "idx",
206        table_name,
207        &["updated_at".to_string(), "entry_key".to_string()],
208    );
209
210    Ok(vec![
211        format!(
212            "CREATE INDEX IF NOT EXISTS {} ON {}.{} USING GIN (payload)",
213            quote_ident(&payload_index_name)?,
214            quoted_schema,
215            quoted_table
216        ),
217        format!(
218            "CREATE INDEX IF NOT EXISTS {} ON {}.{} (updated_at DESC, entry_key ASC)",
219            quote_ident(&pagination_index_name)?,
220            quoted_schema,
221            quoted_table
222        ),
223    ])
224}
225
226fn validate_reserved_collection_request(request: &RegisterCollectionRequest) -> Result<()> {
227    if !is_reserved_collection(&request.collection_name) {
228        return Ok(());
229    }
230    if request.mode != CollectionMode::Jsonb
231        || request.table_name.is_some()
232        || request.strict_contract.is_some()
233    {
234        return Err(CedrosDataError::InvalidRequest(format!(
235            "reserved collection {} must remain a jsonb collection without table_name or strict_contract",
236            request.collection_name
237        )));
238    }
239    Ok(())
240}
241
242pub(super) fn is_reserved_collection(collection_name: &str) -> bool {
243    matches!(collection_name, "pages" | "navigation" | "site_settings")
244}
245
246fn resolved_table_name(
247    existing: Option<&Collection>,
248    request: &RegisterCollectionRequest,
249) -> Option<String> {
250    match request.mode {
251        CollectionMode::Jsonb => None,
252        CollectionMode::Typed => Some(
253            request
254                .table_name
255                .clone()
256                .or_else(|| existing.and_then(existing_typed_table_name))
257                .unwrap_or_else(|| request.collection_name.clone()),
258        ),
259    }
260}
261
262fn existing_typed_table_name(collection: &Collection) -> Option<String> {
263    match collection.mode {
264        CollectionMode::Typed => collection.table_name.clone(),
265        CollectionMode::Jsonb => None,
266    }
267}
268
269fn validate_existing_collection_storage(
270    existing: Option<&Collection>,
271    request: &RegisterCollectionRequest,
272    table_name: Option<&str>,
273) -> Result<()> {
274    let Some(existing) = existing else {
275        return Ok(());
276    };
277
278    if existing.mode != request.mode {
279        return Err(CedrosDataError::InvalidRequest(format!(
280            "collection {} already exists as {}; changing mode to {} requires an explicit migration",
281            existing.collection_name,
282            existing.mode.as_str(),
283            request.mode.as_str(),
284        )));
285    }
286
287    if request.mode == CollectionMode::Typed {
288        let existing_table_name = existing.table_name.as_deref().ok_or_else(|| {
289            CedrosDataError::InvalidRequest(format!(
290                "typed collection {} is missing table_name",
291                existing.collection_name
292            ))
293        })?;
294        let requested_table_name = table_name.ok_or_else(|| {
295            CedrosDataError::InvalidRequest(format!(
296                "typed collection {} requires table_name",
297                request.collection_name
298            ))
299        })?;
300
301        if existing_table_name != requested_table_name {
302            return Err(CedrosDataError::InvalidRequest(format!(
303                "collection {} already uses typed table {}; changing it to {} requires an explicit migration",
304                existing.collection_name, existing_table_name, requested_table_name
305            )));
306        }
307    }
308
309    Ok(())
310}
311
#[cfg(test)]
mod tests {
    use crate::error::CedrosDataError;
    use crate::models::{
        Collection, CollectionMode, ContractField, ContractSchema, RegisterCollectionRequest,
        ValueType,
    };

    use super::{
        optional_existing_collection, resolved_table_name, typed_table_index_statements,
        validate_existing_collection_storage, validate_reserved_collection_request,
    };

    /// Builds a registration request without a strict contract.
    fn request_for(
        collection_name: &str,
        mode: CollectionMode,
        table_name: Option<&str>,
    ) -> RegisterCollectionRequest {
        RegisterCollectionRequest {
            collection_name: collection_name.to_string(),
            mode,
            table_name: table_name.map(str::to_string),
            strict_contract: None,
        }
    }

    /// Builds an already-registered "articles" collection without a contract.
    fn sample_collection(mode: CollectionMode, table_name: Option<&str>) -> Collection {
        Collection {
            collection_name: "articles".to_string(),
            mode,
            table_name: table_name.map(str::to_string),
            strict_contract: None,
        }
    }

    #[test]
    fn reserved_collections_reject_storage_reconfiguration() {
        let typed_request = request_for("pages", CollectionMode::Typed, Some("pages_rows"));

        let title_field = ContractField {
            path: "title".to_string(),
            required: true,
            types: vec![ValueType::String],
        };
        let strict_contract_request = RegisterCollectionRequest {
            strict_contract: Some(ContractSchema {
                fields: vec![title_field],
            }),
            ..request_for("navigation", CollectionMode::Jsonb, None)
        };

        assert!(validate_reserved_collection_request(&typed_request).is_err());
        assert!(validate_reserved_collection_request(&strict_contract_request).is_err());
    }

    #[test]
    fn optional_existing_collection_treats_collection_not_found_as_absent() {
        let missing = Err(CedrosDataError::CollectionNotFound("posts".to_string()));

        let resolved = optional_existing_collection(missing)
            .expect("collection not found should map to None");

        assert!(resolved.is_none());
    }

    #[test]
    fn optional_existing_collection_preserves_other_errors() {
        let unrelated = Err(CedrosDataError::InvalidRequest("bad row".to_string()));

        let failure = optional_existing_collection(unrelated)
            .expect_err("non-not-found errors must propagate");

        match failure {
            CedrosDataError::InvalidRequest(message) => assert_eq!(message, "bad row"),
            _ => panic!("expected the InvalidRequest error to be preserved"),
        }
    }

    #[test]
    fn reserved_collections_accept_canonical_jsonb_configuration() {
        let request = request_for("site_settings", CollectionMode::Jsonb, None);

        assert!(validate_reserved_collection_request(&request).is_ok());
    }

    #[test]
    fn existing_collection_cannot_change_storage_mode() {
        let existing = sample_collection(CollectionMode::Jsonb, None);
        let request = request_for("articles", CollectionMode::Typed, Some("articles_rows"));

        let outcome =
            validate_existing_collection_storage(Some(&existing), &request, Some("articles_rows"));
        let message = outcome
            .expect_err("mode change should be rejected")
            .to_string();

        assert!(message.contains("changing mode to typed requires an explicit migration"));
    }

    #[test]
    fn existing_typed_collection_cannot_change_table_name() {
        let existing = sample_collection(CollectionMode::Typed, Some("articles_rows"));
        let request = request_for("articles", CollectionMode::Typed, Some("articles_archive"));

        let outcome = validate_existing_collection_storage(
            Some(&existing),
            &request,
            Some("articles_archive"),
        );
        let message = outcome
            .expect_err("table rename should be rejected")
            .to_string();

        assert!(message.contains("changing it to articles_archive requires an explicit migration"));
    }

    #[test]
    fn existing_typed_collection_reuses_current_table_name_when_omitted() {
        let existing = sample_collection(CollectionMode::Typed, Some("articles_rows"));
        let request = request_for("articles", CollectionMode::Typed, None);

        let resolved = resolved_table_name(Some(&existing), &request);
        assert_eq!(resolved.as_deref(), Some("articles_rows"));

        let outcome =
            validate_existing_collection_storage(Some(&existing), &request, resolved.as_deref());
        assert!(outcome.is_ok());
    }

    #[test]
    fn typed_table_index_statements_cover_payload_and_pagination_queries() {
        let statements = typed_table_index_statements("articles_rows")
            .expect("typed table index SQL should be generated");

        assert_eq!(statements.len(), 2);
        assert!(statements[0].contains("USING GIN (payload)"));
        assert!(statements[1].contains("(updated_at DESC, entry_key ASC)"));
        for statement in &statements {
            assert!(statement.contains("\"site_data\".\"articles_rows\""));
        }
    }
}