1use std::collections::BTreeMap;
2
3use futures_util::TryStreamExt;
4use sqlx::Row;
5
6use crate::defaults::SITE_SCHEMA_NAME;
7use crate::error::{CedrosDataError, Result};
8use crate::models::{
9 Collection, CollectionEntriesExport, EntryRecord, ExportSiteRequest, ImportResult,
10 ImportSiteRequest, RegisterCollectionRequest, RegisterSiteRequest, SiteExport,
11 UpsertEntryRequest,
12};
13use crate::sql::quote_ident;
14
15use super::entries::merged_typed_payload;
16use super::site::{create_site_schema, upsert_site_record};
17use super::{insert_contract_version_in_transaction, map_collection_row, CedrosData};
18
/// Upper bound on the number of rows a single site export may stream from
/// `cedros_data.entries`; `export_entries` fails with `InvalidRequest` once
/// this count is exceeded.
const MAX_SITE_EXPORT_ENTRIES: usize = 10_000;
20
impl CedrosData {
    /// Produces a complete, portable snapshot of the current site: site
    /// metadata, collection definitions, contract history, the optional
    /// custom schema, and all entries grouped per collection.
    ///
    /// # Errors
    /// Fails on any database error, or when the site contains more than
    /// `MAX_SITE_EXPORT_ENTRIES` entries (see `export_entries`).
    pub async fn export_site(&self, request: ExportSiteRequest) -> Result<SiteExport> {
        // The request carries no fields that are read here; it is accepted
        // (and discarded) to keep the public API shape stable.
        let _ = request;
        let site = self.current_site().await?;

        let collections = self.export_collections().await?;
        let contracts = self.export_contracts().await?;
        let custom_schema = self.export_custom_schema().await?;
        // Entries are exported last: typed collections are re-read from
        // their dedicated tables using the collection list gathered above.
        let entries = self.export_entries(&collections).await?;

        Ok(SiteExport {
            site,
            collections,
            custom_schema,
            contracts,
            entries,
        })
    }

    /// Restores a site from an export inside a single transaction. Order
    /// matters: site record, site schema, defaults, custom schema,
    /// collections, contract history, then entries. Nothing is persisted
    /// unless every step succeeds and the final commit goes through.
    ///
    /// # Errors
    /// Returns `CedrosDataError::BreakingSchemaChange` if the exported
    /// custom schema cannot be applied non-destructively; any other
    /// failure propagates and rolls the transaction back on drop.
    pub async fn import_site(&self, request: ImportSiteRequest) -> Result<ImportResult> {
        let mut tx = self.pool().begin().await?;
        upsert_site_record(
            &mut tx,
            &RegisterSiteRequest {
                display_name: request.export.site.display_name.clone(),
                metadata: request.export.site.metadata.clone(),
            },
        )
        .await?;
        create_site_schema(&mut tx).await?;
        self.bootstrap_defaults_in_transaction(&mut tx).await?;

        // The custom schema is applied before collections and entries; a
        // schema that would require breaking changes aborts the import.
        if let Some(definition) = request.export.custom_schema.as_ref() {
            let report = self
                .register_custom_schema_in_transaction(&mut tx, definition)
                .await?;
            if !report.applied {
                return Err(CedrosDataError::BreakingSchemaChange(
                    report.breaking_changes.join(", "),
                ));
            }
        }

        // Registered collections are indexed by name so the entry groups
        // below can be resolved back to their owning collection.
        let mut collections_by_name = BTreeMap::new();

        let mut collections_imported = 0;
        for collection in &request.export.collections {
            let registered = self
                .register_collection_in_transaction(
                    &mut tx,
                    collection_import_request(collection, request.overwrite_contracts),
                    false,
                )
                .await?;
            collections_by_name.insert(registered.collection_name.clone(), registered);
            collections_imported += 1;
        }

        // Contract history is replayed only when `overwrite_contracts` is
        // set; see `import_contracts_in_transaction`.
        let contracts_imported =
            import_contracts_in_transaction(&mut tx, &request, request.overwrite_contracts).await?;

        let mut entries_imported = 0;
        for group in &request.export.entries {
            // An entry group referencing an unknown collection fails the
            // whole import rather than being silently skipped.
            let collection = collection_for_import(&collections_by_name, &group.collection_name)?;
            for entry in &group.entries {
                self.upsert_entry_for_collection_in_transaction(
                    &mut tx,
                    collection,
                    &import_entry_request(&group.collection_name, entry),
                )
                .await?;
                entries_imported += 1;
            }
        }

        tx.commit().await?;

        Ok(ImportResult {
            collections_imported,
            entries_imported,
            contracts_imported,
        })
    }

    /// Streams every row of `cedros_data.entries`, groups entries by
    /// collection, then replaces the groups of typed collections with
    /// entries re-read from their dedicated tables (so typed columns are
    /// reflected in the exported payloads).
    ///
    /// # Errors
    /// Returns `InvalidRequest` once more than `MAX_SITE_EXPORT_ENTRIES`
    /// rows have been streamed.
    /// NOTE(review): rows produced by the typed-table re-read are not
    /// counted against this limit — confirm that is intentional.
    async fn export_entries(
        &self,
        collections: &[Collection],
    ) -> Result<Vec<CollectionEntriesExport>> {
        let mut rows = sqlx::query(
            "SELECT collection_name, entry_key, payload, updated_at
             FROM cedros_data.entries
             ORDER BY collection_name, updated_at DESC, entry_key ASC",
        )
        .fetch(self.pool());
        let mut grouped_entries = BTreeMap::new();
        let mut exported_entries = 0usize;

        while let Some(row) = rows.try_next().await? {
            let entry_row = map_export_entry_row(row)?;
            grouped_entries
                .entry(entry_row.collection_name)
                .or_insert_with(Vec::new)
                .push(entry_row.entry);
            exported_entries += 1;
            // Checked after the push: exactly MAX_SITE_EXPORT_ENTRIES
            // entries are allowed; one more aborts the export.
            if exported_entries > MAX_SITE_EXPORT_ENTRIES {
                return Err(CedrosDataError::InvalidRequest(format!(
                    "site export exceeds the {MAX_SITE_EXPORT_ENTRIES} entry limit"
                )));
            }
        }
        refresh_typed_export_groups(self.pool(), collections, &mut grouped_entries).await?;

        Ok(build_collection_entries_export(
            collections,
            grouped_entries,
        ))
    }

    /// Loads every collection definition, ordered by name so exports are
    /// deterministic.
    async fn export_collections(&self) -> Result<Vec<Collection>> {
        let rows = sqlx::query(
            "SELECT collection_name, mode, table_name, strict_contract
             FROM cedros_data.collections
             ORDER BY collection_name",
        )
        .fetch_all(self.pool())
        .await?;

        rows.into_iter().map(map_collection_row).collect()
    }

    /// Loads the full contract version history across all collections,
    /// ordered by collection name and then version.
    async fn export_contracts(&self) -> Result<Vec<crate::models::CollectionContractRecord>> {
        let rows = sqlx::query(
            "SELECT collection_name, version, contract
             FROM cedros_data.collection_contracts
             ORDER BY collection_name, version",
        )
        .fetch_all(self.pool())
        .await?;

        rows.into_iter().map(map_contract_row).collect()
    }

    /// Loads the custom schema definition, if one has been registered.
    /// The state table is queried by the fixed singleton id = 1.
    async fn export_custom_schema(&self) -> Result<Option<crate::models::CustomSchemaDefinition>> {
        let row = sqlx::query(
            "SELECT definition
             FROM cedros_data.custom_schema_state
             WHERE id = 1",
        )
        .fetch_optional(self.pool())
        .await?;

        // No row -> Ok(None); a row -> deserialize the stored JSON
        // definition, converting serde errors via `Into`.
        row.map(|row| row.get::<serde_json::Value, _>("definition"))
            .map(serde_json::from_value)
            .transpose()
            .map_err(Into::into)
    }
}
178
179async fn import_contracts_in_transaction(
180 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
181 request: &ImportSiteRequest,
182 overwrite: bool,
183) -> Result<usize> {
184 if !overwrite {
185 return Ok(0);
186 }
187
188 let mut imported = 0;
189
190 for record in &request.export.contracts {
191 insert_contract_version_in_transaction(
192 tx,
193 &record.collection_name,
194 record.version,
195 &record.contract,
196 )
197 .await?;
198 imported += 1;
199 }
200
201 Ok(imported)
202}
203
204fn collection_for_import<'a>(
205 collections_by_name: &'a BTreeMap<String, Collection>,
206 collection_name: &str,
207) -> Result<&'a Collection> {
208 collections_by_name
209 .get(collection_name)
210 .ok_or_else(|| CedrosDataError::CollectionNotFound(collection_name.to_string()))
211}
212
213fn collection_import_request(
214 collection: &Collection,
215 overwrite_contracts: bool,
216) -> RegisterCollectionRequest {
217 RegisterCollectionRequest {
218 collection_name: collection.collection_name.clone(),
219 mode: collection.mode.clone(),
220 table_name: collection.table_name.clone(),
221 strict_contract: if overwrite_contracts {
222 collection.strict_contract.clone()
223 } else {
224 None
225 },
226 }
227}
228
229fn import_entry_request(collection_name: &str, entry: &EntryRecord) -> UpsertEntryRequest {
230 UpsertEntryRequest {
231 collection_name: collection_name.to_string(),
232 entry_key: entry.entry_key.clone(),
233 payload: entry.payload.clone(),
234 }
235}
236
237fn map_contract_row(row: sqlx::postgres::PgRow) -> Result<crate::models::CollectionContractRecord> {
238 let contract_value = row.get::<serde_json::Value, _>("contract");
239 Ok(crate::models::CollectionContractRecord {
240 collection_name: row.get("collection_name"),
241 version: row.get("version"),
242 contract: serde_json::from_value(contract_value)?,
243 })
244}
245
/// Intermediate row produced while streaming the site-wide entries
/// export: the owning collection's name plus the materialized entry.
#[derive(Debug)]
struct ExportEntryRow {
    /// Collection the entry belongs to; used as the grouping key.
    collection_name: String,
    /// The decoded entry record itself.
    entry: EntryRecord,
}
251
252fn map_export_entry_row(row: sqlx::postgres::PgRow) -> Result<ExportEntryRow> {
253 Ok(ExportEntryRow {
254 collection_name: row.get("collection_name"),
255 entry: EntryRecord {
256 entry_key: row.get("entry_key"),
257 payload: row.get("payload"),
258 updated_at: row.get("updated_at"),
259 },
260 })
261}
262
263async fn refresh_typed_export_groups(
264 pool: &sqlx::PgPool,
265 collections: &[Collection],
266 grouped_entries: &mut BTreeMap<String, Vec<EntryRecord>>,
267) -> Result<()> {
268 for collection in collections {
269 if collection.mode != crate::models::CollectionMode::Typed {
270 continue;
271 }
272 grouped_entries.insert(
273 collection.collection_name.clone(),
274 export_typed_collection_entries(pool, collection).await?,
275 );
276 }
277
278 Ok(())
279}
280
281async fn export_typed_collection_entries(
282 pool: &sqlx::PgPool,
283 collection: &Collection,
284) -> Result<Vec<EntryRecord>> {
285 let table_name = collection.table_name.as_deref().ok_or_else(|| {
286 CedrosDataError::InvalidRequest(format!(
287 "typed collection {} is missing table_name",
288 collection.collection_name
289 ))
290 })?;
291 let statement = format!(
292 "SELECT entry_key, payload, to_jsonb(typed_entries) AS typed_row, updated_at
293 FROM {}.{} AS typed_entries
294 ORDER BY updated_at DESC, entry_key ASC",
295 quote_ident(SITE_SCHEMA_NAME)?,
296 quote_ident(table_name)?
297 );
298 let rows = sqlx::query(&statement).fetch_all(pool).await?;
299
300 Ok(rows
301 .into_iter()
302 .map(|row| EntryRecord {
303 entry_key: row.get("entry_key"),
304 payload: merged_typed_payload(row.get("payload"), row.get("typed_row")),
305 updated_at: row.get("updated_at"),
306 })
307 .collect())
308}
309
310fn build_collection_entries_export(
311 collections: &[Collection],
312 mut grouped_entries: BTreeMap<String, Vec<EntryRecord>>,
313) -> Vec<CollectionEntriesExport> {
314 collections
315 .iter()
316 .map(|collection| CollectionEntriesExport {
317 collection_name: collection.collection_name.clone(),
318 entries: grouped_entries
319 .remove(&collection.collection_name)
320 .unwrap_or_default(),
321 })
322 .collect()
323}
324
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use serde_json::json;

    use super::build_collection_entries_export;
    use crate::models::{
        Collection, CollectionMode, ContractField, ContractSchema, EntryRecord, ValueType,
    };

    // The `overwrite_contracts` flag decides whether the exported strict
    // contract is carried into the collection registration request.
    #[test]
    fn overwrite_contracts_controls_imported_strict_contract() {
        let collection = Collection {
            collection_name: "articles".to_string(),
            mode: CollectionMode::Jsonb,
            table_name: None,
            strict_contract: Some(ContractSchema {
                fields: vec![ContractField {
                    path: "title".to_string(),
                    required: true,
                    types: vec![ValueType::String],
                }],
            }),
        };

        let preserve = super::collection_import_request(&collection, false);
        let overwrite = super::collection_import_request(&collection, true);

        // Without overwrite the contract is dropped; with it, kept.
        assert!(preserve.strict_contract.is_none());
        assert!(overwrite.strict_contract.is_some());
    }

    // Groups must follow the order of the collection list, and a
    // collection with no entries still yields an (empty) group.
    #[test]
    fn export_groups_follow_collection_order_and_preserve_empty_sets() {
        let collections = vec![
            collection("articles"),
            collection("authors"),
            collection("pages"),
        ];
        // "authors" deliberately has no entries in the grouped map.
        let grouped = build_collection_entries_export(&collections, {
            let mut grouped_entries = BTreeMap::new();
            grouped_entries.insert("pages".to_string(), vec![entry_record("home")]);
            grouped_entries.insert(
                "articles".to_string(),
                vec![entry_record("second"), entry_record("first")],
            );
            grouped_entries
        });

        assert_eq!(
            grouped
                .iter()
                .map(|group| group.collection_name.as_str())
                .collect::<Vec<_>>(),
            vec!["articles", "authors", "pages"]
        );
        assert_eq!(grouped[0].entries.len(), 2);
        assert!(grouped[1].entries.is_empty());
        assert_eq!(grouped[2].entries[0].entry_key, "home");
    }

    // Minimal JSONB collection fixture for the grouping test.
    fn collection(collection_name: &str) -> Collection {
        Collection {
            collection_name: collection_name.to_string(),
            mode: CollectionMode::Jsonb,
            table_name: None,
            strict_contract: None,
        }
    }

    // Minimal entry fixture; the payload echoes the key for assertions.
    fn entry_record(entry_key: &str) -> EntryRecord {
        EntryRecord {
            entry_key: entry_key.to_string(),
            payload: json!({ "entry_key": entry_key }),
            updated_at: chrono::Utc::now(),
        }
    }
}