1use std::io::Write;
24
25use futures::StreamExt;
26use serde::Serialize;
27use tokio::io::{AsyncWrite, AsyncWriteExt};
28
29use crate::error::AppError;
30use crate::models::Dataset;
31use crate::traits::DatasetStore;
32
/// Output encodings understood by [`ExportService`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExportFormat {
    /// JSON Lines: one compact JSON object per line.
    Jsonl,
    /// A single top-level JSON array of pretty-printed objects.
    Json,
    /// Comma-separated values with a header row; the `metadata` field is not exported.
    Csv,
}
45
/// Exports datasets from a [`DatasetStore`] in one of the supported [`ExportFormat`]s.
///
/// The store type is deliberately left unconstrained on the struct itself; the
/// `DatasetStore` bound is stated on the `impl` blocks that actually call into the store.
pub struct ExportService<S> {
    /// Backing store that supplies the datasets to export.
    store: S,
}
56
57impl<S> Clone for ExportService<S>
58where
59 S: DatasetStore + Clone,
60{
61 fn clone(&self) -> Self {
62 Self {
63 store: self.store.clone(),
64 }
65 }
66}
67
68impl<S> ExportService<S>
69where
70 S: DatasetStore,
71{
72 pub fn new(store: S) -> Self {
74 Self { store }
75 }
76
77 pub async fn export_to_writer<W: Write>(
90 &self,
91 writer: &mut W,
92 format: ExportFormat,
93 portal_filter: Option<&str>,
94 limit: Option<usize>,
95 ) -> Result<u64, AppError> {
96 let mut stream = self.store.list_stream(portal_filter, limit);
97 let mut count = 0u64;
98
99 match format {
100 ExportFormat::Jsonl => {
101 while let Some(result) = stream.next().await {
102 let dataset = result?;
103 let record = create_export_record(&dataset);
104 let json = serde_json::to_string(&record)
105 .map_err(|e| AppError::Generic(e.to_string()))?;
106 writeln!(writer, "{}", json).map_err(|e| AppError::Generic(e.to_string()))?;
107 count += 1;
108 }
109 }
110 ExportFormat::Json => {
111 writeln!(writer, "[").map_err(|e| AppError::Generic(e.to_string()))?;
112 let mut first = true;
113
114 while let Some(result) = stream.next().await {
115 let dataset = result?;
116 let record = create_export_record(&dataset);
117
118 if !first {
119 writeln!(writer, ",").map_err(|e| AppError::Generic(e.to_string()))?;
120 }
121 first = false;
122
123 let json = serde_json::to_string_pretty(&record)
124 .map_err(|e| AppError::Generic(e.to_string()))?;
125 for line in json.lines() {
127 writeln!(writer, " {}", line)
128 .map_err(|e| AppError::Generic(e.to_string()))?;
129 }
130 count += 1;
131 }
132
133 writeln!(writer, "]").map_err(|e| AppError::Generic(e.to_string()))?;
134 }
135 ExportFormat::Csv => {
136 writeln!(
137 writer,
138 "id,original_id,source_portal,url,title,description,first_seen_at,last_updated_at"
139 )
140 .map_err(|e| AppError::Generic(e.to_string()))?;
141
142 while let Some(result) = stream.next().await {
143 let dataset = result?;
144 write_csv_row(writer, &dataset)?;
145 count += 1;
146 }
147 }
148 }
149
150 writer
151 .flush()
152 .map_err(|e| AppError::Generic(e.to_string()))?;
153 Ok(count)
154 }
155
156 pub async fn export_to_async_writer<W: AsyncWrite + Unpin>(
172 &self,
173 writer: &mut W,
174 format: ExportFormat,
175 portal_filter: Option<&str>,
176 limit: Option<usize>,
177 ) -> Result<u64, AppError> {
178 let mut stream = self.store.list_stream(portal_filter, limit);
179 let mut count = 0u64;
180
181 match format {
182 ExportFormat::Jsonl => {
183 while let Some(result) = stream.next().await {
184 let dataset = result?;
185 let record = create_export_record(&dataset);
186 let mut json = serde_json::to_string(&record)
187 .map_err(|e| AppError::Generic(e.to_string()))?;
188 json.push('\n');
189 writer
190 .write_all(json.as_bytes())
191 .await
192 .map_err(|e| AppError::Generic(e.to_string()))?;
193 count += 1;
194 }
195 }
196 ExportFormat::Json => {
197 writer
198 .write_all(b"[\n")
199 .await
200 .map_err(|e| AppError::Generic(e.to_string()))?;
201 let mut first = true;
202
203 while let Some(result) = stream.next().await {
204 let dataset = result?;
205 let record = create_export_record(&dataset);
206
207 if !first {
208 writer
209 .write_all(b",\n")
210 .await
211 .map_err(|e| AppError::Generic(e.to_string()))?;
212 }
213 first = false;
214
215 let json = serde_json::to_string_pretty(&record)
216 .map_err(|e| AppError::Generic(e.to_string()))?;
217 for line in json.lines() {
219 writer
220 .write_all(format!(" {}\n", line).as_bytes())
221 .await
222 .map_err(|e| AppError::Generic(e.to_string()))?;
223 }
224 count += 1;
225 }
226
227 writer
228 .write_all(b"]\n")
229 .await
230 .map_err(|e| AppError::Generic(e.to_string()))?;
231 }
232 ExportFormat::Csv => {
233 writer
234 .write_all(
235 b"id,original_id,source_portal,url,title,description,first_seen_at,last_updated_at\n",
236 )
237 .await
238 .map_err(|e| AppError::Generic(e.to_string()))?;
239
240 while let Some(result) = stream.next().await {
241 let dataset = result?;
242 let row = format_csv_row(&dataset);
243 writer
244 .write_all(row.as_bytes())
245 .await
246 .map_err(|e| AppError::Generic(e.to_string()))?;
247 count += 1;
248 }
249 }
250 }
251
252 writer
253 .flush()
254 .await
255 .map_err(|e| AppError::Generic(e.to_string()))?;
256 Ok(count)
257 }
258}
259
260#[derive(Serialize)]
262struct ExportRecord {
263 id: uuid::Uuid,
264 original_id: String,
265 source_portal: String,
266 url: String,
267 title: String,
268 description: Option<String>,
269 metadata: serde_json::Value,
270 first_seen_at: chrono::DateTime<chrono::Utc>,
271 last_updated_at: chrono::DateTime<chrono::Utc>,
272}
273
274fn create_export_record(dataset: &Dataset) -> ExportRecord {
275 ExportRecord {
276 id: dataset.id,
277 original_id: dataset.original_id.clone(),
278 source_portal: dataset.source_portal.clone(),
279 url: dataset.url.clone(),
280 title: dataset.title.clone(),
281 description: dataset.description.clone(),
282 metadata: dataset.metadata.0.clone(),
283 first_seen_at: dataset.first_seen_at,
284 last_updated_at: dataset.last_updated_at,
285 }
286}
287
288fn write_csv_row<W: Write>(writer: &mut W, dataset: &Dataset) -> Result<(), AppError> {
289 let row = format_csv_row(dataset);
290 writer
291 .write_all(row.as_bytes())
292 .map_err(|e| AppError::Generic(e.to_string()))?;
293 Ok(())
294}
295
296fn format_csv_row(dataset: &Dataset) -> String {
297 let description = dataset
298 .description
299 .as_ref()
300 .map(|d| escape_csv(d))
301 .unwrap_or_default();
302
303 format!(
304 "{},{},{},{},{},{},{},{}\n",
305 dataset.id,
306 escape_csv(&dataset.original_id),
307 escape_csv(&dataset.source_portal),
308 escape_csv(&dataset.url),
309 escape_csv(&dataset.title),
310 description,
311 dataset.first_seen_at.format("%Y-%m-%dT%H:%M:%SZ"),
312 dataset.last_updated_at.format("%Y-%m-%dT%H:%M:%SZ"),
313 )
314}
315
/// Quotes a CSV field when it contains a comma, double quote, `\n` or `\r`.
///
/// A quoted field is wrapped in `"` and every embedded `"` is doubled. Any other input is
/// returned unchanged as an owned `String`.
fn escape_csv(s: &str) -> String {
    let needs_quoting = s.chars().any(|ch| matches!(ch, ',' | '"' | '\n' | '\r'));
    if !needs_quoting {
        return s.to_owned();
    }

    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('"');
    for ch in s.chars() {
        if ch == '"' {
            // A literal quote inside a quoted field is written twice.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
323
/// Unit tests for the CSV escaping helper.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_escape_csv_no_special_chars() {
        assert_eq!(escape_csv("hello"), "hello");
    }

    #[test]
    fn test_escape_csv_with_comma() {
        assert_eq!(escape_csv("hello, world"), "\"hello, world\"");
    }

    #[test]
    fn test_escape_csv_with_quote() {
        assert_eq!(escape_csv("hello \"world\""), "\"hello \"\"world\"\"\"");
    }

    #[test]
    fn test_escape_csv_with_newline() {
        assert_eq!(escape_csv("hello\nworld"), "\"hello\nworld\"");
    }

    // `\r` is a separate quoting trigger in `escape_csv`; cover it on its own and as CRLF.
    #[test]
    fn test_escape_csv_with_carriage_return() {
        assert_eq!(escape_csv("hello\rworld"), "\"hello\rworld\"");
        assert_eq!(escape_csv("hello\r\nworld"), "\"hello\r\nworld\"");
    }

    #[test]
    fn test_escape_csv_empty() {
        assert_eq!(escape_csv(""), "");
    }

    #[test]
    fn test_escape_csv_lone_quote() {
        assert_eq!(escape_csv("\""), "\"\"\"\"");
    }
}