1use async_trait::async_trait;
4use chrono::Utc;
5use rusqlite::params;
6use serde_json::Value;
7
8use crate::core::bulk_export::{
9 BulkExportStorage, ExportDataProvider, ExportJobId, ExportLevel, ExportManifest,
10 ExportOutputFile, ExportProgress, ExportRequest, ExportStatus, GroupExportProvider,
11 NdjsonBatch, PatientExportProvider, TypeExportProgress,
12};
13use crate::error::{BackendError, BulkExportError, StorageError, StorageResult};
14use crate::tenant::TenantContext;
15
16use super::SqliteBackend;
17
18fn internal_error(message: String) -> StorageError {
19 StorageError::Backend(BackendError::Internal {
20 backend_name: "sqlite".to_string(),
21 message,
22 source: None,
23 })
24}
25
26#[async_trait]
27impl BulkExportStorage for SqliteBackend {
28 async fn start_export(
29 &self,
30 tenant: &TenantContext,
31 request: ExportRequest,
32 ) -> StorageResult<ExportJobId> {
33 let conn = self.get_connection()?;
34 let tenant_id = tenant.tenant_id().as_str();
35
36 let active_count: i32 = conn
38 .query_row(
39 "SELECT COUNT(*) FROM bulk_export_jobs
40 WHERE tenant_id = ?1 AND status IN ('accepted', 'in-progress')",
41 params![tenant_id],
42 |row| row.get(0),
43 )
44 .map_err(|e| internal_error(format!("Failed to count active exports: {}", e)))?;
45
46 if active_count >= 5 {
47 return Err(StorageError::BulkExport(
48 BulkExportError::TooManyConcurrentExports { max_concurrent: 5 },
49 ));
50 }
51
52 let job_id = ExportJobId::new();
53 let now = Utc::now();
54 let transaction_time = now.to_rfc3339();
55
56 let level_str = match &request.level {
57 ExportLevel::System => "system".to_string(),
58 ExportLevel::Patient => "patient".to_string(),
59 ExportLevel::Group { .. } => "group".to_string(),
60 };
61
62 let group_id = request.group_id().map(|s| s.to_string());
63
64 let request_json = serde_json::to_string(&request)
65 .map_err(|e| internal_error(format!("Failed to serialize request: {}", e)))?;
66
67 conn.execute(
68 "INSERT INTO bulk_export_jobs
69 (id, tenant_id, status, level, group_id, request_json, transaction_time, created_at)
70 VALUES (?1, ?2, 'accepted', ?3, ?4, ?5, ?6, ?7)",
71 params![
72 job_id.as_str(),
73 tenant_id,
74 level_str,
75 group_id,
76 request_json,
77 transaction_time,
78 transaction_time
79 ],
80 )
81 .map_err(|e| internal_error(format!("Failed to create export job: {}", e)))?;
82
83 Ok(job_id)
84 }
85
86 async fn get_export_status(
87 &self,
88 tenant: &TenantContext,
89 job_id: &ExportJobId,
90 ) -> StorageResult<ExportProgress> {
91 let conn = self.get_connection()?;
92 let tenant_id = tenant.tenant_id().as_str();
93
94 let (status_str, level_str, group_id, transaction_time, started_at, completed_at, error_message, current_type):
95 (String, String, Option<String>, String, Option<String>, Option<String>, Option<String>, Option<String>) = conn
96 .query_row(
97 "SELECT status, level, group_id, transaction_time, started_at, completed_at, error_message, current_type
98 FROM bulk_export_jobs
99 WHERE id = ?1 AND tenant_id = ?2",
100 params![job_id.as_str(), tenant_id],
101 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?,
102 row.get(4)?, row.get(5)?, row.get(6)?, row.get(7)?)),
103 )
104 .map_err(|e| {
105 if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
106 StorageError::BulkExport(BulkExportError::JobNotFound {
107 job_id: job_id.to_string(),
108 })
109 } else {
110 internal_error(format!("Failed to get export status: {}", e))
111 }
112 })?;
113
114 let status: ExportStatus = status_str
115 .parse()
116 .map_err(|_| internal_error(format!("Invalid status in database: {}", status_str)))?;
117
118 let level = match level_str.as_str() {
119 "system" => ExportLevel::System,
120 "patient" => ExportLevel::Patient,
121 "group" => ExportLevel::Group {
122 group_id: group_id.unwrap_or_default(),
123 },
124 _ => {
125 return Err(internal_error(format!(
126 "Invalid level in database: {}",
127 level_str
128 )));
129 }
130 };
131
132 let transaction_time = chrono::DateTime::parse_from_rfc3339(&transaction_time)
133 .map_err(|e| internal_error(format!("Invalid transaction_time: {}", e)))?
134 .with_timezone(&Utc);
135
136 let started_at = started_at.and_then(|s| {
137 chrono::DateTime::parse_from_rfc3339(&s)
138 .ok()
139 .map(|dt| dt.with_timezone(&Utc))
140 });
141
142 let completed_at = completed_at.and_then(|s| {
143 chrono::DateTime::parse_from_rfc3339(&s)
144 .ok()
145 .map(|dt| dt.with_timezone(&Utc))
146 });
147
148 let mut stmt = conn
150 .prepare(
151 "SELECT resource_type, total_count, exported_count, error_count, cursor_state
152 FROM bulk_export_progress
153 WHERE job_id = ?1",
154 )
155 .map_err(|e| internal_error(format!("Failed to prepare progress query: {}", e)))?;
156
157 let type_progress: Vec<TypeExportProgress> = stmt
158 .query_map(params![job_id.as_str()], |row| {
159 Ok(TypeExportProgress {
160 resource_type: row.get(0)?,
161 total_count: row.get::<_, Option<i64>>(1)?.map(|v| v as u64),
162 exported_count: row.get::<_, i64>(2)? as u64,
163 error_count: row.get::<_, i64>(3)? as u64,
164 cursor_state: row.get(4)?,
165 })
166 })
167 .map_err(|e| internal_error(format!("Failed to query progress: {}", e)))?
168 .filter_map(|r| r.ok())
169 .collect();
170
171 Ok(ExportProgress {
172 job_id: job_id.clone(),
173 status,
174 level,
175 transaction_time,
176 started_at,
177 completed_at,
178 type_progress,
179 current_type,
180 error_message,
181 })
182 }
183
184 async fn cancel_export(
185 &self,
186 tenant: &TenantContext,
187 job_id: &ExportJobId,
188 ) -> StorageResult<()> {
189 let conn = self.get_connection()?;
190 let tenant_id = tenant.tenant_id().as_str();
191
192 let current_status: String = conn
194 .query_row(
195 "SELECT status FROM bulk_export_jobs WHERE id = ?1 AND tenant_id = ?2",
196 params![job_id.as_str(), tenant_id],
197 |row| row.get(0),
198 )
199 .map_err(|e| {
200 if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
201 StorageError::BulkExport(BulkExportError::JobNotFound {
202 job_id: job_id.to_string(),
203 })
204 } else {
205 internal_error(format!("Failed to get export status: {}", e))
206 }
207 })?;
208
209 let status: ExportStatus = current_status.parse().map_err(|_| {
210 internal_error(format!("Invalid status in database: {}", current_status))
211 })?;
212
213 if status.is_terminal() {
214 return Err(StorageError::BulkExport(BulkExportError::InvalidJobState {
215 job_id: job_id.to_string(),
216 expected: "accepted or in-progress".to_string(),
217 actual: current_status,
218 }));
219 }
220
221 let now = Utc::now().to_rfc3339();
222 conn.execute(
223 "UPDATE bulk_export_jobs SET status = 'cancelled', completed_at = ?1 WHERE id = ?2",
224 params![now, job_id.as_str()],
225 )
226 .map_err(|e| internal_error(format!("Failed to cancel export: {}", e)))?;
227
228 Ok(())
229 }
230
231 async fn delete_export(
232 &self,
233 tenant: &TenantContext,
234 job_id: &ExportJobId,
235 ) -> StorageResult<()> {
236 let conn = self.get_connection()?;
237 let tenant_id = tenant.tenant_id().as_str();
238
239 let exists: bool = conn
241 .query_row(
242 "SELECT 1 FROM bulk_export_jobs WHERE id = ?1 AND tenant_id = ?2",
243 params![job_id.as_str(), tenant_id],
244 |_| Ok(true),
245 )
246 .unwrap_or(false);
247
248 if !exists {
249 return Err(StorageError::BulkExport(BulkExportError::JobNotFound {
250 job_id: job_id.to_string(),
251 }));
252 }
253
254 conn.execute(
256 "DELETE FROM bulk_export_jobs WHERE id = ?1 AND tenant_id = ?2",
257 params![job_id.as_str(), tenant_id],
258 )
259 .map_err(|e| internal_error(format!("Failed to delete export: {}", e)))?;
260
261 Ok(())
262 }
263
264 async fn get_export_manifest(
265 &self,
266 tenant: &TenantContext,
267 job_id: &ExportJobId,
268 ) -> StorageResult<ExportManifest> {
269 let progress = self.get_export_status(tenant, job_id).await?;
270
271 if progress.status != ExportStatus::Complete {
272 return Err(StorageError::BulkExport(BulkExportError::InvalidJobState {
273 job_id: job_id.to_string(),
274 expected: "complete".to_string(),
275 actual: progress.status.to_string(),
276 }));
277 }
278
279 let conn = self.get_connection()?;
280
281 let mut stmt = conn
283 .prepare(
284 "SELECT resource_type, file_path, resource_count, file_type
285 FROM bulk_export_files
286 WHERE job_id = ?1
287 ORDER BY resource_type",
288 )
289 .map_err(|e| internal_error(format!("Failed to prepare files query: {}", e)))?;
290
291 let mut output_files = Vec::new();
292 let mut error_files = Vec::new();
293
294 let rows = stmt
295 .query_map(params![job_id.as_str()], |row| {
296 Ok((
297 row.get::<_, String>(0)?,
298 row.get::<_, String>(1)?,
299 row.get::<_, Option<i64>>(2)?.map(|v| v as u64),
300 row.get::<_, String>(3)?,
301 ))
302 })
303 .map_err(|e| internal_error(format!("Failed to query files: {}", e)))?;
304
305 for row in rows {
306 let (resource_type, file_path, count, file_type) =
307 row.map_err(|e| internal_error(format!("Failed to read file row: {}", e)))?;
308
309 let file = ExportOutputFile {
310 resource_type,
311 url: file_path,
312 count,
313 };
314
315 if file_type == "error" {
316 error_files.push(file);
317 } else {
318 output_files.push(file);
319 }
320 }
321
322 Ok(ExportManifest {
323 transaction_time: progress.transaction_time,
324 request: format!("$export?job={}", job_id),
325 requires_access_token: true,
326 output: output_files,
327 error: error_files,
328 message: None,
329 extension: None,
330 })
331 }
332
333 async fn list_exports(
334 &self,
335 tenant: &TenantContext,
336 include_completed: bool,
337 ) -> StorageResult<Vec<ExportProgress>> {
338 let job_ids: Vec<String> = {
340 let conn = self.get_connection()?;
341 let tenant_id = tenant.tenant_id().as_str();
342
343 let query = if include_completed {
344 "SELECT id FROM bulk_export_jobs WHERE tenant_id = ?1 ORDER BY created_at DESC"
345 } else {
346 "SELECT id FROM bulk_export_jobs WHERE tenant_id = ?1 AND status IN ('accepted', 'in-progress') ORDER BY created_at DESC"
347 };
348
349 let mut stmt = conn
350 .prepare(query)
351 .map_err(|e| internal_error(format!("Failed to prepare list query: {}", e)))?;
352
353 stmt.query_map(params![tenant_id], |row| row.get(0))
354 .map_err(|e| internal_error(format!("Failed to query exports: {}", e)))?
355 .filter_map(|r| r.ok())
356 .collect()
357 };
358
359 let mut results = Vec::new();
360 for id in job_ids {
361 let job_id = ExportJobId::from_string(id);
362 if let Ok(progress) = self.get_export_status(tenant, &job_id).await {
363 results.push(progress);
364 }
365 }
366
367 Ok(results)
368 }
369}
370
371#[async_trait]
372impl ExportDataProvider for SqliteBackend {
373 async fn list_export_types(
374 &self,
375 tenant: &TenantContext,
376 request: &ExportRequest,
377 ) -> StorageResult<Vec<String>> {
378 let conn = self.get_connection()?;
379 let tenant_id = tenant.tenant_id().as_str();
380
381 if !request.resource_types.is_empty() {
383 let mut valid_types = Vec::new();
385 for rt in &request.resource_types {
386 let exists: bool = conn
387 .query_row(
388 "SELECT 1 FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND is_deleted = 0 LIMIT 1",
389 params![tenant_id, rt],
390 |_| Ok(true),
391 )
392 .unwrap_or(false);
393 if exists {
394 valid_types.push(rt.clone());
395 }
396 }
397 return Ok(valid_types);
398 }
399
400 let mut stmt = conn
402 .prepare(
403 "SELECT DISTINCT resource_type FROM resources
404 WHERE tenant_id = ?1 AND is_deleted = 0
405 ORDER BY resource_type",
406 )
407 .map_err(|e| internal_error(format!("Failed to prepare types query: {}", e)))?;
408
409 let types: Vec<String> = stmt
410 .query_map(params![tenant_id], |row| row.get(0))
411 .map_err(|e| internal_error(format!("Failed to query types: {}", e)))?
412 .filter_map(|r| r.ok())
413 .collect();
414
415 Ok(types)
416 }
417
418 async fn count_export_resources(
419 &self,
420 tenant: &TenantContext,
421 request: &ExportRequest,
422 resource_type: &str,
423 ) -> StorageResult<u64> {
424 let conn = self.get_connection()?;
425 let tenant_id = tenant.tenant_id().as_str();
426
427 let mut query = "SELECT COUNT(*) FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND is_deleted = 0".to_string();
428 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![
429 Box::new(tenant_id.to_string()),
430 Box::new(resource_type.to_string()),
431 ];
432
433 if let Some(since) = request.since {
435 query.push_str(" AND last_updated >= ?3");
436 params_vec.push(Box::new(since.to_rfc3339()));
437 }
438
439 let params_slice: Vec<&dyn rusqlite::ToSql> =
440 params_vec.iter().map(|p| p.as_ref()).collect();
441
442 let count: i64 = conn
443 .query_row(&query, params_slice.as_slice(), |row| row.get(0))
444 .map_err(|e| internal_error(format!("Failed to count resources: {}", e)))?;
445
446 Ok(count as u64)
447 }
448
449 async fn fetch_export_batch(
450 &self,
451 tenant: &TenantContext,
452 request: &ExportRequest,
453 resource_type: &str,
454 cursor: Option<&str>,
455 batch_size: u32,
456 ) -> StorageResult<NdjsonBatch> {
457 let conn = self.get_connection()?;
458 let tenant_id = tenant.tenant_id().as_str();
459
460 let mut query = "SELECT id, data, last_updated FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND is_deleted = 0".to_string();
461 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![
462 Box::new(tenant_id.to_string()),
463 Box::new(resource_type.to_string()),
464 ];
465
466 if let Some(since) = request.since {
468 query.push_str(" AND last_updated >= ?");
469 params_vec.push(Box::new(since.to_rfc3339()));
470 }
471
472 if let Some(cursor) = cursor {
474 let parts: Vec<&str> = cursor.splitn(2, '|').collect();
476 if parts.len() == 2 {
477 query.push_str(" AND (last_updated, id) > (?, ?)");
478 params_vec.push(Box::new(parts[0].to_string()));
479 params_vec.push(Box::new(parts[1].to_string()));
480 }
481 }
482
483 query.push_str(" ORDER BY last_updated, id");
484 query.push_str(&format!(" LIMIT {}", batch_size + 1)); let params_slice: Vec<&dyn rusqlite::ToSql> =
487 params_vec.iter().map(|p| p.as_ref()).collect();
488
489 let mut stmt = conn
490 .prepare(&query)
491 .map_err(|e| internal_error(format!("Failed to prepare batch query: {}", e)))?;
492
493 let rows: Vec<(String, Vec<u8>, String)> = stmt
494 .query_map(params_slice.as_slice(), |row| {
495 Ok((
496 row.get::<_, String>(0)?,
497 row.get::<_, Vec<u8>>(1)?,
498 row.get::<_, String>(2)?,
499 ))
500 })
501 .map_err(|e| internal_error(format!("Failed to query batch: {}", e)))?
502 .filter_map(|r| r.ok())
503 .collect();
504
505 let has_more = rows.len() > batch_size as usize;
506 let rows = if has_more {
507 &rows[..batch_size as usize]
508 } else {
509 &rows[..]
510 };
511
512 let mut lines = Vec::new();
513 let mut last_cursor = None;
514
515 for (id, data, last_updated) in rows {
516 let resource: Value = serde_json::from_slice(data)
517 .map_err(|e| internal_error(format!("Failed to parse resource: {}", e)))?;
518 let line = serde_json::to_string(&resource)
519 .map_err(|e| internal_error(format!("Failed to serialize resource: {}", e)))?;
520 lines.push(line);
521 last_cursor = Some(format!("{}|{}", last_updated, id));
522 }
523
524 Ok(NdjsonBatch {
525 lines,
526 next_cursor: if has_more { last_cursor } else { None },
527 is_last: !has_more,
528 })
529 }
530}
531
532#[async_trait]
533impl PatientExportProvider for SqliteBackend {
534 async fn list_patient_ids(
535 &self,
536 tenant: &TenantContext,
537 request: &ExportRequest,
538 cursor: Option<&str>,
539 batch_size: u32,
540 ) -> StorageResult<(Vec<String>, Option<String>)> {
541 let conn = self.get_connection()?;
542 let tenant_id = tenant.tenant_id().as_str();
543
544 let mut query = "SELECT id FROM resources WHERE tenant_id = ?1 AND resource_type = 'Patient' AND is_deleted = 0".to_string();
545 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(tenant_id.to_string())];
546
547 if let Some(since) = request.since {
548 query.push_str(" AND last_updated >= ?");
549 params_vec.push(Box::new(since.to_rfc3339()));
550 }
551
552 if let Some(cursor) = cursor {
553 query.push_str(" AND id > ?");
554 params_vec.push(Box::new(cursor.to_string()));
555 }
556
557 query.push_str(" ORDER BY id");
558 query.push_str(&format!(" LIMIT {}", batch_size + 1));
559
560 let params_slice: Vec<&dyn rusqlite::ToSql> =
561 params_vec.iter().map(|p| p.as_ref()).collect();
562
563 let mut stmt = conn
564 .prepare(&query)
565 .map_err(|e| internal_error(format!("Failed to prepare patient ids query: {}", e)))?;
566
567 let ids: Vec<String> = stmt
568 .query_map(params_slice.as_slice(), |row| row.get(0))
569 .map_err(|e| internal_error(format!("Failed to query patient ids: {}", e)))?
570 .filter_map(|r| r.ok())
571 .collect();
572
573 let has_more = ids.len() > batch_size as usize;
574 let ids = if has_more {
575 ids[..batch_size as usize].to_vec()
576 } else {
577 ids
578 };
579
580 let next_cursor = if has_more { ids.last().cloned() } else { None };
581
582 Ok((ids, next_cursor))
583 }
584
585 async fn fetch_patient_compartment_batch(
586 &self,
587 tenant: &TenantContext,
588 request: &ExportRequest,
589 resource_type: &str,
590 patient_ids: &[String],
591 cursor: Option<&str>,
592 batch_size: u32,
593 ) -> StorageResult<NdjsonBatch> {
594 if patient_ids.is_empty() {
595 return Ok(NdjsonBatch::empty());
596 }
597
598 let conn = self.get_connection()?;
599 let tenant_id = tenant.tenant_id().as_str();
600
601 if resource_type == "Patient" {
603 let placeholders: Vec<String> = (0..patient_ids.len())
604 .map(|i| format!("?{}", i + 3))
605 .collect();
606 let mut query = format!(
607 "SELECT id, data, last_updated FROM resources
608 WHERE tenant_id = ?1 AND resource_type = ?2 AND id IN ({}) AND is_deleted = 0",
609 placeholders.join(",")
610 );
611
612 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![
613 Box::new(tenant_id.to_string()),
614 Box::new(resource_type.to_string()),
615 ];
616 for id in patient_ids {
617 params_vec.push(Box::new(id.clone()));
618 }
619
620 if let Some(cursor) = cursor {
621 let parts: Vec<&str> = cursor.splitn(2, '|').collect();
622 if parts.len() == 2 {
623 query.push_str(" AND (last_updated, id) > (?, ?)");
624 params_vec.push(Box::new(parts[0].to_string()));
625 params_vec.push(Box::new(parts[1].to_string()));
626 }
627 }
628
629 query.push_str(" ORDER BY last_updated, id");
630 query.push_str(&format!(" LIMIT {}", batch_size + 1));
631
632 let params_slice: Vec<&dyn rusqlite::ToSql> =
633 params_vec.iter().map(|p| p.as_ref()).collect();
634
635 let mut stmt = conn.prepare(&query).map_err(|e| {
636 internal_error(format!("Failed to prepare compartment query: {}", e))
637 })?;
638
639 let rows: Vec<(String, Vec<u8>, String)> = stmt
640 .query_map(params_slice.as_slice(), |row| {
641 Ok((
642 row.get::<_, String>(0)?,
643 row.get::<_, Vec<u8>>(1)?,
644 row.get::<_, String>(2)?,
645 ))
646 })
647 .map_err(|e| internal_error(format!("Failed to query compartment: {}", e)))?
648 .filter_map(|r| r.ok())
649 .collect();
650
651 let has_more = rows.len() > batch_size as usize;
652 let rows = if has_more {
653 &rows[..batch_size as usize]
654 } else {
655 &rows[..]
656 };
657
658 let mut lines = Vec::new();
659 let mut last_cursor = None;
660
661 for (id, data, last_updated) in rows {
662 let resource: Value = serde_json::from_slice(data)
663 .map_err(|e| internal_error(format!("Failed to parse resource: {}", e)))?;
664 let line = serde_json::to_string(&resource)
665 .map_err(|e| internal_error(format!("Failed to serialize resource: {}", e)))?;
666 lines.push(line);
667 last_cursor = Some(format!("{}|{}", last_updated, id));
668 }
669
670 return Ok(NdjsonBatch {
671 lines,
672 next_cursor: if has_more { last_cursor } else { None },
673 is_last: !has_more,
674 });
675 }
676
677 let patient_refs: Vec<String> = patient_ids
680 .iter()
681 .map(|id| format!("Patient/{}", id))
682 .collect();
683 let placeholders: Vec<String> = (0..patient_refs.len())
684 .map(|i| format!("?{}", i + 4))
685 .collect();
686
687 let mut query = format!(
688 "SELECT DISTINCT r.id, r.data, r.last_updated
689 FROM resources r
690 INNER JOIN search_index si ON r.tenant_id = si.tenant_id
691 AND r.resource_type = si.resource_type
692 AND r.id = si.resource_id
693 WHERE r.tenant_id = ?1
694 AND r.resource_type = ?2
695 AND r.is_deleted = 0
696 AND si.param_name IN ('subject', 'patient')
697 AND si.value_reference IN ({})",
698 placeholders.join(",")
699 );
700
701 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![
702 Box::new(tenant_id.to_string()),
703 Box::new(resource_type.to_string()),
704 ];
705 let since_value = request.since.map(|s| s.to_rfc3339());
707 if since_value.is_some() {
708 params_vec.push(Box::new(since_value.clone().unwrap()));
709 }
710 for patient_ref in &patient_refs {
711 params_vec.push(Box::new(patient_ref.clone()));
712 }
713
714 if request.since.is_some() {
715 query = query.replace(
716 "r.is_deleted = 0",
717 "r.is_deleted = 0 AND r.last_updated >= ?3",
718 );
719 }
720
721 if let Some(cursor) = cursor {
722 let parts: Vec<&str> = cursor.splitn(2, '|').collect();
723 if parts.len() == 2 {
724 query.push_str(" AND (r.last_updated, r.id) > (?, ?)");
725 params_vec.push(Box::new(parts[0].to_string()));
726 params_vec.push(Box::new(parts[1].to_string()));
727 }
728 }
729
730 query.push_str(" ORDER BY r.last_updated, r.id");
731 query.push_str(&format!(" LIMIT {}", batch_size + 1));
732
733 let params_slice: Vec<&dyn rusqlite::ToSql> =
734 params_vec.iter().map(|p| p.as_ref()).collect();
735
736 let mut stmt = conn
737 .prepare(&query)
738 .map_err(|e| internal_error(format!("Failed to prepare compartment query: {}", e)))?;
739
740 let rows: Vec<(String, Vec<u8>, String)> = stmt
741 .query_map(params_slice.as_slice(), |row| {
742 Ok((
743 row.get::<_, String>(0)?,
744 row.get::<_, Vec<u8>>(1)?,
745 row.get::<_, String>(2)?,
746 ))
747 })
748 .map_err(|e| internal_error(format!("Failed to query compartment: {}", e)))?
749 .filter_map(|r| r.ok())
750 .collect();
751
752 let has_more = rows.len() > batch_size as usize;
753 let rows = if has_more {
754 &rows[..batch_size as usize]
755 } else {
756 &rows[..]
757 };
758
759 let mut lines = Vec::new();
760 let mut last_cursor = None;
761
762 for (id, data, last_updated) in rows {
763 let resource: Value = serde_json::from_slice(data)
764 .map_err(|e| internal_error(format!("Failed to parse resource: {}", e)))?;
765 let line = serde_json::to_string(&resource)
766 .map_err(|e| internal_error(format!("Failed to serialize resource: {}", e)))?;
767 lines.push(line);
768 last_cursor = Some(format!("{}|{}", last_updated, id));
769 }
770
771 Ok(NdjsonBatch {
772 lines,
773 next_cursor: if has_more { last_cursor } else { None },
774 is_last: !has_more,
775 })
776 }
777}
778
779#[async_trait]
780impl GroupExportProvider for SqliteBackend {
781 async fn get_group_members(
782 &self,
783 tenant: &TenantContext,
784 group_id: &str,
785 ) -> StorageResult<Vec<String>> {
786 let conn = self.get_connection()?;
787 let tenant_id = tenant.tenant_id().as_str();
788
789 let data: Vec<u8> = conn
791 .query_row(
792 "SELECT data FROM resources WHERE tenant_id = ?1 AND resource_type = 'Group' AND id = ?2 AND is_deleted = 0",
793 params![tenant_id, group_id],
794 |row| row.get(0),
795 )
796 .map_err(|e| {
797 if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
798 StorageError::BulkExport(BulkExportError::GroupNotFound {
799 group_id: group_id.to_string(),
800 })
801 } else {
802 internal_error(format!("Failed to get group: {}", e))
803 }
804 })?;
805
806 let group: Value = serde_json::from_slice(&data)
807 .map_err(|e| internal_error(format!("Failed to parse group: {}", e)))?;
808
809 let mut members = Vec::new();
811 if let Some(member_array) = group.get("member").and_then(|m| m.as_array()) {
812 for member in member_array {
813 if let Some(entity) = member.get("entity") {
814 if let Some(reference) = entity.get("reference").and_then(|r| r.as_str()) {
815 members.push(reference.to_string());
816 }
817 }
818 }
819 }
820
821 Ok(members)
822 }
823
824 async fn resolve_group_patient_ids(
825 &self,
826 tenant: &TenantContext,
827 group_id: &str,
828 ) -> StorageResult<Vec<String>> {
829 let members = self.get_group_members(tenant, group_id).await?;
830
831 let patient_ids: Vec<String> = members
833 .into_iter()
834 .filter_map(|reference| {
835 if reference.starts_with("Patient/") {
836 Some(reference.strip_prefix("Patient/").unwrap().to_string())
837 } else {
838 None
839 }
840 })
841 .collect();
842
843 Ok(patient_ids)
844 }
845}
846
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::ResourceStorage;
    use crate::tenant::{TenantId, TenantPermissions};
    use helios_fhir::FhirVersion;
    use serde_json::json;

    /// In-memory backend with the schema already created.
    fn create_test_backend() -> SqliteBackend {
        let sqlite = SqliteBackend::in_memory().unwrap();
        sqlite.init_schema().unwrap();
        sqlite
    }

    /// Tenant context for `test-tenant` with full access.
    fn create_test_tenant() -> TenantContext {
        let id = TenantId::new("test-tenant");
        TenantContext::new(id, TenantPermissions::full_access())
    }

    /// A freshly started export has a non-empty id and is in the `Accepted` state.
    #[tokio::test]
    async fn test_start_export() {
        let (backend, tenant) = (create_test_backend(), create_test_tenant());

        let job_id = backend
            .start_export(
                &tenant,
                ExportRequest::system().with_types(vec!["Patient".to_string()]),
            )
            .await
            .unwrap();
        assert!(!job_id.as_str().is_empty());

        let progress = backend.get_export_status(&tenant, &job_id).await.unwrap();
        assert_eq!(progress.status, ExportStatus::Accepted);
    }

    /// Cancelling an accepted export moves it to `Cancelled`.
    #[tokio::test]
    async fn test_cancel_export() {
        let (backend, tenant) = (create_test_backend(), create_test_tenant());

        let job_id = backend
            .start_export(&tenant, ExportRequest::system())
            .await
            .unwrap();
        backend.cancel_export(&tenant, &job_id).await.unwrap();

        let progress = backend.get_export_status(&tenant, &job_id).await.unwrap();
        assert_eq!(progress.status, ExportStatus::Cancelled);
    }

    /// Both active exports show up when completed jobs are excluded.
    #[tokio::test]
    async fn test_list_exports() {
        let (backend, tenant) = (create_test_backend(), create_test_tenant());

        let _system_job = backend
            .start_export(&tenant, ExportRequest::system())
            .await
            .unwrap();
        let _patient_job = backend
            .start_export(&tenant, ExportRequest::patient())
            .await
            .unwrap();

        let active = backend.list_exports(&tenant, false).await.unwrap();
        assert_eq!(active.len(), 2);
    }

    /// The sixth concurrent export for a tenant is rejected.
    #[tokio::test]
    async fn test_too_many_concurrent_exports() {
        let (backend, tenant) = (create_test_backend(), create_test_tenant());

        for _ in 0..5 {
            backend
                .start_export(&tenant, ExportRequest::system())
                .await
                .unwrap();
        }

        let sixth = backend.start_export(&tenant, ExportRequest::system()).await;
        assert!(matches!(
            sixth,
            Err(StorageError::BulkExport(
                BulkExportError::TooManyConcurrentExports { .. }
            ))
        ));
    }

    /// Every resource type that has stored data is reported for a system export.
    #[tokio::test]
    async fn test_list_export_types() {
        let (backend, tenant) = (create_test_backend(), create_test_tenant());

        let fixtures = vec![
            (
                "Patient",
                json!({"resourceType": "Patient", "name": [{"family": "Test"}]}),
            ),
            (
                "Observation",
                json!({"resourceType": "Observation", "status": "final"}),
            ),
        ];
        for (resource_type, body) in fixtures {
            backend
                .create(&tenant, resource_type, body, FhirVersion::default())
                .await
                .unwrap();
        }

        let types = backend
            .list_export_types(&tenant, &ExportRequest::system())
            .await
            .unwrap();

        assert!(types.contains(&"Patient".to_string()));
        assert!(types.contains(&"Observation".to_string()));
    }

    /// Five patients paged three at a time come back as a page of 3 and a final page of 2.
    #[tokio::test]
    async fn test_fetch_export_batch() {
        let (backend, tenant) = (create_test_backend(), create_test_tenant());

        for index in 0..5 {
            let patient = json!({
                "resourceType": "Patient",
                "name": [{"family": format!("Patient{}", index)}]
            });
            backend
                .create(&tenant, "Patient", patient, FhirVersion::default())
                .await
                .unwrap();
        }

        let request = ExportRequest::system();

        let first_page = backend
            .fetch_export_batch(&tenant, &request, "Patient", None, 3)
            .await
            .unwrap();
        assert_eq!(first_page.lines.len(), 3);
        assert!(!first_page.is_last);
        assert!(first_page.next_cursor.is_some());

        let second_page = backend
            .fetch_export_batch(
                &tenant,
                &request,
                "Patient",
                first_page.next_cursor.as_deref(),
                3,
            )
            .await
            .unwrap();
        assert_eq!(second_page.lines.len(), 2);
        assert!(second_page.is_last);
    }

    /// A deleted export can no longer be looked up.
    #[tokio::test]
    async fn test_delete_export() {
        let (backend, tenant) = (create_test_backend(), create_test_tenant());

        let job_id = backend
            .start_export(&tenant, ExportRequest::system())
            .await
            .unwrap();
        backend.delete_export(&tenant, &job_id).await.unwrap();

        let lookup = backend.get_export_status(&tenant, &job_id).await;
        assert!(matches!(
            lookup,
            Err(StorageError::BulkExport(
                BulkExportError::JobNotFound { .. }
            ))
        ));
    }
}