use std::io::Write;
use futures::StreamExt;
use serde::Serialize;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use crate::error::AppError;
use crate::models::Dataset;
use crate::traits::DatasetStore;
/// Output encodings supported by the dataset exporter.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExportFormat {
    /// JSON Lines: one compact JSON object per line.
    Jsonl,
    /// A single JSON array whose elements are pretty-printed objects.
    Json,
    /// Comma-separated values preceded by a header row.
    Csv,
}
/// Exports datasets read from a store as JSON Lines, a JSON array, or CSV.
///
/// The `DatasetStore` requirement lives on the `impl` blocks rather than on the
/// struct definition. Naming `ExportService<S>` (in fields, generics, or other
/// impls) therefore does not force callers to restate that bound.
pub struct ExportService<S> {
    /// Source of the datasets to export.
    store: S,
}
impl<S> Clone for ExportService<S>
where
S: DatasetStore + Clone,
{
fn clone(&self) -> Self {
Self {
store: self.store.clone(),
}
}
}
impl<S> ExportService<S>
where
S: DatasetStore,
{
pub fn new(store: S) -> Self {
Self { store }
}
pub async fn export_to_writer<W: Write>(
&self,
writer: &mut W,
format: ExportFormat,
portal_filter: Option<&str>,
limit: Option<usize>,
) -> Result<u64, AppError> {
let mut stream = self.store.list_stream(portal_filter, limit);
let mut count = 0u64;
match format {
ExportFormat::Jsonl => {
while let Some(result) = stream.next().await {
let dataset = result?;
let record = create_export_record(&dataset);
let json = serde_json::to_string(&record)
.map_err(|e| AppError::ExportError(e.to_string()))?;
writeln!(writer, "{}", json).map_err(|e| AppError::IoError(e.to_string()))?;
count += 1;
}
}
ExportFormat::Json => {
writeln!(writer, "[").map_err(|e| AppError::IoError(e.to_string()))?;
let mut first = true;
while let Some(result) = stream.next().await {
let dataset = result?;
let record = create_export_record(&dataset);
if !first {
writeln!(writer, ",").map_err(|e| AppError::IoError(e.to_string()))?;
}
first = false;
let json = serde_json::to_string_pretty(&record)
.map_err(|e| AppError::ExportError(e.to_string()))?;
for line in json.lines() {
writeln!(writer, " {}", line)
.map_err(|e| AppError::IoError(e.to_string()))?;
}
count += 1;
}
writeln!(writer, "]").map_err(|e| AppError::IoError(e.to_string()))?;
}
ExportFormat::Csv => {
writeln!(
writer,
"id,original_id,source_portal,url,title,description,first_seen_at,last_updated_at"
)
.map_err(|e| AppError::IoError(e.to_string()))?;
while let Some(result) = stream.next().await {
let dataset = result?;
write_csv_row(writer, &dataset)?;
count += 1;
}
}
}
writer
.flush()
.map_err(|e| AppError::IoError(e.to_string()))?;
Ok(count)
}
pub async fn export_to_async_writer<W: AsyncWrite + Unpin>(
&self,
writer: &mut W,
format: ExportFormat,
portal_filter: Option<&str>,
limit: Option<usize>,
) -> Result<u64, AppError> {
let mut stream = self.store.list_stream(portal_filter, limit);
let mut count = 0u64;
match format {
ExportFormat::Jsonl => {
while let Some(result) = stream.next().await {
let dataset = result?;
let record = create_export_record(&dataset);
let mut json = serde_json::to_string(&record)
.map_err(|e| AppError::ExportError(e.to_string()))?;
json.push('\n');
writer
.write_all(json.as_bytes())
.await
.map_err(|e| AppError::IoError(e.to_string()))?;
count += 1;
}
}
ExportFormat::Json => {
writer
.write_all(b"[\n")
.await
.map_err(|e| AppError::IoError(e.to_string()))?;
let mut first = true;
while let Some(result) = stream.next().await {
let dataset = result?;
let record = create_export_record(&dataset);
if !first {
writer
.write_all(b",\n")
.await
.map_err(|e| AppError::IoError(e.to_string()))?;
}
first = false;
let json = serde_json::to_string_pretty(&record)
.map_err(|e| AppError::ExportError(e.to_string()))?;
for line in json.lines() {
writer
.write_all(format!(" {}\n", line).as_bytes())
.await
.map_err(|e| AppError::IoError(e.to_string()))?;
}
count += 1;
}
writer
.write_all(b"]\n")
.await
.map_err(|e| AppError::IoError(e.to_string()))?;
}
ExportFormat::Csv => {
writer
.write_all(
b"id,original_id,source_portal,url,title,description,first_seen_at,last_updated_at\n",
)
.await
.map_err(|e| AppError::IoError(e.to_string()))?;
while let Some(result) = stream.next().await {
let dataset = result?;
let row = format_csv_row(&dataset);
writer
.write_all(row.as_bytes())
.await
.map_err(|e| AppError::IoError(e.to_string()))?;
count += 1;
}
}
}
writer
.flush()
.await
.map_err(|e| AppError::IoError(e.to_string()))?;
Ok(count)
}
}
/// Serializable view of a `Dataset`, used by the JSON and JSON Lines exports.
///
/// Every non-`Copy` field is borrowed from the source dataset, so building a
/// record no longer deep-copies its strings or its `metadata` JSON tree. Serde
/// serializes `&str`, `Option<&str>` and `&Value` exactly like their owned
/// counterparts, so the exported JSON is unchanged.
#[derive(Serialize)]
struct ExportRecord<'a> {
    id: uuid::Uuid,
    original_id: &'a str,
    source_portal: &'a str,
    url: &'a str,
    title: &'a str,
    description: Option<&'a str>,
    metadata: &'a serde_json::Value,
    first_seen_at: chrono::DateTime<chrono::Utc>,
    last_updated_at: chrono::DateTime<chrono::Utc>,
}

/// Builds the export view of `dataset` without cloning any of its data.
///
/// The returned record borrows from `dataset` and must not outlive it.
fn create_export_record(dataset: &Dataset) -> ExportRecord<'_> {
    ExportRecord {
        id: dataset.id,
        original_id: &dataset.original_id,
        source_portal: &dataset.source_portal,
        url: &dataset.url,
        title: &dataset.title,
        description: dataset.description.as_deref(),
        metadata: &dataset.metadata,
        first_seen_at: dataset.first_seen_at,
        last_updated_at: dataset.last_updated_at,
    }
}
/// Formats `dataset` as one CSV row and writes it to the blocking `writer`.
///
/// # Errors
///
/// Returns `AppError::IoError` carrying the I/O error's message if the write fails.
fn write_csv_row<W: Write>(writer: &mut W, dataset: &Dataset) -> Result<(), AppError> {
    let line = format_csv_row(dataset);
    match writer.write_all(line.as_bytes()) {
        Ok(()) => Ok(()),
        Err(err) => Err(AppError::IoError(err.to_string())),
    }
}
fn format_csv_row(dataset: &Dataset) -> String {
let description = dataset
.description
.as_ref()
.map(|d| escape_csv(d))
.unwrap_or_default();
format!(
"{},{},{},{},{},{},{},{}\n",
dataset.id,
escape_csv(&dataset.original_id),
escape_csv(&dataset.source_portal),
escape_csv(&dataset.url),
escape_csv(&dataset.title),
description,
dataset.first_seen_at.format("%Y-%m-%dT%H:%M:%SZ"),
dataset.last_updated_at.format("%Y-%m-%dT%H:%M:%SZ"),
)
}
/// Prepares `s` for use as a single CSV field, using RFC 4180-style quoting.
///
/// A field that contains a comma, double quote, CR, or LF is wrapped in double
/// quotes, and every embedded `"` is doubled. Any other field is returned unchanged.
fn escape_csv(s: &str) -> String {
    const SPECIAL: [char; 4] = [',', '"', '\n', '\r'];

    if !s.contains(&SPECIAL[..]) {
        return s.to_owned();
    }

    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('"');
    for ch in s.chars() {
        if ch == '"' {
            // RFC 4180 escapes a quote inside a quoted field by doubling it.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_escape_csv_no_special_chars() {
        assert_eq!(escape_csv("hello"), "hello");
    }

    #[test]
    fn test_escape_csv_with_comma() {
        assert_eq!(escape_csv("hello, world"), "\"hello, world\"");
    }

    #[test]
    fn test_escape_csv_with_quote() {
        assert_eq!(escape_csv("hello \"world\""), "\"hello \"\"world\"\"\"");
    }

    #[test]
    fn test_escape_csv_with_newline() {
        assert_eq!(escape_csv("hello\nworld"), "\"hello\nworld\"");
    }

    /// A bare carriage return must trigger quoting just like a line feed.
    #[test]
    fn test_escape_csv_with_carriage_return() {
        assert_eq!(escape_csv("hello\rworld"), "\"hello\rworld\"");
    }

    /// An empty field needs no quoting and stays empty.
    #[test]
    fn test_escape_csv_empty() {
        assert_eq!(escape_csv(""), "");
    }

    /// A field that needs quoting for a comma still doubles its embedded quotes.
    #[test]
    fn test_escape_csv_with_comma_and_quote() {
        assert_eq!(escape_csv("a,\"b\""), "\"a,\"\"b\"\"\"");
    }
}