//! Export service for streaming dataset exports.
//!
//! This module provides the [`ExportService`] for memory-efficient export
//! of datasets to various formats (JSONL, JSON, CSV).
//!
//! # Example
//!
//! ```ignore
//! use ceres_core::export::{ExportService, ExportFormat};
//! use std::io::stdout;
//!
//! let export_service = ExportService::new(store);
//! let mut writer = stdout().lock();
//! let count = export_service
//!     .export_to_writer(&mut writer, ExportFormat::Jsonl, None, None)
//!     .await?;
//! println!("Exported {} datasets", count);
//! ```
//!
//! Note: This example uses `ignore` because it requires a concrete
//! [`DatasetStore`] implementation which cannot be easily provided in a doctest.
23use std::io::Write;
24
25use futures::StreamExt;
26use serde::Serialize;
27use tokio::io::{AsyncWrite, AsyncWriteExt};
28
29use crate::error::AppError;
30use crate::models::Dataset;
31use crate::traits::DatasetStore;
32
/// The set of output formats an export can produce.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExportFormat {
    /// JSON Lines: one self-contained JSON object per line.
    /// The most streaming-friendly choice for large datasets.
    Jsonl,
    /// A single pretty-printed JSON array, streamed with manual
    /// bracket/separator handling.
    Json,
    /// Comma-separated values with a fixed header row.
    Csv,
}
45
46/// Service for exporting datasets in streaming mode.
47///
48/// Uses [`DatasetStore::list_stream`] to fetch datasets incrementally,
49/// avoiding loading all data into memory at once.
50pub struct ExportService<S>
51where
52    S: DatasetStore,
53{
54    store: S,
55}
56
57impl<S> Clone for ExportService<S>
58where
59    S: DatasetStore + Clone,
60{
61    fn clone(&self) -> Self {
62        Self {
63            store: self.store.clone(),
64        }
65    }
66}
67
68impl<S> ExportService<S>
69where
70    S: DatasetStore,
71{
72    /// Creates a new export service with the given store.
73    pub fn new(store: S) -> Self {
74        Self { store }
75    }
76
77    /// Exports datasets to a writer in streaming mode.
78    ///
79    /// # Arguments
80    ///
81    /// * `writer` - The output writer (e.g., stdout, file)
82    /// * `format` - The export format (JSONL, JSON, or CSV)
83    /// * `portal_filter` - Optional portal URL to filter by
84    /// * `limit` - Optional maximum number of records
85    ///
86    /// # Returns
87    ///
88    /// The number of datasets exported.
89    pub async fn export_to_writer<W: Write>(
90        &self,
91        writer: &mut W,
92        format: ExportFormat,
93        portal_filter: Option<&str>,
94        limit: Option<usize>,
95    ) -> Result<u64, AppError> {
96        let mut stream = self.store.list_stream(portal_filter, limit);
97        let mut count = 0u64;
98
99        match format {
100            ExportFormat::Jsonl => {
101                while let Some(result) = stream.next().await {
102                    let dataset = result?;
103                    let record = create_export_record(&dataset);
104                    let json = serde_json::to_string(&record)
105                        .map_err(|e| AppError::Generic(e.to_string()))?;
106                    writeln!(writer, "{}", json).map_err(|e| AppError::Generic(e.to_string()))?;
107                    count += 1;
108                }
109            }
110            ExportFormat::Json => {
111                writeln!(writer, "[").map_err(|e| AppError::Generic(e.to_string()))?;
112                let mut first = true;
113
114                while let Some(result) = stream.next().await {
115                    let dataset = result?;
116                    let record = create_export_record(&dataset);
117
118                    if !first {
119                        writeln!(writer, ",").map_err(|e| AppError::Generic(e.to_string()))?;
120                    }
121                    first = false;
122
123                    let json = serde_json::to_string_pretty(&record)
124                        .map_err(|e| AppError::Generic(e.to_string()))?;
125                    // Indent each line for proper formatting
126                    for line in json.lines() {
127                        writeln!(writer, "  {}", line)
128                            .map_err(|e| AppError::Generic(e.to_string()))?;
129                    }
130                    count += 1;
131                }
132
133                writeln!(writer, "]").map_err(|e| AppError::Generic(e.to_string()))?;
134            }
135            ExportFormat::Csv => {
136                writeln!(
137                    writer,
138                    "id,original_id,source_portal,url,title,description,first_seen_at,last_updated_at"
139                )
140                .map_err(|e| AppError::Generic(e.to_string()))?;
141
142                while let Some(result) = stream.next().await {
143                    let dataset = result?;
144                    write_csv_row(writer, &dataset)?;
145                    count += 1;
146                }
147            }
148        }
149
150        writer
151            .flush()
152            .map_err(|e| AppError::Generic(e.to_string()))?;
153        Ok(count)
154    }
155
156    /// Exports datasets to an async writer in streaming mode.
157    ///
158    /// This method writes directly to the async writer without buffering the entire
159    /// dataset in memory, making it suitable for HTTP streaming responses.
160    ///
161    /// # Arguments
162    ///
163    /// * `writer` - The async output writer
164    /// * `format` - The export format (JSONL, JSON, or CSV)
165    /// * `portal_filter` - Optional portal URL to filter by
166    /// * `limit` - Optional maximum number of records
167    ///
168    /// # Returns
169    ///
170    /// The number of datasets exported.
171    pub async fn export_to_async_writer<W: AsyncWrite + Unpin>(
172        &self,
173        writer: &mut W,
174        format: ExportFormat,
175        portal_filter: Option<&str>,
176        limit: Option<usize>,
177    ) -> Result<u64, AppError> {
178        let mut stream = self.store.list_stream(portal_filter, limit);
179        let mut count = 0u64;
180
181        match format {
182            ExportFormat::Jsonl => {
183                while let Some(result) = stream.next().await {
184                    let dataset = result?;
185                    let record = create_export_record(&dataset);
186                    let mut json = serde_json::to_string(&record)
187                        .map_err(|e| AppError::Generic(e.to_string()))?;
188                    json.push('\n');
189                    writer
190                        .write_all(json.as_bytes())
191                        .await
192                        .map_err(|e| AppError::Generic(e.to_string()))?;
193                    count += 1;
194                }
195            }
196            ExportFormat::Json => {
197                writer
198                    .write_all(b"[\n")
199                    .await
200                    .map_err(|e| AppError::Generic(e.to_string()))?;
201                let mut first = true;
202
203                while let Some(result) = stream.next().await {
204                    let dataset = result?;
205                    let record = create_export_record(&dataset);
206
207                    if !first {
208                        writer
209                            .write_all(b",\n")
210                            .await
211                            .map_err(|e| AppError::Generic(e.to_string()))?;
212                    }
213                    first = false;
214
215                    let json = serde_json::to_string_pretty(&record)
216                        .map_err(|e| AppError::Generic(e.to_string()))?;
217                    // Indent each line for proper formatting
218                    for line in json.lines() {
219                        writer
220                            .write_all(format!("  {}\n", line).as_bytes())
221                            .await
222                            .map_err(|e| AppError::Generic(e.to_string()))?;
223                    }
224                    count += 1;
225                }
226
227                writer
228                    .write_all(b"]\n")
229                    .await
230                    .map_err(|e| AppError::Generic(e.to_string()))?;
231            }
232            ExportFormat::Csv => {
233                writer
234                    .write_all(
235                        b"id,original_id,source_portal,url,title,description,first_seen_at,last_updated_at\n",
236                    )
237                    .await
238                    .map_err(|e| AppError::Generic(e.to_string()))?;
239
240                while let Some(result) = stream.next().await {
241                    let dataset = result?;
242                    let row = format_csv_row(&dataset);
243                    writer
244                        .write_all(row.as_bytes())
245                        .await
246                        .map_err(|e| AppError::Generic(e.to_string()))?;
247                    count += 1;
248                }
249            }
250        }
251
252        writer
253            .flush()
254            .await
255            .map_err(|e| AppError::Generic(e.to_string()))?;
256        Ok(count)
257    }
258}
259
260/// Record structure for JSON/JSONL export.
261#[derive(Serialize)]
262struct ExportRecord {
263    id: uuid::Uuid,
264    original_id: String,
265    source_portal: String,
266    url: String,
267    title: String,
268    description: Option<String>,
269    metadata: serde_json::Value,
270    first_seen_at: chrono::DateTime<chrono::Utc>,
271    last_updated_at: chrono::DateTime<chrono::Utc>,
272}
273
274fn create_export_record(dataset: &Dataset) -> ExportRecord {
275    ExportRecord {
276        id: dataset.id,
277        original_id: dataset.original_id.clone(),
278        source_portal: dataset.source_portal.clone(),
279        url: dataset.url.clone(),
280        title: dataset.title.clone(),
281        description: dataset.description.clone(),
282        metadata: dataset.metadata.0.clone(),
283        first_seen_at: dataset.first_seen_at,
284        last_updated_at: dataset.last_updated_at,
285    }
286}
287
288fn write_csv_row<W: Write>(writer: &mut W, dataset: &Dataset) -> Result<(), AppError> {
289    let row = format_csv_row(dataset);
290    writer
291        .write_all(row.as_bytes())
292        .map_err(|e| AppError::Generic(e.to_string()))?;
293    Ok(())
294}
295
296fn format_csv_row(dataset: &Dataset) -> String {
297    let description = dataset
298        .description
299        .as_ref()
300        .map(|d| escape_csv(d))
301        .unwrap_or_default();
302
303    format!(
304        "{},{},{},{},{},{},{},{}\n",
305        dataset.id,
306        escape_csv(&dataset.original_id),
307        escape_csv(&dataset.source_portal),
308        escape_csv(&dataset.url),
309        escape_csv(&dataset.title),
310        description,
311        dataset.first_seen_at.format("%Y-%m-%dT%H:%M:%SZ"),
312        dataset.last_updated_at.format("%Y-%m-%dT%H:%M:%SZ"),
313    )
314}
315
/// Escapes a single CSV field following RFC 4180 quoting rules.
///
/// A field containing a comma, double quote, or line break is wrapped in
/// double quotes with embedded quotes doubled; any other field is returned
/// verbatim.
fn escape_csv(s: &str) -> String {
    let needs_quoting = s.chars().any(|c| matches!(c, ',' | '"' | '\n' | '\r'));
    if !needs_quoting {
        return s.to_string();
    }

    // Worst case every char is a quote: original length doubled plus the
    // two wrapping quotes.
    let mut escaped = String::with_capacity(s.len() + 2);
    escaped.push('"');
    for c in s.chars() {
        if c == '"' {
            escaped.push('"'); // double embedded quotes
        }
        escaped.push(c);
    }
    escaped.push('"');
    escaped
}
323
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn plain_text_passes_through_unquoted() {
        assert_eq!(escape_csv("hello"), "hello");
    }

    #[test]
    fn comma_forces_quoting() {
        assert_eq!(escape_csv("hello, world"), "\"hello, world\"");
    }

    #[test]
    fn embedded_quotes_are_doubled_and_field_quoted() {
        assert_eq!(escape_csv("hello \"world\""), "\"hello \"\"world\"\"\"");
    }

    #[test]
    fn newline_forces_quoting() {
        assert_eq!(escape_csv("hello\nworld"), "\"hello\nworld\"");
    }
}