// ceres_core/parquet_export.rs
1//! Parquet export service for publishing a curated open data index.
2//!
3//! Produces flattened, curated Parquet files suitable for HuggingFace
4//! from the Ceres dataset index. Includes noise filtering, metadata
5//! flattening, portal name resolution, and cross-portal duplicate detection.
6
7use std::collections::{HashMap, HashSet};
8use std::fs;
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11
12use arrow::array::{BooleanBuilder, StringBuilder};
13use arrow::datatypes::{DataType, Field, Schema};
14use arrow::record_batch::RecordBatch;
15use chrono::Utc;
16use futures::StreamExt;
17use parquet::arrow::ArrowWriter;
18use parquet::basic::{Compression, ZstdLevel};
19use parquet::file::properties::WriterProperties;
20use serde::Serialize;
21use tracing::info;
22
23use crate::config::PortalsConfig;
24use crate::error::AppError;
25use crate::models::Dataset;
26use crate::traits::DatasetStore;
27
/// Configuration for Parquet export curation.
pub struct ParquetExportConfig {
    /// Minimum title length — titles shorter are filtered as noise.
    pub min_title_length: usize,
    /// Noise title patterns to filter (case-insensitive substring match).
    ///
    /// Matching lowercases the dataset title first, so patterns should be
    /// supplied in lowercase.
    pub noise_patterns: Vec<String>,
    /// Number of rows per Arrow RecordBatch / row group.
    ///
    /// Also the flush threshold for the in-memory record buffers.
    pub batch_size: usize,
}
37
38impl Default for ParquetExportConfig {
39    fn default() -> Self {
40        Self {
41            min_title_length: 5,
42            noise_patterns: vec!["test".into(), "prova".into(), "esempio".into()],
43            batch_size: 10_000,
44        }
45    }
46}
47
/// Result of a Parquet export operation.
///
/// Also serialized verbatim as the `metadata.json` snapshot file.
#[derive(Debug, Serialize)]
pub struct ParquetExportResult {
    /// Rows written to `all.parquet` (after noise filtering).
    pub total_exported: u64,
    /// Datasets dropped by the noise filter.
    pub total_filtered: u64,
    /// Exported rows flagged as cross-portal title duplicates.
    pub total_duplicates: u64,
    /// Per-portal row counts, sorted by count descending.
    pub portals: Vec<PortalExportStats>,
    /// Snapshot date in `YYYY-MM-DD` (UTC).
    pub snapshot_date: String,
    /// Directory the Parquet files were written to.
    pub output_dir: PathBuf,
}
58
/// Per-portal export statistics.
#[derive(Debug, Serialize)]
pub struct PortalExportStats {
    /// Display name of the portal (from config, or derived from its URL).
    pub name: String,
    /// Normalized portal URL (no trailing slash) — the partition key.
    pub url: String,
    /// Number of rows exported for this portal.
    pub count: u64,
}
66
/// Intermediate flattened record between Database `Dataset` and Arrow `RecordBatch`.
///
/// Optional source values are flattened to empty strings rather than NULLs;
/// see `flatten_dataset` and the `extract_*` helpers.
struct FlatRecord {
    original_id: String,
    /// Raw source portal URL as stored on the dataset (not normalized).
    source_portal: String,
    /// Human-readable portal name (from config or derived from the URL).
    portal_name: String,
    url: String,
    title: String,
    description: String,
    /// Comma-separated tag names extracted from CKAN metadata.
    tags: String,
    organization: String,
    license: String,
    metadata_created: String,
    metadata_modified: String,
    /// RFC 3339 rendering of `Dataset::first_seen_at`.
    first_seen_at: String,
    language: String,
    /// True when the lowercased title is in the store's duplicate-title set.
    is_duplicate: bool,
}
84
/// Service for exporting curated datasets as Parquet.
pub struct ParquetExportService<S: DatasetStore> {
    /// Backing store the datasets are streamed from.
    store: S,
    /// Curation settings (noise filter, batch size).
    config: ParquetExportConfig,
    /// Normalized portal URL -> display name, from the portals config.
    portal_names: HashMap<String, String>,
    /// Normalized portal URL -> language, from the portals config.
    portal_languages: HashMap<String, String>,
}
92
93impl<S: DatasetStore> ParquetExportService<S> {
94    /// Creates a new Parquet export service.
95    ///
96    /// Portal names and languages are resolved from `portals_config` when provided.
97    /// Portals not in the config get names derived from their URL hostname.
98    pub fn new(
99        store: S,
100        portals_config: Option<PortalsConfig>,
101        config: ParquetExportConfig,
102    ) -> Self {
103        let mut portal_names = HashMap::new();
104        let mut portal_languages = HashMap::new();
105
106        if let Some(pc) = &portals_config {
107            for entry in &pc.portals {
108                let url = normalize_portal_url(&entry.url);
109                portal_names.insert(url.clone(), entry.name.clone());
110                portal_languages.insert(url, entry.language().to_string());
111            }
112        }
113
114        Self {
115            store,
116            config,
117            portal_names,
118            portal_languages,
119        }
120    }
121
122    /// Exports curated datasets as Parquet files to the given directory.
123    ///
124    /// Creates:
125    /// - `all.parquet` — complete curated dataset
126    /// - `data/<portal-name>.parquet` — per-portal subsets
127    /// - `metadata.json` — snapshot metadata with counts
128    pub async fn export_to_directory(
129        &self,
130        output_dir: &Path,
131    ) -> Result<ParquetExportResult, AppError> {
132        // Create directory structure
133        let data_dir = output_dir.join("data");
134        fs::create_dir_all(&data_dir).map_err(|e| {
135            AppError::Generic(format!(
136                "Failed to create output directory {}: {}",
137                data_dir.display(),
138                e
139            ))
140        })?;
141
142        info!("Loading cross-portal duplicate titles...");
143        let duplicate_titles = self.store.get_duplicate_titles().await?;
144        info!(
145            "Found {} duplicate title groups across portals",
146            duplicate_titles.len()
147        );
148
149        info!("Streaming datasets for Parquet export...");
150        let (exported, filtered, duplicates, portal_stats) =
151            self.stream_and_write(output_dir, &duplicate_titles).await?;
152
153        let snapshot_date = Utc::now().format("%Y-%m-%d").to_string();
154
155        let result = ParquetExportResult {
156            total_exported: exported,
157            total_filtered: filtered,
158            total_duplicates: duplicates,
159            portals: portal_stats,
160            snapshot_date: snapshot_date.clone(),
161            output_dir: output_dir.to_path_buf(),
162        };
163
164        // Write metadata.json
165        let metadata_path = output_dir.join("metadata.json");
166        let metadata_json = serde_json::to_string_pretty(&result)
167            .map_err(|e| AppError::Generic(format!("Failed to serialize metadata: {}", e)))?;
168        fs::write(&metadata_path, metadata_json).map_err(|e| {
169            AppError::Generic(format!(
170                "Failed to write {}: {}",
171                metadata_path.display(),
172                e
173            ))
174        })?;
175
176        Ok(result)
177    }
178
179    /// Streams all datasets, applies curation, and writes Parquet files.
180    ///
181    /// Returns (exported_count, filtered_count, duplicate_count, per_portal_stats).
182    async fn stream_and_write(
183        &self,
184        output_dir: &Path,
185        duplicate_titles: &HashSet<String>,
186    ) -> Result<(u64, u64, u64, Vec<PortalExportStats>), AppError> {
187        let schema = arrow_schema();
188        let writer_props = writer_properties();
189
190        let data_dir = output_dir.join("data");
191
192        // Open the "all" writer
193        let all_path = output_dir.join("all.parquet");
194        let all_file = fs::File::create(&all_path).map_err(|e| {
195            AppError::Generic(format!("Failed to create {}: {}", all_path.display(), e))
196        })?;
197        let mut all_writer =
198            ArrowWriter::try_new(all_file, schema.clone(), Some(writer_props.clone()))
199                .map_err(|e| AppError::Generic(format!("Failed to create ArrowWriter: {}", e)))?;
200
201        // Per-portal state keyed by normalized source_portal URL (stable, unique)
202        let mut portal_writers: HashMap<String, ArrowWriter<fs::File>> = HashMap::new();
203        let mut portal_buffers: HashMap<String, Vec<FlatRecord>> = HashMap::new();
204        let mut portal_counts: HashMap<String, u64> = HashMap::new();
205        // Track portal_key -> (display_name, file_name) for stats and file creation
206        let mut portal_info: HashMap<String, (String, String)> = HashMap::new();
207
208        // Main "all" buffer
209        let mut all_buffer: Vec<FlatRecord> = Vec::with_capacity(self.config.batch_size);
210
211        let mut total_exported = 0u64;
212        let mut total_filtered = 0u64;
213        let mut total_duplicates = 0u64;
214
215        let mut stream = self.store.list_stream(None, None);
216
217        while let Some(result) = stream.next().await {
218            let dataset = result?;
219
220            // Apply noise filter
221            if self.is_noise(&dataset) {
222                total_filtered += 1;
223                continue;
224            }
225
226            let is_duplicate = duplicate_titles.contains(&dataset.title.to_lowercase());
227            if is_duplicate {
228                total_duplicates += 1;
229            }
230
231            let record = self.flatten_dataset(&dataset, is_duplicate);
232
233            // Use normalized source_portal URL as the stable partition key
234            let portal_key = normalize_portal_url(&record.source_portal);
235            let file_name = portal_file_name(&record.portal_name);
236            portal_info
237                .entry(portal_key.clone())
238                .or_insert_with(|| (record.portal_name.clone(), file_name));
239
240            // Add to portal buffer
241            portal_buffers
242                .entry(portal_key.clone())
243                .or_default()
244                .push(FlatRecord {
245                    original_id: record.original_id.clone(),
246                    source_portal: record.source_portal.clone(),
247                    portal_name: record.portal_name.clone(),
248                    url: record.url.clone(),
249                    title: record.title.clone(),
250                    description: record.description.clone(),
251                    tags: record.tags.clone(),
252                    organization: record.organization.clone(),
253                    license: record.license.clone(),
254                    metadata_created: record.metadata_created.clone(),
255                    metadata_modified: record.metadata_modified.clone(),
256                    first_seen_at: record.first_seen_at.clone(),
257                    language: record.language.clone(),
258                    is_duplicate: record.is_duplicate,
259                });
260
261            // Add to all buffer
262            all_buffer.push(record);
263            total_exported += 1;
264            *portal_counts.entry(portal_key.clone()).or_default() += 1;
265
266            // Flush "all" buffer when full
267            if all_buffer.len() >= self.config.batch_size {
268                let batch = build_record_batch(&all_buffer, &schema)?;
269                all_writer
270                    .write(&batch)
271                    .map_err(|e| AppError::Generic(format!("Parquet write error: {}", e)))?;
272                all_buffer.clear();
273            }
274
275            // Flush portal buffer when full
276            if let Some(buf) = portal_buffers.get(&portal_key)
277                && buf.len() >= self.config.batch_size
278            {
279                let buf = portal_buffers
280                    .remove(&portal_key)
281                    .expect("buffer must exist: checked by get() above");
282                let batch = build_record_batch(&buf, &schema)?;
283                let (_, ref fname) = portal_info[&portal_key];
284                let writer = get_or_create_portal_writer(
285                    &mut portal_writers,
286                    &portal_key,
287                    fname,
288                    &data_dir,
289                    &schema,
290                    &writer_props,
291                )?;
292                writer
293                    .write(&batch)
294                    .map_err(|e| AppError::Generic(format!("Parquet write error: {}", e)))?;
295            }
296        }
297
298        // Flush remaining "all" buffer
299        if !all_buffer.is_empty() {
300            let batch = build_record_batch(&all_buffer, &schema)?;
301            all_writer
302                .write(&batch)
303                .map_err(|e| AppError::Generic(format!("Parquet write error: {}", e)))?;
304        }
305
306        // Flush remaining portal buffers
307        for (portal_key, buf) in portal_buffers.drain() {
308            if !buf.is_empty() {
309                let batch = build_record_batch(&buf, &schema)?;
310                let (_, ref fname) = portal_info[&portal_key];
311                let writer = get_or_create_portal_writer(
312                    &mut portal_writers,
313                    &portal_key,
314                    fname,
315                    &data_dir,
316                    &schema,
317                    &writer_props,
318                )?;
319                writer
320                    .write(&batch)
321                    .map_err(|e| AppError::Generic(format!("Parquet write error: {}", e)))?;
322            }
323        }
324
325        // Close all writers
326        all_writer
327            .close()
328            .map_err(|e| AppError::Generic(format!("Failed to close all.parquet: {}", e)))?;
329
330        for (portal_key, writer) in portal_writers {
331            let (_, ref fname) = portal_info[&portal_key];
332            writer.close().map_err(|e| {
333                AppError::Generic(format!("Failed to close {}.parquet: {}", fname, e))
334            })?;
335        }
336
337        // Build per-portal stats with accurate names and URLs
338        let mut portal_stats: Vec<PortalExportStats> = portal_counts
339            .into_iter()
340            .map(|(portal_key, count)| {
341                let (ref name, _) = portal_info[&portal_key];
342                PortalExportStats {
343                    name: name.clone(),
344                    url: portal_key,
345                    count,
346                }
347            })
348            .collect();
349        portal_stats.sort_by(|a, b| b.count.cmp(&a.count));
350
351        Ok((
352            total_exported,
353            total_filtered,
354            total_duplicates,
355            portal_stats,
356        ))
357    }
358
359    /// Returns true if the dataset should be filtered out as noise.
360    fn is_noise(&self, dataset: &Dataset) -> bool {
361        // Filter tiny titles
362        if dataset.title.len() < self.config.min_title_length {
363            return true;
364        }
365
366        // Filter empty descriptions
367        if dataset
368            .description
369            .as_ref()
370            .is_none_or(|d| d.trim().is_empty())
371        {
372            return true;
373        }
374
375        // Filter noise patterns in title
376        let title_lower = dataset.title.to_lowercase();
377        for pattern in &self.config.noise_patterns {
378            if title_lower.contains(pattern.as_str()) {
379                return true;
380            }
381        }
382
383        false
384    }
385
386    /// Flattens a Dataset into an export record with extracted metadata.
387    fn flatten_dataset(&self, dataset: &Dataset, is_duplicate: bool) -> FlatRecord {
388        let metadata = &dataset.metadata;
389        let normalized_url = normalize_portal_url(&dataset.source_portal);
390
391        let portal_name = self
392            .portal_names
393            .get(&normalized_url)
394            .cloned()
395            .unwrap_or_else(|| portal_name_from_url(&dataset.source_portal));
396
397        let language = self
398            .portal_languages
399            .get(&normalized_url)
400            .cloned()
401            .or_else(|| {
402                metadata
403                    .get("language")
404                    .and_then(|v| v.as_str())
405                    .map(|s| s.to_lowercase())
406            })
407            .unwrap_or_else(|| "unknown".to_string());
408
409        FlatRecord {
410            original_id: dataset.original_id.clone(),
411            source_portal: dataset.source_portal.clone(),
412            portal_name,
413            url: dataset.url.clone(),
414            title: dataset.title.clone(),
415            description: dataset.description.clone().unwrap_or_default(),
416            tags: extract_tags(metadata),
417            organization: extract_organization(metadata),
418            license: extract_license(metadata),
419            metadata_created: extract_string(metadata, "metadata_created"),
420            metadata_modified: extract_string(metadata, "metadata_modified"),
421            first_seen_at: dataset.first_seen_at.to_rfc3339(),
422            language,
423            is_duplicate,
424        }
425    }
426}
427
428// =============================================================================
429// Arrow Schema & RecordBatch Construction
430// =============================================================================
431
432/// Returns the Arrow schema for the Parquet export.
433fn arrow_schema() -> Arc<Schema> {
434    Arc::new(Schema::new(vec![
435        Field::new("original_id", DataType::Utf8, false),
436        Field::new("source_portal", DataType::Utf8, false),
437        Field::new("portal_name", DataType::Utf8, false),
438        Field::new("url", DataType::Utf8, false),
439        Field::new("title", DataType::Utf8, false),
440        Field::new("description", DataType::Utf8, true),
441        Field::new("tags", DataType::Utf8, true),
442        Field::new("organization", DataType::Utf8, true),
443        Field::new("license", DataType::Utf8, true),
444        Field::new("metadata_created", DataType::Utf8, true),
445        Field::new("metadata_modified", DataType::Utf8, true),
446        Field::new("first_seen_at", DataType::Utf8, false),
447        Field::new("language", DataType::Utf8, true),
448        Field::new("is_duplicate", DataType::Boolean, false),
449    ]))
450}
451
452/// Returns Parquet writer properties with Zstd compression.
453fn writer_properties() -> WriterProperties {
454    WriterProperties::builder()
455        .set_compression(Compression::ZSTD(
456            ZstdLevel::try_new(3).expect("zstd level 3 should be valid"),
457        ))
458        .build()
459}
460
/// Builds an Arrow RecordBatch from a slice of FlatRecords.
///
/// Column order here must match `arrow_schema()` exactly — both the append
/// loop and the final array vector are positional.
fn build_record_batch(
    records: &[FlatRecord],
    schema: &Arc<Schema>,
) -> Result<RecordBatch, AppError> {
    let len = records.len();

    // One builder per column. The second argument is an estimated total
    // string-data capacity in bytes (rows * rough average field size) to
    // reduce reallocation while appending.
    let mut original_id = StringBuilder::with_capacity(len, len * 32);
    let mut source_portal = StringBuilder::with_capacity(len, len * 64);
    let mut portal_name = StringBuilder::with_capacity(len, len * 24);
    let mut url = StringBuilder::with_capacity(len, len * 128);
    let mut title = StringBuilder::with_capacity(len, len * 64);
    let mut description = StringBuilder::with_capacity(len, len * 256);
    let mut tags = StringBuilder::with_capacity(len, len * 64);
    let mut organization = StringBuilder::with_capacity(len, len * 48);
    let mut license = StringBuilder::with_capacity(len, len * 32);
    let mut metadata_created = StringBuilder::with_capacity(len, len * 24);
    let mut metadata_modified = StringBuilder::with_capacity(len, len * 24);
    let mut first_seen_at = StringBuilder::with_capacity(len, len * 32);
    let mut language = StringBuilder::with_capacity(len, len * 8);
    let mut is_duplicate = BooleanBuilder::with_capacity(len);

    // Append row by row; all columns stay the same length by construction.
    for r in records {
        original_id.append_value(&r.original_id);
        source_portal.append_value(&r.source_portal);
        portal_name.append_value(&r.portal_name);
        url.append_value(&r.url);
        title.append_value(&r.title);
        description.append_value(&r.description);
        tags.append_value(&r.tags);
        organization.append_value(&r.organization);
        license.append_value(&r.license);
        metadata_created.append_value(&r.metadata_created);
        metadata_modified.append_value(&r.metadata_modified);
        first_seen_at.append_value(&r.first_seen_at);
        language.append_value(&r.language);
        is_duplicate.append_value(r.is_duplicate);
    }

    // Finish each builder into an array, in schema column order.
    RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(original_id.finish()),
            Arc::new(source_portal.finish()),
            Arc::new(portal_name.finish()),
            Arc::new(url.finish()),
            Arc::new(title.finish()),
            Arc::new(description.finish()),
            Arc::new(tags.finish()),
            Arc::new(organization.finish()),
            Arc::new(license.finish()),
            Arc::new(metadata_created.finish()),
            Arc::new(metadata_modified.finish()),
            Arc::new(first_seen_at.finish()),
            Arc::new(language.finish()),
            Arc::new(is_duplicate.finish()),
        ],
    )
    .map_err(|e| AppError::Generic(format!("Failed to build RecordBatch: {}", e)))
}
521
522// =============================================================================
523// Per-Portal Writer Management
524// =============================================================================
525
526/// Gets or creates an ArrowWriter for a specific portal.
527///
528/// `portal_key` is the stable map key (normalized URL), `file_name` is the
529/// human-readable name used for the `.parquet` file on disk.
530fn get_or_create_portal_writer<'a>(
531    writers: &'a mut HashMap<String, ArrowWriter<fs::File>>,
532    portal_key: &str,
533    file_name: &str,
534    data_dir: &Path,
535    schema: &Arc<Schema>,
536    writer_props: &WriterProperties,
537) -> Result<&'a mut ArrowWriter<fs::File>, AppError> {
538    if !writers.contains_key(portal_key) {
539        let path = data_dir.join(format!("{}.parquet", file_name));
540        let file = fs::File::create(&path).map_err(|e| {
541            AppError::Generic(format!("Failed to create {}: {}", path.display(), e))
542        })?;
543        let writer = ArrowWriter::try_new(file, schema.clone(), Some(writer_props.clone()))
544            .map_err(|e| AppError::Generic(format!("Failed to create ArrowWriter: {}", e)))?;
545        writers.insert(portal_key.to_string(), writer);
546    }
547    Ok(writers
548        .get_mut(portal_key)
549        .expect("writer just inserted above"))
550}
551
552// =============================================================================
553// Metadata Extraction Helpers
554// =============================================================================
555
556/// Extracts tag names from CKAN metadata as a comma-separated string.
557fn extract_tags(metadata: &serde_json::Value) -> String {
558    metadata
559        .get("tags")
560        .and_then(|t| t.as_array())
561        .map(|tags| {
562            tags.iter()
563                .filter_map(|t| {
564                    t.get("name")
565                        .or(t.get("display_name"))
566                        .and_then(|n| n.as_str())
567                })
568                .collect::<Vec<_>>()
569                .join(", ")
570        })
571        .unwrap_or_default()
572}
573
574/// Extracts organization name from CKAN metadata.
575fn extract_organization(metadata: &serde_json::Value) -> String {
576    metadata
577        .get("organization")
578        .and_then(|org| {
579            org.get("title")
580                .or(org.get("name"))
581                .and_then(|n| n.as_str())
582        })
583        .unwrap_or_default()
584        .to_string()
585}
586
587/// Extracts license from CKAN metadata.
588fn extract_license(metadata: &serde_json::Value) -> String {
589    metadata
590        .get("license_title")
591        .or(metadata.get("license_id"))
592        .and_then(|v| v.as_str())
593        .unwrap_or_default()
594        .to_string()
595}
596
597/// Extracts a string field from metadata by key.
598fn extract_string(metadata: &serde_json::Value, key: &str) -> String {
599    metadata
600        .get(key)
601        .and_then(|v| v.as_str())
602        .unwrap_or_default()
603        .to_string()
604}
605
606// =============================================================================
607// URL / Name Utilities
608// =============================================================================
609
/// Normalizes a portal URL by trimming trailing slashes for consistent map lookup.
fn normalize_portal_url(url: &str) -> String {
    let mut trimmed = url;
    // Strip every trailing '/' (equivalent to `trim_end_matches('/')`).
    while let Some(rest) = trimmed.strip_suffix('/') {
        trimmed = rest;
    }
    trimmed.to_string()
}
614
/// Derives a human-readable portal name from its URL hostname.
///
/// e.g. `https://dati.comune.milano.it` -> `dati-comune-milano-it`
fn portal_name_from_url(url: &str) -> String {
    // Drop the scheme, keep only the host segment, then dash-separate it.
    let without_scheme = url
        .trim_start_matches("https://")
        .trim_start_matches("http://");
    let host = match without_scheme.split('/').next() {
        Some(h) => h,
        None => "unknown",
    };
    host.replace('.', "-")
}
626
/// Converts a portal name to a safe file name.
///
/// Lowercases the name; any character that is not an ASCII letter, digit,
/// or hyphen becomes a hyphen.
fn portal_file_name(name: &str) -> String {
    let mut safe = String::with_capacity(name.len());
    for c in name.to_lowercase().chars() {
        let keep = c.is_ascii_alphanumeric() || c == '-';
        safe.push(if keep { c } else { '-' });
    }
    safe
}
640
// Unit tests cover the pure helpers only; the export service itself needs a
// DatasetStore and filesystem, so it is exercised elsewhere.
#[cfg(test)]
mod tests {
    use super::*;

    // --- metadata extraction helpers ---

    #[test]
    fn test_extract_tags() {
        let metadata = serde_json::json!({
            "tags": [
                {"name": "environment", "display_name": "Environment"},
                {"name": "water", "display_name": "Water"}
            ]
        });
        assert_eq!(extract_tags(&metadata), "environment, water");
    }

    #[test]
    fn test_extract_tags_empty() {
        let metadata = serde_json::json!({});
        assert_eq!(extract_tags(&metadata), "");
    }

    #[test]
    fn test_extract_organization() {
        let metadata = serde_json::json!({
            "organization": {"title": "City of Milan", "name": "milano"}
        });
        assert_eq!(extract_organization(&metadata), "City of Milan");
    }

    #[test]
    fn test_extract_organization_fallback_to_name() {
        let metadata = serde_json::json!({
            "organization": {"name": "milano"}
        });
        assert_eq!(extract_organization(&metadata), "milano");
    }

    #[test]
    fn test_extract_license() {
        let metadata = serde_json::json!({"license_title": "CC-BY 4.0"});
        assert_eq!(extract_license(&metadata), "CC-BY 4.0");
    }

    #[test]
    fn test_extract_license_fallback() {
        let metadata = serde_json::json!({"license_id": "cc-by"});
        assert_eq!(extract_license(&metadata), "cc-by");
    }

    // --- URL / name utilities ---

    #[test]
    fn test_portal_name_from_url() {
        assert_eq!(
            portal_name_from_url("https://dati.comune.milano.it"),
            "dati-comune-milano-it"
        );
        assert_eq!(portal_name_from_url("https://data.gov.ie"), "data-gov-ie");
        assert_eq!(
            portal_name_from_url("https://dati.gov.it/opendata/"),
            "dati-gov-it"
        );
    }

    #[test]
    fn test_portal_file_name() {
        assert_eq!(portal_file_name("milano"), "milano");
        assert_eq!(portal_file_name("dati-gov-it"), "dati-gov-it");
        assert_eq!(portal_file_name("NRW Portal"), "nrw-portal");
    }

    #[test]
    fn test_normalize_portal_url() {
        assert_eq!(
            normalize_portal_url("https://dati.gov.it/opendata/"),
            "https://dati.gov.it/opendata"
        );
        assert_eq!(
            normalize_portal_url("https://dati.comune.milano.it"),
            "https://dati.comune.milano.it"
        );
    }

    // --- Arrow batch construction ---

    #[test]
    fn test_build_record_batch() {
        let schema = arrow_schema();
        let records = vec![FlatRecord {
            original_id: "test-1".to_string(),
            source_portal: "https://example.com".to_string(),
            portal_name: "example".to_string(),
            url: "https://example.com/dataset/test-1".to_string(),
            title: "Test Dataset".to_string(),
            description: "A test dataset".to_string(),
            tags: "tag1, tag2".to_string(),
            organization: "Test Org".to_string(),
            license: "CC-BY".to_string(),
            metadata_created: "2025-01-01".to_string(),
            metadata_modified: "2025-06-01".to_string(),
            first_seen_at: "2025-01-01T00:00:00Z".to_string(),
            language: "en".to_string(),
            is_duplicate: false,
        }];

        let batch = build_record_batch(&records, &schema).unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 14);
    }
}