// cedros_data/store/imex.rs

1use std::collections::BTreeMap;
2
3use futures_util::TryStreamExt;
4use sqlx::Row;
5
6use crate::defaults::SITE_SCHEMA_NAME;
7use crate::error::{CedrosDataError, Result};
8use crate::models::{
9    Collection, CollectionEntriesExport, EntryRecord, ExportSiteRequest, ImportResult,
10    ImportSiteRequest, RegisterCollectionRequest, RegisterSiteRequest, SiteExport,
11    UpsertEntryRequest,
12};
13use crate::sql::quote_ident;
14
15use super::entries::merged_typed_payload;
16use super::site::{create_site_schema, upsert_site_record};
17use super::{insert_contract_version_in_transaction, map_collection_row, CedrosData};
18
/// Upper bound on the number of rows streamed from `cedros_data.entries` during a site export.
/// `export_entries` fails with `InvalidRequest` as soon as its running row count exceeds it.
const MAX_SITE_EXPORT_ENTRIES: usize = 10_000;
20
21impl CedrosData {
22    pub async fn export_site(&self, request: ExportSiteRequest) -> Result<SiteExport> {
23        let _ = request;
24        let site = self.current_site().await?;
25
26        let collections = self.export_collections().await?;
27        let contracts = self.export_contracts().await?;
28        let custom_schema = self.export_custom_schema().await?;
29        let entries = self.export_entries(&collections).await?;
30
31        Ok(SiteExport {
32            site,
33            collections,
34            custom_schema,
35            contracts,
36            entries,
37        })
38    }
39
40    pub async fn import_site(&self, request: ImportSiteRequest) -> Result<ImportResult> {
41        let mut tx = self.pool().begin().await?;
42        upsert_site_record(
43            &mut tx,
44            &RegisterSiteRequest {
45                display_name: request.export.site.display_name.clone(),
46                metadata: request.export.site.metadata.clone(),
47            },
48        )
49        .await?;
50        create_site_schema(&mut tx).await?;
51        self.bootstrap_defaults_in_transaction(&mut tx).await?;
52
53        if let Some(definition) = request.export.custom_schema.as_ref() {
54            let report = self
55                .register_custom_schema_in_transaction(&mut tx, definition)
56                .await?;
57            if !report.applied {
58                return Err(CedrosDataError::BreakingSchemaChange(
59                    report.breaking_changes.join(", "),
60                ));
61            }
62        }
63
64        let mut collections_by_name = BTreeMap::new();
65
66        let mut collections_imported = 0;
67        for collection in &request.export.collections {
68            let registered = self
69                .register_collection_in_transaction(
70                    &mut tx,
71                    collection_import_request(collection, request.overwrite_contracts),
72                    false,
73                )
74                .await?;
75            collections_by_name.insert(registered.collection_name.clone(), registered);
76            collections_imported += 1;
77        }
78
79        let contracts_imported =
80            import_contracts_in_transaction(&mut tx, &request, request.overwrite_contracts).await?;
81
82        let mut entries_imported = 0;
83        for group in &request.export.entries {
84            let collection = collection_for_import(&collections_by_name, &group.collection_name)?;
85            for entry in &group.entries {
86                self.upsert_entry_for_collection_in_transaction(
87                    &mut tx,
88                    collection,
89                    &import_entry_request(&group.collection_name, entry),
90                )
91                .await?;
92                entries_imported += 1;
93            }
94        }
95
96        tx.commit().await?;
97
98        Ok(ImportResult {
99            collections_imported,
100            entries_imported,
101            contracts_imported,
102        })
103    }
104
105    async fn export_entries(
106        &self,
107        collections: &[Collection],
108    ) -> Result<Vec<CollectionEntriesExport>> {
109        let mut rows = sqlx::query(
110            "SELECT collection_name, entry_key, payload, updated_at
111             FROM cedros_data.entries
112             ORDER BY collection_name, updated_at DESC, entry_key ASC",
113        )
114        .fetch(self.pool());
115        let mut grouped_entries = BTreeMap::new();
116        let mut exported_entries = 0usize;
117
118        while let Some(row) = rows.try_next().await? {
119            let entry_row = map_export_entry_row(row)?;
120            grouped_entries
121                .entry(entry_row.collection_name)
122                .or_insert_with(Vec::new)
123                .push(entry_row.entry);
124            exported_entries += 1;
125            if exported_entries > MAX_SITE_EXPORT_ENTRIES {
126                return Err(CedrosDataError::InvalidRequest(format!(
127                    "site export exceeds the {MAX_SITE_EXPORT_ENTRIES} entry limit"
128                )));
129            }
130        }
131        refresh_typed_export_groups(self.pool(), collections, &mut grouped_entries).await?;
132
133        Ok(build_collection_entries_export(
134            collections,
135            grouped_entries,
136        ))
137    }
138
139    async fn export_collections(&self) -> Result<Vec<Collection>> {
140        let rows = sqlx::query(
141            "SELECT collection_name, mode, table_name, strict_contract
142             FROM cedros_data.collections
143             ORDER BY collection_name",
144        )
145        .fetch_all(self.pool())
146        .await?;
147
148        rows.into_iter().map(map_collection_row).collect()
149    }
150
151    async fn export_contracts(&self) -> Result<Vec<crate::models::CollectionContractRecord>> {
152        let rows = sqlx::query(
153            "SELECT collection_name, version, contract
154             FROM cedros_data.collection_contracts
155             ORDER BY collection_name, version",
156        )
157        .fetch_all(self.pool())
158        .await?;
159
160        rows.into_iter().map(map_contract_row).collect()
161    }
162
163    async fn export_custom_schema(&self) -> Result<Option<crate::models::CustomSchemaDefinition>> {
164        let row = sqlx::query(
165            "SELECT definition
166             FROM cedros_data.custom_schema_state
167             WHERE id = 1",
168        )
169        .fetch_optional(self.pool())
170        .await?;
171
172        row.map(|row| row.get::<serde_json::Value, _>("definition"))
173            .map(serde_json::from_value)
174            .transpose()
175            .map_err(Into::into)
176    }
177}
178
179async fn import_contracts_in_transaction(
180    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
181    request: &ImportSiteRequest,
182    overwrite: bool,
183) -> Result<usize> {
184    if !overwrite {
185        return Ok(0);
186    }
187
188    let mut imported = 0;
189
190    for record in &request.export.contracts {
191        insert_contract_version_in_transaction(
192            tx,
193            &record.collection_name,
194            record.version,
195            &record.contract,
196        )
197        .await?;
198        imported += 1;
199    }
200
201    Ok(imported)
202}
203
204fn collection_for_import<'a>(
205    collections_by_name: &'a BTreeMap<String, Collection>,
206    collection_name: &str,
207) -> Result<&'a Collection> {
208    collections_by_name
209        .get(collection_name)
210        .ok_or_else(|| CedrosDataError::CollectionNotFound(collection_name.to_string()))
211}
212
213fn collection_import_request(
214    collection: &Collection,
215    overwrite_contracts: bool,
216) -> RegisterCollectionRequest {
217    RegisterCollectionRequest {
218        collection_name: collection.collection_name.clone(),
219        mode: collection.mode.clone(),
220        table_name: collection.table_name.clone(),
221        strict_contract: if overwrite_contracts {
222            collection.strict_contract.clone()
223        } else {
224            None
225        },
226    }
227}
228
229fn import_entry_request(collection_name: &str, entry: &EntryRecord) -> UpsertEntryRequest {
230    UpsertEntryRequest {
231        collection_name: collection_name.to_string(),
232        entry_key: entry.entry_key.clone(),
233        payload: entry.payload.clone(),
234    }
235}
236
237fn map_contract_row(row: sqlx::postgres::PgRow) -> Result<crate::models::CollectionContractRecord> {
238    let contract_value = row.get::<serde_json::Value, _>("contract");
239    Ok(crate::models::CollectionContractRecord {
240        collection_name: row.get("collection_name"),
241        version: row.get("version"),
242        contract: serde_json::from_value(contract_value)?,
243    })
244}
245
/// One decoded entry row, paired with the name of the collection it belongs to.
#[derive(Debug)]
struct ExportEntryRow {
    /// Value of the row's `collection_name` column.
    collection_name: String,
    /// Entry key, payload and update timestamp read from the same row.
    entry: EntryRecord,
}
251
252fn map_export_entry_row(row: sqlx::postgres::PgRow) -> Result<ExportEntryRow> {
253    Ok(ExportEntryRow {
254        collection_name: row.get("collection_name"),
255        entry: EntryRecord {
256            entry_key: row.get("entry_key"),
257            payload: row.get("payload"),
258            updated_at: row.get("updated_at"),
259        },
260    })
261}
262
263async fn refresh_typed_export_groups(
264    pool: &sqlx::PgPool,
265    collections: &[Collection],
266    grouped_entries: &mut BTreeMap<String, Vec<EntryRecord>>,
267) -> Result<()> {
268    for collection in collections {
269        if collection.mode != crate::models::CollectionMode::Typed {
270            continue;
271        }
272        grouped_entries.insert(
273            collection.collection_name.clone(),
274            export_typed_collection_entries(pool, collection).await?,
275        );
276    }
277
278    Ok(())
279}
280
281async fn export_typed_collection_entries(
282    pool: &sqlx::PgPool,
283    collection: &Collection,
284) -> Result<Vec<EntryRecord>> {
285    let table_name = collection.table_name.as_deref().ok_or_else(|| {
286        CedrosDataError::InvalidRequest(format!(
287            "typed collection {} is missing table_name",
288            collection.collection_name
289        ))
290    })?;
291    let statement = format!(
292        "SELECT entry_key, payload, to_jsonb(typed_entries) AS typed_row, updated_at
293         FROM {}.{} AS typed_entries
294         ORDER BY updated_at DESC, entry_key ASC",
295        quote_ident(SITE_SCHEMA_NAME)?,
296        quote_ident(table_name)?
297    );
298    let rows = sqlx::query(&statement).fetch_all(pool).await?;
299
300    Ok(rows
301        .into_iter()
302        .map(|row| EntryRecord {
303            entry_key: row.get("entry_key"),
304            payload: merged_typed_payload(row.get("payload"), row.get("typed_row")),
305            updated_at: row.get("updated_at"),
306        })
307        .collect())
308}
309
310fn build_collection_entries_export(
311    collections: &[Collection],
312    mut grouped_entries: BTreeMap<String, Vec<EntryRecord>>,
313) -> Vec<CollectionEntriesExport> {
314    collections
315        .iter()
316        .map(|collection| CollectionEntriesExport {
317            collection_name: collection.collection_name.clone(),
318            entries: grouped_entries
319                .remove(&collection.collection_name)
320                .unwrap_or_default(),
321        })
322        .collect()
323}
324
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use serde_json::json;

    use super::{build_collection_entries_export, collection_import_request};
    use crate::models::{
        Collection, CollectionMode, ContractField, ContractSchema, EntryRecord, ValueType,
    };

    /// Fixture: a JSONB-mode collection with no table name and no strict contract.
    fn collection(collection_name: &str) -> Collection {
        Collection {
            collection_name: collection_name.to_owned(),
            mode: CollectionMode::Jsonb,
            table_name: None,
            strict_contract: None,
        }
    }

    /// Fixture: an entry whose payload echoes its key, stamped with the current time.
    fn entry_record(entry_key: &str) -> EntryRecord {
        let payload = json!({ "entry_key": entry_key });
        EntryRecord {
            entry_key: entry_key.to_owned(),
            payload,
            updated_at: chrono::Utc::now(),
        }
    }

    /// The strict contract is only forwarded to the import request when overwriting is enabled.
    #[test]
    fn overwrite_contracts_controls_imported_strict_contract() {
        let title_field = ContractField {
            path: "title".to_owned(),
            required: true,
            types: vec![ValueType::String],
        };
        let articles = Collection {
            strict_contract: Some(ContractSchema {
                fields: vec![title_field],
            }),
            ..collection("articles")
        };

        let without_overwrite = collection_import_request(&articles, false);
        let with_overwrite = collection_import_request(&articles, true);

        assert!(without_overwrite.strict_contract.is_none());
        assert!(with_overwrite.strict_contract.is_some());
    }

    /// Export groups come out in collection order, including collections with no entries.
    #[test]
    fn export_groups_follow_collection_order_and_preserve_empty_sets() {
        let collections = vec![
            collection("articles"),
            collection("authors"),
            collection("pages"),
        ];

        // Inserted out of collection order on purpose.
        let mut grouped_entries = BTreeMap::new();
        grouped_entries.insert("pages".to_owned(), vec![entry_record("home")]);
        grouped_entries.insert(
            "articles".to_owned(),
            vec![entry_record("second"), entry_record("first")],
        );

        let exported = build_collection_entries_export(&collections, grouped_entries);

        let exported_names: Vec<&str> = exported
            .iter()
            .map(|group| group.collection_name.as_str())
            .collect();
        assert_eq!(exported_names, vec!["articles", "authors", "pages"]);
        assert_eq!(exported[0].entries.len(), 2);
        assert!(exported[1].entries.is_empty());
        assert_eq!(exported[2].entries[0].entry_key, "home");
    }
}