1use std::collections::{HashMap, HashSet};
8use std::fs;
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11
12use arrow::array::{BooleanBuilder, StringBuilder};
13use arrow::datatypes::{DataType, Field, Schema};
14use arrow::record_batch::RecordBatch;
15use chrono::Utc;
16use futures::StreamExt;
17use parquet::arrow::ArrowWriter;
18use parquet::basic::{Compression, ZstdLevel};
19use parquet::file::properties::WriterProperties;
20use serde::Serialize;
21use tracing::info;
22
23use crate::config::PortalsConfig;
24use crate::error::AppError;
25use crate::models::Dataset;
26use crate::traits::DatasetStore;
27
/// Tunable knobs for the Parquet export.
///
/// See the `Default` impl for the values used when no explicit configuration
/// is supplied.
#[derive(Debug, Clone)]
pub struct ParquetExportConfig {
    /// Titles shorter than this are treated as noise and skipped.
    pub min_title_length: usize,
    /// Lowercase substrings; a dataset whose lowercased title contains any of
    /// them is skipped as noise.
    pub noise_patterns: Vec<String>,
    /// Number of rows buffered in memory before a batch is flushed to disk.
    pub batch_size: usize,
}
37
38impl Default for ParquetExportConfig {
39 fn default() -> Self {
40 Self {
41 min_title_length: 5,
42 noise_patterns: vec!["test".into(), "prova".into(), "esempio".into()],
43 batch_size: 10_000,
44 }
45 }
46}
47
/// Summary of a completed export run; serialized to `metadata.json` next to
/// the Parquet files.
#[derive(Debug, Serialize)]
pub struct ParquetExportResult {
    // Rows written across all portals (after noise filtering).
    pub total_exported: u64,
    // Datasets dropped by the noise filter; they appear in no output file.
    pub total_filtered: u64,
    // Exported rows whose lowercased title also appears on another portal.
    pub total_duplicates: u64,
    // Per-portal export counts, sorted by count descending.
    pub portals: Vec<PortalExportStats>,
    // UTC date (YYYY-MM-DD) when the snapshot was produced.
    pub snapshot_date: String,
    // Directory the Parquet files were written to.
    pub output_dir: PathBuf,
}
58
/// Per-portal statistics included in the export result.
#[derive(Debug, Serialize)]
pub struct PortalExportStats {
    // Portal display name (from config, or derived from the URL).
    pub name: String,
    // Normalized portal URL (no trailing slash).
    pub url: String,
    // Number of rows exported for this portal.
    pub count: u64,
}
66
/// One denormalized row as written to the Parquet output.
///
/// All fields are pre-flattened strings so they map 1:1 onto the Arrow
/// schema produced by `arrow_schema()`.
struct FlatRecord {
    original_id: String,
    source_portal: String,
    portal_name: String,
    url: String,
    title: String,
    description: String,
    // Comma-separated tag names.
    tags: String,
    organization: String,
    license: String,
    metadata_created: String,
    metadata_modified: String,
    // RFC 3339 timestamp of when the dataset was first seen by the harvester.
    first_seen_at: String,
    language: String,
    // True when the lowercased title exists on more than one portal.
    is_duplicate: bool,
}
84
/// Streams datasets from a `DatasetStore` into Parquet files, one combined
/// file plus one file per portal.
pub struct ParquetExportService<S: DatasetStore> {
    store: S,
    config: ParquetExportConfig,
    // Portal display names keyed by normalized portal URL.
    portal_names: HashMap<String, String>,
    // Portal languages keyed by normalized portal URL.
    portal_languages: HashMap<String, String>,
}
92
93impl<S: DatasetStore> ParquetExportService<S> {
94 pub fn new(
99 store: S,
100 portals_config: Option<PortalsConfig>,
101 config: ParquetExportConfig,
102 ) -> Self {
103 let mut portal_names = HashMap::new();
104 let mut portal_languages = HashMap::new();
105
106 if let Some(pc) = &portals_config {
107 for entry in &pc.portals {
108 let url = normalize_portal_url(&entry.url);
109 portal_names.insert(url.clone(), entry.name.clone());
110 portal_languages.insert(url, entry.language().to_string());
111 }
112 }
113
114 Self {
115 store,
116 config,
117 portal_names,
118 portal_languages,
119 }
120 }
121
122 pub async fn export_to_directory(
129 &self,
130 output_dir: &Path,
131 ) -> Result<ParquetExportResult, AppError> {
132 let data_dir = output_dir.join("data");
134 fs::create_dir_all(&data_dir).map_err(|e| {
135 AppError::Generic(format!(
136 "Failed to create output directory {}: {}",
137 data_dir.display(),
138 e
139 ))
140 })?;
141
142 info!("Loading cross-portal duplicate titles...");
143 let duplicate_titles = self.store.get_duplicate_titles().await?;
144 info!(
145 "Found {} duplicate title groups across portals",
146 duplicate_titles.len()
147 );
148
149 info!("Streaming datasets for Parquet export...");
150 let (exported, filtered, duplicates, portal_stats) =
151 self.stream_and_write(output_dir, &duplicate_titles).await?;
152
153 let snapshot_date = Utc::now().format("%Y-%m-%d").to_string();
154
155 let result = ParquetExportResult {
156 total_exported: exported,
157 total_filtered: filtered,
158 total_duplicates: duplicates,
159 portals: portal_stats,
160 snapshot_date: snapshot_date.clone(),
161 output_dir: output_dir.to_path_buf(),
162 };
163
164 let metadata_path = output_dir.join("metadata.json");
166 let metadata_json = serde_json::to_string_pretty(&result)
167 .map_err(|e| AppError::Generic(format!("Failed to serialize metadata: {}", e)))?;
168 fs::write(&metadata_path, metadata_json).map_err(|e| {
169 AppError::Generic(format!(
170 "Failed to write {}: {}",
171 metadata_path.display(),
172 e
173 ))
174 })?;
175
176 Ok(result)
177 }
178
    /// Streams every dataset from the store and writes it to Parquet.
    ///
    /// Produces `output_dir/all.parquet` with every exported row, plus one
    /// `output_dir/data/<portal>.parquet` per portal. Rows are buffered in
    /// memory and flushed in batches of `config.batch_size`.
    ///
    /// Returns `(total_exported, total_filtered, total_duplicates,
    /// per-portal stats sorted by count descending)`.
    async fn stream_and_write(
        &self,
        output_dir: &Path,
        duplicate_titles: &HashSet<String>,
    ) -> Result<(u64, u64, u64, Vec<PortalExportStats>), AppError> {
        let schema = arrow_schema();
        let writer_props = writer_properties();

        // Per-portal files live under `data/` (created by the caller).
        let data_dir = output_dir.join("data");

        // Writer for the combined file containing rows from every portal.
        let all_path = output_dir.join("all.parquet");
        let all_file = fs::File::create(&all_path).map_err(|e| {
            AppError::Generic(format!("Failed to create {}: {}", all_path.display(), e))
        })?;
        let mut all_writer =
            ArrowWriter::try_new(all_file, schema.clone(), Some(writer_props.clone()))
                .map_err(|e| AppError::Generic(format!("Failed to create ArrowWriter: {}", e)))?;

        // Per-portal state, all keyed by the normalized portal URL:
        // open writers (created lazily on first flush), pending row buffers,
        // exported-row counts, and (display name, file name) pairs.
        let mut portal_writers: HashMap<String, ArrowWriter<fs::File>> = HashMap::new();
        let mut portal_buffers: HashMap<String, Vec<FlatRecord>> = HashMap::new();
        let mut portal_counts: HashMap<String, u64> = HashMap::new();
        let mut portal_info: HashMap<String, (String, String)> = HashMap::new();

        let mut all_buffer: Vec<FlatRecord> = Vec::with_capacity(self.config.batch_size);

        let mut total_exported = 0u64;
        let mut total_filtered = 0u64;
        let mut total_duplicates = 0u64;

        let mut stream = self.store.list_stream(None, None);

        while let Some(result) = stream.next().await {
            let dataset = result?;

            // Noise datasets are dropped entirely; they appear in no output.
            if self.is_noise(&dataset) {
                total_filtered += 1;
                continue;
            }

            // Duplicates are still exported, just flagged and counted.
            let is_duplicate = duplicate_titles.contains(&dataset.title.to_lowercase());
            if is_duplicate {
                total_duplicates += 1;
            }

            let record = self.flatten_dataset(&dataset, is_duplicate);

            let portal_key = normalize_portal_url(&record.source_portal);
            let file_name = portal_file_name(&record.portal_name);
            portal_info
                .entry(portal_key.clone())
                .or_insert_with(|| (record.portal_name.clone(), file_name));

            // The per-portal copy is built field by field because FlatRecord
            // does not derive Clone; `record` itself goes to the `all` buffer.
            portal_buffers
                .entry(portal_key.clone())
                .or_default()
                .push(FlatRecord {
                    original_id: record.original_id.clone(),
                    source_portal: record.source_portal.clone(),
                    portal_name: record.portal_name.clone(),
                    url: record.url.clone(),
                    title: record.title.clone(),
                    description: record.description.clone(),
                    tags: record.tags.clone(),
                    organization: record.organization.clone(),
                    license: record.license.clone(),
                    metadata_created: record.metadata_created.clone(),
                    metadata_modified: record.metadata_modified.clone(),
                    first_seen_at: record.first_seen_at.clone(),
                    language: record.language.clone(),
                    is_duplicate: record.is_duplicate,
                });

            all_buffer.push(record);
            total_exported += 1;
            *portal_counts.entry(portal_key.clone()).or_default() += 1;

            // Flush the combined buffer once it reaches a full batch.
            if all_buffer.len() >= self.config.batch_size {
                let batch = build_record_batch(&all_buffer, &schema)?;
                all_writer
                    .write(&batch)
                    .map_err(|e| AppError::Generic(format!("Parquet write error: {}", e)))?;
                all_buffer.clear();
            }

            // Flush this portal's buffer once it reaches a full batch; the
            // portal's writer is created on first flush.
            if let Some(buf) = portal_buffers.get(&portal_key)
                && buf.len() >= self.config.batch_size
            {
                let buf = portal_buffers
                    .remove(&portal_key)
                    .expect("buffer must exist: checked by get() above");
                let batch = build_record_batch(&buf, &schema)?;
                let (_, ref fname) = portal_info[&portal_key];
                let writer = get_or_create_portal_writer(
                    &mut portal_writers,
                    &portal_key,
                    fname,
                    &data_dir,
                    &schema,
                    &writer_props,
                )?;
                writer
                    .write(&batch)
                    .map_err(|e| AppError::Generic(format!("Parquet write error: {}", e)))?;
            }
        }

        // Flush any rows still pending in the combined buffer.
        if !all_buffer.is_empty() {
            let batch = build_record_batch(&all_buffer, &schema)?;
            all_writer
                .write(&batch)
                .map_err(|e| AppError::Generic(format!("Parquet write error: {}", e)))?;
        }

        // Flush remaining per-portal rows. This also covers portals that
        // never reached a full batch and therefore have no writer yet.
        for (portal_key, buf) in portal_buffers.drain() {
            if !buf.is_empty() {
                let batch = build_record_batch(&buf, &schema)?;
                let (_, ref fname) = portal_info[&portal_key];
                let writer = get_or_create_portal_writer(
                    &mut portal_writers,
                    &portal_key,
                    fname,
                    &data_dir,
                    &schema,
                    &writer_props,
                )?;
                writer
                    .write(&batch)
                    .map_err(|e| AppError::Generic(format!("Parquet write error: {}", e)))?;
            }
        }

        // Closing writes the Parquet footer; an unclosed file is unreadable.
        all_writer
            .close()
            .map_err(|e| AppError::Generic(format!("Failed to close all.parquet: {}", e)))?;

        for (portal_key, writer) in portal_writers {
            let (_, ref fname) = portal_info[&portal_key];
            writer.close().map_err(|e| {
                AppError::Generic(format!("Failed to close {}.parquet: {}", fname, e))
            })?;
        }

        // Assemble per-portal stats, largest portal first.
        let mut portal_stats: Vec<PortalExportStats> = portal_counts
            .into_iter()
            .map(|(portal_key, count)| {
                let (ref name, _) = portal_info[&portal_key];
                PortalExportStats {
                    name: name.clone(),
                    url: portal_key,
                    count,
                }
            })
            .collect();
        portal_stats.sort_by(|a, b| b.count.cmp(&a.count));

        Ok((
            total_exported,
            total_filtered,
            total_duplicates,
            portal_stats,
        ))
    }
358
359 fn is_noise(&self, dataset: &Dataset) -> bool {
361 if dataset.title.len() < self.config.min_title_length {
363 return true;
364 }
365
366 if dataset
368 .description
369 .as_ref()
370 .is_none_or(|d| d.trim().is_empty())
371 {
372 return true;
373 }
374
375 let title_lower = dataset.title.to_lowercase();
377 for pattern in &self.config.noise_patterns {
378 if title_lower.contains(pattern.as_str()) {
379 return true;
380 }
381 }
382
383 false
384 }
385
386 fn flatten_dataset(&self, dataset: &Dataset, is_duplicate: bool) -> FlatRecord {
388 let metadata = &dataset.metadata;
389 let normalized_url = normalize_portal_url(&dataset.source_portal);
390
391 let portal_name = self
392 .portal_names
393 .get(&normalized_url)
394 .cloned()
395 .unwrap_or_else(|| portal_name_from_url(&dataset.source_portal));
396
397 let language = self
398 .portal_languages
399 .get(&normalized_url)
400 .cloned()
401 .or_else(|| {
402 metadata
403 .get("language")
404 .and_then(|v| v.as_str())
405 .map(|s| s.to_lowercase())
406 })
407 .unwrap_or_else(|| "unknown".to_string());
408
409 FlatRecord {
410 original_id: dataset.original_id.clone(),
411 source_portal: dataset.source_portal.clone(),
412 portal_name,
413 url: dataset.url.clone(),
414 title: dataset.title.clone(),
415 description: dataset.description.clone().unwrap_or_default(),
416 tags: extract_tags(metadata),
417 organization: extract_organization(metadata),
418 license: extract_license(metadata),
419 metadata_created: extract_string(metadata, "metadata_created"),
420 metadata_modified: extract_string(metadata, "metadata_modified"),
421 first_seen_at: dataset.first_seen_at.to_rfc3339(),
422 language,
423 is_duplicate,
424 }
425 }
426}
427
428fn arrow_schema() -> Arc<Schema> {
434 Arc::new(Schema::new(vec![
435 Field::new("original_id", DataType::Utf8, false),
436 Field::new("source_portal", DataType::Utf8, false),
437 Field::new("portal_name", DataType::Utf8, false),
438 Field::new("url", DataType::Utf8, false),
439 Field::new("title", DataType::Utf8, false),
440 Field::new("description", DataType::Utf8, true),
441 Field::new("tags", DataType::Utf8, true),
442 Field::new("organization", DataType::Utf8, true),
443 Field::new("license", DataType::Utf8, true),
444 Field::new("metadata_created", DataType::Utf8, true),
445 Field::new("metadata_modified", DataType::Utf8, true),
446 Field::new("first_seen_at", DataType::Utf8, false),
447 Field::new("language", DataType::Utf8, true),
448 Field::new("is_duplicate", DataType::Boolean, false),
449 ]))
450}
451
452fn writer_properties() -> WriterProperties {
454 WriterProperties::builder()
455 .set_compression(Compression::ZSTD(
456 ZstdLevel::try_new(3).expect("zstd level 3 should be valid"),
457 ))
458 .build()
459}
460
/// Converts a slice of `FlatRecord`s into an Arrow `RecordBatch` matching
/// `arrow_schema()`.
///
/// The array order passed to `RecordBatch::try_new` is positional, so it must
/// match the schema's field order exactly.
fn build_record_batch(
    records: &[FlatRecord],
    schema: &Arc<Schema>,
) -> Result<RecordBatch, AppError> {
    let len = records.len();

    // Pre-size each builder; the byte capacities are rough per-field
    // averages to limit reallocation, not hard limits.
    let mut original_id = StringBuilder::with_capacity(len, len * 32);
    let mut source_portal = StringBuilder::with_capacity(len, len * 64);
    let mut portal_name = StringBuilder::with_capacity(len, len * 24);
    let mut url = StringBuilder::with_capacity(len, len * 128);
    let mut title = StringBuilder::with_capacity(len, len * 64);
    let mut description = StringBuilder::with_capacity(len, len * 256);
    let mut tags = StringBuilder::with_capacity(len, len * 64);
    let mut organization = StringBuilder::with_capacity(len, len * 48);
    let mut license = StringBuilder::with_capacity(len, len * 32);
    let mut metadata_created = StringBuilder::with_capacity(len, len * 24);
    let mut metadata_modified = StringBuilder::with_capacity(len, len * 24);
    let mut first_seen_at = StringBuilder::with_capacity(len, len * 32);
    let mut language = StringBuilder::with_capacity(len, len * 8);
    let mut is_duplicate = BooleanBuilder::with_capacity(len);

    for r in records {
        original_id.append_value(&r.original_id);
        source_portal.append_value(&r.source_portal);
        portal_name.append_value(&r.portal_name);
        url.append_value(&r.url);
        title.append_value(&r.title);
        description.append_value(&r.description);
        tags.append_value(&r.tags);
        organization.append_value(&r.organization);
        license.append_value(&r.license);
        metadata_created.append_value(&r.metadata_created);
        metadata_modified.append_value(&r.metadata_modified);
        first_seen_at.append_value(&r.first_seen_at);
        language.append_value(&r.language);
        is_duplicate.append_value(r.is_duplicate);
    }

    RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(original_id.finish()),
            Arc::new(source_portal.finish()),
            Arc::new(portal_name.finish()),
            Arc::new(url.finish()),
            Arc::new(title.finish()),
            Arc::new(description.finish()),
            Arc::new(tags.finish()),
            Arc::new(organization.finish()),
            Arc::new(license.finish()),
            Arc::new(metadata_created.finish()),
            Arc::new(metadata_modified.finish()),
            Arc::new(first_seen_at.finish()),
            Arc::new(language.finish()),
            Arc::new(is_duplicate.finish()),
        ],
    )
    .map_err(|e| AppError::Generic(format!("Failed to build RecordBatch: {}", e)))
}
521
/// Returns the Parquet writer for `portal_key`, creating
/// `data_dir/<file_name>.parquet` and its writer on first use.
///
/// Uses `contains_key` followed by `get_mut` rather than the entry API
/// because writer creation is fallible; the double lookup is the standard
/// workaround for conditionally inserting and then returning a borrow.
fn get_or_create_portal_writer<'a>(
    writers: &'a mut HashMap<String, ArrowWriter<fs::File>>,
    portal_key: &str,
    file_name: &str,
    data_dir: &Path,
    schema: &Arc<Schema>,
    writer_props: &WriterProperties,
) -> Result<&'a mut ArrowWriter<fs::File>, AppError> {
    if !writers.contains_key(portal_key) {
        let path = data_dir.join(format!("{}.parquet", file_name));
        let file = fs::File::create(&path).map_err(|e| {
            AppError::Generic(format!("Failed to create {}: {}", path.display(), e))
        })?;
        let writer = ArrowWriter::try_new(file, schema.clone(), Some(writer_props.clone()))
            .map_err(|e| AppError::Generic(format!("Failed to create ArrowWriter: {}", e)))?;
        writers.insert(portal_key.to_string(), writer);
    }
    Ok(writers
        .get_mut(portal_key)
        .expect("writer just inserted above"))
}
551
552fn extract_tags(metadata: &serde_json::Value) -> String {
558 metadata
559 .get("tags")
560 .and_then(|t| t.as_array())
561 .map(|tags| {
562 tags.iter()
563 .filter_map(|t| {
564 t.get("name")
565 .or(t.get("display_name"))
566 .and_then(|n| n.as_str())
567 })
568 .collect::<Vec<_>>()
569 .join(", ")
570 })
571 .unwrap_or_default()
572}
573
574fn extract_organization(metadata: &serde_json::Value) -> String {
576 metadata
577 .get("organization")
578 .and_then(|org| {
579 org.get("title")
580 .or(org.get("name"))
581 .and_then(|n| n.as_str())
582 })
583 .unwrap_or_default()
584 .to_string()
585}
586
587fn extract_license(metadata: &serde_json::Value) -> String {
589 metadata
590 .get("license_title")
591 .or(metadata.get("license_id"))
592 .and_then(|v| v.as_str())
593 .unwrap_or_default()
594 .to_string()
595}
596
597fn extract_string(metadata: &serde_json::Value, key: &str) -> String {
599 metadata
600 .get(key)
601 .and_then(|v| v.as_str())
602 .unwrap_or_default()
603 .to_string()
604}
605
/// Normalizes a portal URL for use as a map key by dropping every trailing
/// slash.
fn normalize_portal_url(url: &str) -> String {
    let mut trimmed = url;
    while let Some(rest) = trimmed.strip_suffix('/') {
        trimmed = rest;
    }
    trimmed.to_owned()
}
614
/// Derives a readable portal name from its URL: strips the scheme, keeps
/// only the host part, and turns dots into dashes
/// (e.g. `https://data.gov.ie` -> `data-gov-ie`).
fn portal_name_from_url(url: &str) -> String {
    let without_scheme = url
        .trim_start_matches("https://")
        .trim_start_matches("http://");
    let host = without_scheme.split('/').next().unwrap_or("unknown");
    host.replace('.', "-")
}
626
/// Builds a filesystem-safe file stem from a portal display name: lowercases
/// it and maps every character that is not ASCII alphanumeric or `-` to `-`.
fn portal_file_name(name: &str) -> String {
    let mut out = String::with_capacity(name.len());
    for c in name.to_lowercase().chars() {
        if c.is_ascii_alphanumeric() || c == '-' {
            out.push(c);
        } else {
            out.push('-');
        }
    }
    out
}
640
// Unit tests for the pure helper functions; the streaming/export paths need
// a DatasetStore and are not covered here.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_extract_tags() {
        // CKAN-style tag objects: `name` wins over `display_name`.
        let metadata = serde_json::json!({
            "tags": [
                {"name": "environment", "display_name": "Environment"},
                {"name": "water", "display_name": "Water"}
            ]
        });
        assert_eq!(extract_tags(&metadata), "environment, water");
    }

    #[test]
    fn test_extract_tags_empty() {
        let metadata = serde_json::json!({});
        assert_eq!(extract_tags(&metadata), "");
    }

    #[test]
    fn test_extract_organization() {
        // `title` is preferred when both keys are present.
        let metadata = serde_json::json!({
            "organization": {"title": "City of Milan", "name": "milano"}
        });
        assert_eq!(extract_organization(&metadata), "City of Milan");
    }

    #[test]
    fn test_extract_organization_fallback_to_name() {
        let metadata = serde_json::json!({
            "organization": {"name": "milano"}
        });
        assert_eq!(extract_organization(&metadata), "milano");
    }

    #[test]
    fn test_extract_license() {
        let metadata = serde_json::json!({"license_title": "CC-BY 4.0"});
        assert_eq!(extract_license(&metadata), "CC-BY 4.0");
    }

    #[test]
    fn test_extract_license_fallback() {
        let metadata = serde_json::json!({"license_id": "cc-by"});
        assert_eq!(extract_license(&metadata), "cc-by");
    }

    #[test]
    fn test_portal_name_from_url() {
        assert_eq!(
            portal_name_from_url("https://dati.comune.milano.it"),
            "dati-comune-milano-it"
        );
        assert_eq!(portal_name_from_url("https://data.gov.ie"), "data-gov-ie");
        // Path segments after the host are dropped.
        assert_eq!(
            portal_name_from_url("https://dati.gov.it/opendata/"),
            "dati-gov-it"
        );
    }

    #[test]
    fn test_portal_file_name() {
        assert_eq!(portal_file_name("milano"), "milano");
        assert_eq!(portal_file_name("dati-gov-it"), "dati-gov-it");
        // Spaces become dashes; the name is lowercased.
        assert_eq!(portal_file_name("NRW Portal"), "nrw-portal");
    }

    #[test]
    fn test_normalize_portal_url() {
        assert_eq!(
            normalize_portal_url("https://dati.gov.it/opendata/"),
            "https://dati.gov.it/opendata"
        );
        assert_eq!(
            normalize_portal_url("https://dati.comune.milano.it"),
            "https://dati.comune.milano.it"
        );
    }

    #[test]
    fn test_build_record_batch() {
        // A single fully-populated row must round-trip into a 14-column batch.
        let schema = arrow_schema();
        let records = vec![FlatRecord {
            original_id: "test-1".to_string(),
            source_portal: "https://example.com".to_string(),
            portal_name: "example".to_string(),
            url: "https://example.com/dataset/test-1".to_string(),
            title: "Test Dataset".to_string(),
            description: "A test dataset".to_string(),
            tags: "tag1, tag2".to_string(),
            organization: "Test Org".to_string(),
            license: "CC-BY".to_string(),
            metadata_created: "2025-01-01".to_string(),
            metadata_modified: "2025-06-01".to_string(),
            first_seen_at: "2025-01-01T00:00:00Z".to_string(),
            language: "en".to_string(),
            is_duplicate: false,
        }];

        let batch = build_record_batch(&records, &schema).unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 14);
    }
}
746}