Skip to main content

helios_persistence/backends/sqlite/
bulk_export.rs

1//! Bulk export implementation for SQLite backend.
2
3use async_trait::async_trait;
4use chrono::Utc;
5use rusqlite::params;
6use serde_json::Value;
7
8use crate::core::bulk_export::{
9    BulkExportStorage, ExportDataProvider, ExportJobId, ExportLevel, ExportManifest,
10    ExportOutputFile, ExportProgress, ExportRequest, ExportStatus, GroupExportProvider,
11    NdjsonBatch, PatientExportProvider, TypeExportProgress,
12};
13use crate::error::{BackendError, BulkExportError, StorageError, StorageResult};
14use crate::tenant::TenantContext;
15
16use super::SqliteBackend;
17
18fn internal_error(message: String) -> StorageError {
19    StorageError::Backend(BackendError::Internal {
20        backend_name: "sqlite".to_string(),
21        message,
22        source: None,
23    })
24}
25
26#[async_trait]
27impl BulkExportStorage for SqliteBackend {
28    async fn start_export(
29        &self,
30        tenant: &TenantContext,
31        request: ExportRequest,
32    ) -> StorageResult<ExportJobId> {
33        let conn = self.get_connection()?;
34        let tenant_id = tenant.tenant_id().as_str();
35
36        // Check for too many concurrent exports (limit to 5 active exports per tenant)
37        let active_count: i32 = conn
38            .query_row(
39                "SELECT COUNT(*) FROM bulk_export_jobs
40                 WHERE tenant_id = ?1 AND status IN ('accepted', 'in-progress')",
41                params![tenant_id],
42                |row| row.get(0),
43            )
44            .map_err(|e| internal_error(format!("Failed to count active exports: {}", e)))?;
45
46        if active_count >= 5 {
47            return Err(StorageError::BulkExport(
48                BulkExportError::TooManyConcurrentExports { max_concurrent: 5 },
49            ));
50        }
51
52        let job_id = ExportJobId::new();
53        let now = Utc::now();
54        let transaction_time = now.to_rfc3339();
55
56        let level_str = match &request.level {
57            ExportLevel::System => "system".to_string(),
58            ExportLevel::Patient => "patient".to_string(),
59            ExportLevel::Group { .. } => "group".to_string(),
60        };
61
62        let group_id = request.group_id().map(|s| s.to_string());
63
64        let request_json = serde_json::to_string(&request)
65            .map_err(|e| internal_error(format!("Failed to serialize request: {}", e)))?;
66
67        conn.execute(
68            "INSERT INTO bulk_export_jobs
69             (id, tenant_id, status, level, group_id, request_json, transaction_time, created_at)
70             VALUES (?1, ?2, 'accepted', ?3, ?4, ?5, ?6, ?7)",
71            params![
72                job_id.as_str(),
73                tenant_id,
74                level_str,
75                group_id,
76                request_json,
77                transaction_time,
78                transaction_time
79            ],
80        )
81        .map_err(|e| internal_error(format!("Failed to create export job: {}", e)))?;
82
83        Ok(job_id)
84    }
85
86    async fn get_export_status(
87        &self,
88        tenant: &TenantContext,
89        job_id: &ExportJobId,
90    ) -> StorageResult<ExportProgress> {
91        let conn = self.get_connection()?;
92        let tenant_id = tenant.tenant_id().as_str();
93
94        let (status_str, level_str, group_id, transaction_time, started_at, completed_at, error_message, current_type):
95            (String, String, Option<String>, String, Option<String>, Option<String>, Option<String>, Option<String>) = conn
96            .query_row(
97                "SELECT status, level, group_id, transaction_time, started_at, completed_at, error_message, current_type
98                 FROM bulk_export_jobs
99                 WHERE id = ?1 AND tenant_id = ?2",
100                params![job_id.as_str(), tenant_id],
101                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?,
102                          row.get(4)?, row.get(5)?, row.get(6)?, row.get(7)?)),
103            )
104            .map_err(|e| {
105                if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
106                    StorageError::BulkExport(BulkExportError::JobNotFound {
107                        job_id: job_id.to_string(),
108                    })
109                } else {
110                    internal_error(format!("Failed to get export status: {}", e))
111                }
112            })?;
113
114        let status: ExportStatus = status_str
115            .parse()
116            .map_err(|_| internal_error(format!("Invalid status in database: {}", status_str)))?;
117
118        let level = match level_str.as_str() {
119            "system" => ExportLevel::System,
120            "patient" => ExportLevel::Patient,
121            "group" => ExportLevel::Group {
122                group_id: group_id.unwrap_or_default(),
123            },
124            _ => {
125                return Err(internal_error(format!(
126                    "Invalid level in database: {}",
127                    level_str
128                )));
129            }
130        };
131
132        let transaction_time = chrono::DateTime::parse_from_rfc3339(&transaction_time)
133            .map_err(|e| internal_error(format!("Invalid transaction_time: {}", e)))?
134            .with_timezone(&Utc);
135
136        let started_at = started_at.and_then(|s| {
137            chrono::DateTime::parse_from_rfc3339(&s)
138                .ok()
139                .map(|dt| dt.with_timezone(&Utc))
140        });
141
142        let completed_at = completed_at.and_then(|s| {
143            chrono::DateTime::parse_from_rfc3339(&s)
144                .ok()
145                .map(|dt| dt.with_timezone(&Utc))
146        });
147
148        // Get per-type progress
149        let mut stmt = conn
150            .prepare(
151                "SELECT resource_type, total_count, exported_count, error_count, cursor_state
152                 FROM bulk_export_progress
153                 WHERE job_id = ?1",
154            )
155            .map_err(|e| internal_error(format!("Failed to prepare progress query: {}", e)))?;
156
157        let type_progress: Vec<TypeExportProgress> = stmt
158            .query_map(params![job_id.as_str()], |row| {
159                Ok(TypeExportProgress {
160                    resource_type: row.get(0)?,
161                    total_count: row.get::<_, Option<i64>>(1)?.map(|v| v as u64),
162                    exported_count: row.get::<_, i64>(2)? as u64,
163                    error_count: row.get::<_, i64>(3)? as u64,
164                    cursor_state: row.get(4)?,
165                })
166            })
167            .map_err(|e| internal_error(format!("Failed to query progress: {}", e)))?
168            .filter_map(|r| r.ok())
169            .collect();
170
171        Ok(ExportProgress {
172            job_id: job_id.clone(),
173            status,
174            level,
175            transaction_time,
176            started_at,
177            completed_at,
178            type_progress,
179            current_type,
180            error_message,
181        })
182    }
183
184    async fn cancel_export(
185        &self,
186        tenant: &TenantContext,
187        job_id: &ExportJobId,
188    ) -> StorageResult<()> {
189        let conn = self.get_connection()?;
190        let tenant_id = tenant.tenant_id().as_str();
191
192        // Check current status
193        let current_status: String = conn
194            .query_row(
195                "SELECT status FROM bulk_export_jobs WHERE id = ?1 AND tenant_id = ?2",
196                params![job_id.as_str(), tenant_id],
197                |row| row.get(0),
198            )
199            .map_err(|e| {
200                if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
201                    StorageError::BulkExport(BulkExportError::JobNotFound {
202                        job_id: job_id.to_string(),
203                    })
204                } else {
205                    internal_error(format!("Failed to get export status: {}", e))
206                }
207            })?;
208
209        let status: ExportStatus = current_status.parse().map_err(|_| {
210            internal_error(format!("Invalid status in database: {}", current_status))
211        })?;
212
213        if status.is_terminal() {
214            return Err(StorageError::BulkExport(BulkExportError::InvalidJobState {
215                job_id: job_id.to_string(),
216                expected: "accepted or in-progress".to_string(),
217                actual: current_status,
218            }));
219        }
220
221        let now = Utc::now().to_rfc3339();
222        conn.execute(
223            "UPDATE bulk_export_jobs SET status = 'cancelled', completed_at = ?1 WHERE id = ?2",
224            params![now, job_id.as_str()],
225        )
226        .map_err(|e| internal_error(format!("Failed to cancel export: {}", e)))?;
227
228        Ok(())
229    }
230
231    async fn delete_export(
232        &self,
233        tenant: &TenantContext,
234        job_id: &ExportJobId,
235    ) -> StorageResult<()> {
236        let conn = self.get_connection()?;
237        let tenant_id = tenant.tenant_id().as_str();
238
239        // Check exists
240        let exists: bool = conn
241            .query_row(
242                "SELECT 1 FROM bulk_export_jobs WHERE id = ?1 AND tenant_id = ?2",
243                params![job_id.as_str(), tenant_id],
244                |_| Ok(true),
245            )
246            .unwrap_or(false);
247
248        if !exists {
249            return Err(StorageError::BulkExport(BulkExportError::JobNotFound {
250                job_id: job_id.to_string(),
251            }));
252        }
253
254        // Delete job (cascades to progress and files due to foreign keys)
255        conn.execute(
256            "DELETE FROM bulk_export_jobs WHERE id = ?1 AND tenant_id = ?2",
257            params![job_id.as_str(), tenant_id],
258        )
259        .map_err(|e| internal_error(format!("Failed to delete export: {}", e)))?;
260
261        Ok(())
262    }
263
264    async fn get_export_manifest(
265        &self,
266        tenant: &TenantContext,
267        job_id: &ExportJobId,
268    ) -> StorageResult<ExportManifest> {
269        let progress = self.get_export_status(tenant, job_id).await?;
270
271        if progress.status != ExportStatus::Complete {
272            return Err(StorageError::BulkExport(BulkExportError::InvalidJobState {
273                job_id: job_id.to_string(),
274                expected: "complete".to_string(),
275                actual: progress.status.to_string(),
276            }));
277        }
278
279        let conn = self.get_connection()?;
280
281        // Get output files
282        let mut stmt = conn
283            .prepare(
284                "SELECT resource_type, file_path, resource_count, file_type
285                 FROM bulk_export_files
286                 WHERE job_id = ?1
287                 ORDER BY resource_type",
288            )
289            .map_err(|e| internal_error(format!("Failed to prepare files query: {}", e)))?;
290
291        let mut output_files = Vec::new();
292        let mut error_files = Vec::new();
293
294        let rows = stmt
295            .query_map(params![job_id.as_str()], |row| {
296                Ok((
297                    row.get::<_, String>(0)?,
298                    row.get::<_, String>(1)?,
299                    row.get::<_, Option<i64>>(2)?.map(|v| v as u64),
300                    row.get::<_, String>(3)?,
301                ))
302            })
303            .map_err(|e| internal_error(format!("Failed to query files: {}", e)))?;
304
305        for row in rows {
306            let (resource_type, file_path, count, file_type) =
307                row.map_err(|e| internal_error(format!("Failed to read file row: {}", e)))?;
308
309            let file = ExportOutputFile {
310                resource_type,
311                url: file_path,
312                count,
313            };
314
315            if file_type == "error" {
316                error_files.push(file);
317            } else {
318                output_files.push(file);
319            }
320        }
321
322        Ok(ExportManifest {
323            transaction_time: progress.transaction_time,
324            request: format!("$export?job={}", job_id),
325            requires_access_token: true,
326            output: output_files,
327            error: error_files,
328            message: None,
329            extension: None,
330        })
331    }
332
333    async fn list_exports(
334        &self,
335        tenant: &TenantContext,
336        include_completed: bool,
337    ) -> StorageResult<Vec<ExportProgress>> {
338        // Collect IDs first, then drop the connection before calling async methods
339        let job_ids: Vec<String> = {
340            let conn = self.get_connection()?;
341            let tenant_id = tenant.tenant_id().as_str();
342
343            let query = if include_completed {
344                "SELECT id FROM bulk_export_jobs WHERE tenant_id = ?1 ORDER BY created_at DESC"
345            } else {
346                "SELECT id FROM bulk_export_jobs WHERE tenant_id = ?1 AND status IN ('accepted', 'in-progress') ORDER BY created_at DESC"
347            };
348
349            let mut stmt = conn
350                .prepare(query)
351                .map_err(|e| internal_error(format!("Failed to prepare list query: {}", e)))?;
352
353            stmt.query_map(params![tenant_id], |row| row.get(0))
354                .map_err(|e| internal_error(format!("Failed to query exports: {}", e)))?
355                .filter_map(|r| r.ok())
356                .collect()
357        };
358
359        let mut results = Vec::new();
360        for id in job_ids {
361            let job_id = ExportJobId::from_string(id);
362            if let Ok(progress) = self.get_export_status(tenant, &job_id).await {
363                results.push(progress);
364            }
365        }
366
367        Ok(results)
368    }
369}
370
371#[async_trait]
372impl ExportDataProvider for SqliteBackend {
373    async fn list_export_types(
374        &self,
375        tenant: &TenantContext,
376        request: &ExportRequest,
377    ) -> StorageResult<Vec<String>> {
378        let conn = self.get_connection()?;
379        let tenant_id = tenant.tenant_id().as_str();
380
381        // If specific types are requested, validate and return them
382        if !request.resource_types.is_empty() {
383            // Verify the types exist in the database
384            let mut valid_types = Vec::new();
385            for rt in &request.resource_types {
386                let exists: bool = conn
387                    .query_row(
388                        "SELECT 1 FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND is_deleted = 0 LIMIT 1",
389                        params![tenant_id, rt],
390                        |_| Ok(true),
391                    )
392                    .unwrap_or(false);
393                if exists {
394                    valid_types.push(rt.clone());
395                }
396            }
397            return Ok(valid_types);
398        }
399
400        // Otherwise, get all types with data
401        let mut stmt = conn
402            .prepare(
403                "SELECT DISTINCT resource_type FROM resources
404                 WHERE tenant_id = ?1 AND is_deleted = 0
405                 ORDER BY resource_type",
406            )
407            .map_err(|e| internal_error(format!("Failed to prepare types query: {}", e)))?;
408
409        let types: Vec<String> = stmt
410            .query_map(params![tenant_id], |row| row.get(0))
411            .map_err(|e| internal_error(format!("Failed to query types: {}", e)))?
412            .filter_map(|r| r.ok())
413            .collect();
414
415        Ok(types)
416    }
417
418    async fn count_export_resources(
419        &self,
420        tenant: &TenantContext,
421        request: &ExportRequest,
422        resource_type: &str,
423    ) -> StorageResult<u64> {
424        let conn = self.get_connection()?;
425        let tenant_id = tenant.tenant_id().as_str();
426
427        let mut query = "SELECT COUNT(*) FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND is_deleted = 0".to_string();
428        let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![
429            Box::new(tenant_id.to_string()),
430            Box::new(resource_type.to_string()),
431        ];
432
433        // Apply _since filter if present
434        if let Some(since) = request.since {
435            query.push_str(" AND last_updated >= ?3");
436            params_vec.push(Box::new(since.to_rfc3339()));
437        }
438
439        let params_slice: Vec<&dyn rusqlite::ToSql> =
440            params_vec.iter().map(|p| p.as_ref()).collect();
441
442        let count: i64 = conn
443            .query_row(&query, params_slice.as_slice(), |row| row.get(0))
444            .map_err(|e| internal_error(format!("Failed to count resources: {}", e)))?;
445
446        Ok(count as u64)
447    }
448
449    async fn fetch_export_batch(
450        &self,
451        tenant: &TenantContext,
452        request: &ExportRequest,
453        resource_type: &str,
454        cursor: Option<&str>,
455        batch_size: u32,
456    ) -> StorageResult<NdjsonBatch> {
457        let conn = self.get_connection()?;
458        let tenant_id = tenant.tenant_id().as_str();
459
460        let mut query = "SELECT id, data, last_updated FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND is_deleted = 0".to_string();
461        let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![
462            Box::new(tenant_id.to_string()),
463            Box::new(resource_type.to_string()),
464        ];
465
466        // Apply _since filter if present
467        if let Some(since) = request.since {
468            query.push_str(" AND last_updated >= ?");
469            params_vec.push(Box::new(since.to_rfc3339()));
470        }
471
472        // Apply cursor (keyset pagination)
473        if let Some(cursor) = cursor {
474            // Cursor format: "last_updated|id"
475            let parts: Vec<&str> = cursor.splitn(2, '|').collect();
476            if parts.len() == 2 {
477                query.push_str(" AND (last_updated, id) > (?, ?)");
478                params_vec.push(Box::new(parts[0].to_string()));
479                params_vec.push(Box::new(parts[1].to_string()));
480            }
481        }
482
483        query.push_str(" ORDER BY last_updated, id");
484        query.push_str(&format!(" LIMIT {}", batch_size + 1)); // Fetch one extra to detect if there's more
485
486        let params_slice: Vec<&dyn rusqlite::ToSql> =
487            params_vec.iter().map(|p| p.as_ref()).collect();
488
489        let mut stmt = conn
490            .prepare(&query)
491            .map_err(|e| internal_error(format!("Failed to prepare batch query: {}", e)))?;
492
493        let rows: Vec<(String, Vec<u8>, String)> = stmt
494            .query_map(params_slice.as_slice(), |row| {
495                Ok((
496                    row.get::<_, String>(0)?,
497                    row.get::<_, Vec<u8>>(1)?,
498                    row.get::<_, String>(2)?,
499                ))
500            })
501            .map_err(|e| internal_error(format!("Failed to query batch: {}", e)))?
502            .filter_map(|r| r.ok())
503            .collect();
504
505        let has_more = rows.len() > batch_size as usize;
506        let rows = if has_more {
507            &rows[..batch_size as usize]
508        } else {
509            &rows[..]
510        };
511
512        let mut lines = Vec::new();
513        let mut last_cursor = None;
514
515        for (id, data, last_updated) in rows {
516            let resource: Value = serde_json::from_slice(data)
517                .map_err(|e| internal_error(format!("Failed to parse resource: {}", e)))?;
518            let line = serde_json::to_string(&resource)
519                .map_err(|e| internal_error(format!("Failed to serialize resource: {}", e)))?;
520            lines.push(line);
521            last_cursor = Some(format!("{}|{}", last_updated, id));
522        }
523
524        Ok(NdjsonBatch {
525            lines,
526            next_cursor: if has_more { last_cursor } else { None },
527            is_last: !has_more,
528        })
529    }
530}
531
532#[async_trait]
533impl PatientExportProvider for SqliteBackend {
534    async fn list_patient_ids(
535        &self,
536        tenant: &TenantContext,
537        request: &ExportRequest,
538        cursor: Option<&str>,
539        batch_size: u32,
540    ) -> StorageResult<(Vec<String>, Option<String>)> {
541        let conn = self.get_connection()?;
542        let tenant_id = tenant.tenant_id().as_str();
543
544        let mut query = "SELECT id FROM resources WHERE tenant_id = ?1 AND resource_type = 'Patient' AND is_deleted = 0".to_string();
545        let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(tenant_id.to_string())];
546
547        if let Some(since) = request.since {
548            query.push_str(" AND last_updated >= ?");
549            params_vec.push(Box::new(since.to_rfc3339()));
550        }
551
552        if let Some(cursor) = cursor {
553            query.push_str(" AND id > ?");
554            params_vec.push(Box::new(cursor.to_string()));
555        }
556
557        query.push_str(" ORDER BY id");
558        query.push_str(&format!(" LIMIT {}", batch_size + 1));
559
560        let params_slice: Vec<&dyn rusqlite::ToSql> =
561            params_vec.iter().map(|p| p.as_ref()).collect();
562
563        let mut stmt = conn
564            .prepare(&query)
565            .map_err(|e| internal_error(format!("Failed to prepare patient ids query: {}", e)))?;
566
567        let ids: Vec<String> = stmt
568            .query_map(params_slice.as_slice(), |row| row.get(0))
569            .map_err(|e| internal_error(format!("Failed to query patient ids: {}", e)))?
570            .filter_map(|r| r.ok())
571            .collect();
572
573        let has_more = ids.len() > batch_size as usize;
574        let ids = if has_more {
575            ids[..batch_size as usize].to_vec()
576        } else {
577            ids
578        };
579
580        let next_cursor = if has_more { ids.last().cloned() } else { None };
581
582        Ok((ids, next_cursor))
583    }
584
585    async fn fetch_patient_compartment_batch(
586        &self,
587        tenant: &TenantContext,
588        request: &ExportRequest,
589        resource_type: &str,
590        patient_ids: &[String],
591        cursor: Option<&str>,
592        batch_size: u32,
593    ) -> StorageResult<NdjsonBatch> {
594        if patient_ids.is_empty() {
595            return Ok(NdjsonBatch::empty());
596        }
597
598        let conn = self.get_connection()?;
599        let tenant_id = tenant.tenant_id().as_str();
600
601        // For Patient resources, just filter by the IDs
602        if resource_type == "Patient" {
603            let placeholders: Vec<String> = (0..patient_ids.len())
604                .map(|i| format!("?{}", i + 3))
605                .collect();
606            let mut query = format!(
607                "SELECT id, data, last_updated FROM resources
608                 WHERE tenant_id = ?1 AND resource_type = ?2 AND id IN ({}) AND is_deleted = 0",
609                placeholders.join(",")
610            );
611
612            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![
613                Box::new(tenant_id.to_string()),
614                Box::new(resource_type.to_string()),
615            ];
616            for id in patient_ids {
617                params_vec.push(Box::new(id.clone()));
618            }
619
620            if let Some(cursor) = cursor {
621                let parts: Vec<&str> = cursor.splitn(2, '|').collect();
622                if parts.len() == 2 {
623                    query.push_str(" AND (last_updated, id) > (?, ?)");
624                    params_vec.push(Box::new(parts[0].to_string()));
625                    params_vec.push(Box::new(parts[1].to_string()));
626                }
627            }
628
629            query.push_str(" ORDER BY last_updated, id");
630            query.push_str(&format!(" LIMIT {}", batch_size + 1));
631
632            let params_slice: Vec<&dyn rusqlite::ToSql> =
633                params_vec.iter().map(|p| p.as_ref()).collect();
634
635            let mut stmt = conn.prepare(&query).map_err(|e| {
636                internal_error(format!("Failed to prepare compartment query: {}", e))
637            })?;
638
639            let rows: Vec<(String, Vec<u8>, String)> = stmt
640                .query_map(params_slice.as_slice(), |row| {
641                    Ok((
642                        row.get::<_, String>(0)?,
643                        row.get::<_, Vec<u8>>(1)?,
644                        row.get::<_, String>(2)?,
645                    ))
646                })
647                .map_err(|e| internal_error(format!("Failed to query compartment: {}", e)))?
648                .filter_map(|r| r.ok())
649                .collect();
650
651            let has_more = rows.len() > batch_size as usize;
652            let rows = if has_more {
653                &rows[..batch_size as usize]
654            } else {
655                &rows[..]
656            };
657
658            let mut lines = Vec::new();
659            let mut last_cursor = None;
660
661            for (id, data, last_updated) in rows {
662                let resource: Value = serde_json::from_slice(data)
663                    .map_err(|e| internal_error(format!("Failed to parse resource: {}", e)))?;
664                let line = serde_json::to_string(&resource)
665                    .map_err(|e| internal_error(format!("Failed to serialize resource: {}", e)))?;
666                lines.push(line);
667                last_cursor = Some(format!("{}|{}", last_updated, id));
668            }
669
670            return Ok(NdjsonBatch {
671                lines,
672                next_cursor: if has_more { last_cursor } else { None },
673                is_last: !has_more,
674            });
675        }
676
677        // For other resources, we need to use the search index to find resources
678        // that reference these patients via subject/patient parameters
679        let patient_refs: Vec<String> = patient_ids
680            .iter()
681            .map(|id| format!("Patient/{}", id))
682            .collect();
683        let placeholders: Vec<String> = (0..patient_refs.len())
684            .map(|i| format!("?{}", i + 4))
685            .collect();
686
687        let mut query = format!(
688            "SELECT DISTINCT r.id, r.data, r.last_updated
689             FROM resources r
690             INNER JOIN search_index si ON r.tenant_id = si.tenant_id
691                AND r.resource_type = si.resource_type
692                AND r.id = si.resource_id
693             WHERE r.tenant_id = ?1
694                AND r.resource_type = ?2
695                AND r.is_deleted = 0
696                AND si.param_name IN ('subject', 'patient')
697                AND si.value_reference IN ({})",
698            placeholders.join(",")
699        );
700
701        let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![
702            Box::new(tenant_id.to_string()),
703            Box::new(resource_type.to_string()),
704        ];
705        // Placeholder for since filter slot
706        let since_value = request.since.map(|s| s.to_rfc3339());
707        if since_value.is_some() {
708            params_vec.push(Box::new(since_value.clone().unwrap()));
709        }
710        for patient_ref in &patient_refs {
711            params_vec.push(Box::new(patient_ref.clone()));
712        }
713
714        if request.since.is_some() {
715            query = query.replace(
716                "r.is_deleted = 0",
717                "r.is_deleted = 0 AND r.last_updated >= ?3",
718            );
719        }
720
721        if let Some(cursor) = cursor {
722            let parts: Vec<&str> = cursor.splitn(2, '|').collect();
723            if parts.len() == 2 {
724                query.push_str(" AND (r.last_updated, r.id) > (?, ?)");
725                params_vec.push(Box::new(parts[0].to_string()));
726                params_vec.push(Box::new(parts[1].to_string()));
727            }
728        }
729
730        query.push_str(" ORDER BY r.last_updated, r.id");
731        query.push_str(&format!(" LIMIT {}", batch_size + 1));
732
733        let params_slice: Vec<&dyn rusqlite::ToSql> =
734            params_vec.iter().map(|p| p.as_ref()).collect();
735
736        let mut stmt = conn
737            .prepare(&query)
738            .map_err(|e| internal_error(format!("Failed to prepare compartment query: {}", e)))?;
739
740        let rows: Vec<(String, Vec<u8>, String)> = stmt
741            .query_map(params_slice.as_slice(), |row| {
742                Ok((
743                    row.get::<_, String>(0)?,
744                    row.get::<_, Vec<u8>>(1)?,
745                    row.get::<_, String>(2)?,
746                ))
747            })
748            .map_err(|e| internal_error(format!("Failed to query compartment: {}", e)))?
749            .filter_map(|r| r.ok())
750            .collect();
751
752        let has_more = rows.len() > batch_size as usize;
753        let rows = if has_more {
754            &rows[..batch_size as usize]
755        } else {
756            &rows[..]
757        };
758
759        let mut lines = Vec::new();
760        let mut last_cursor = None;
761
762        for (id, data, last_updated) in rows {
763            let resource: Value = serde_json::from_slice(data)
764                .map_err(|e| internal_error(format!("Failed to parse resource: {}", e)))?;
765            let line = serde_json::to_string(&resource)
766                .map_err(|e| internal_error(format!("Failed to serialize resource: {}", e)))?;
767            lines.push(line);
768            last_cursor = Some(format!("{}|{}", last_updated, id));
769        }
770
771        Ok(NdjsonBatch {
772            lines,
773            next_cursor: if has_more { last_cursor } else { None },
774            is_last: !has_more,
775        })
776    }
777}
778
779#[async_trait]
780impl GroupExportProvider for SqliteBackend {
781    async fn get_group_members(
782        &self,
783        tenant: &TenantContext,
784        group_id: &str,
785    ) -> StorageResult<Vec<String>> {
786        let conn = self.get_connection()?;
787        let tenant_id = tenant.tenant_id().as_str();
788
789        // Get the Group resource
790        let data: Vec<u8> = conn
791            .query_row(
792                "SELECT data FROM resources WHERE tenant_id = ?1 AND resource_type = 'Group' AND id = ?2 AND is_deleted = 0",
793                params![tenant_id, group_id],
794                |row| row.get(0),
795            )
796            .map_err(|e| {
797                if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
798                    StorageError::BulkExport(BulkExportError::GroupNotFound {
799                        group_id: group_id.to_string(),
800                    })
801                } else {
802                    internal_error(format!("Failed to get group: {}", e))
803                }
804            })?;
805
806        let group: Value = serde_json::from_slice(&data)
807            .map_err(|e| internal_error(format!("Failed to parse group: {}", e)))?;
808
809        // Extract member references from Group.member[].entity.reference
810        let mut members = Vec::new();
811        if let Some(member_array) = group.get("member").and_then(|m| m.as_array()) {
812            for member in member_array {
813                if let Some(entity) = member.get("entity") {
814                    if let Some(reference) = entity.get("reference").and_then(|r| r.as_str()) {
815                        members.push(reference.to_string());
816                    }
817                }
818            }
819        }
820
821        Ok(members)
822    }
823
824    async fn resolve_group_patient_ids(
825        &self,
826        tenant: &TenantContext,
827        group_id: &str,
828    ) -> StorageResult<Vec<String>> {
829        let members = self.get_group_members(tenant, group_id).await?;
830
831        // Filter to only Patient references and extract IDs
832        let patient_ids: Vec<String> = members
833            .into_iter()
834            .filter_map(|reference| {
835                if reference.starts_with("Patient/") {
836                    Some(reference.strip_prefix("Patient/").unwrap().to_string())
837                } else {
838                    None
839                }
840            })
841            .collect();
842
843        Ok(patient_ids)
844    }
845}
846
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::ResourceStorage;
    use crate::tenant::{TenantId, TenantPermissions};
    use helios_fhir::FhirVersion;
    use serde_json::json;

    /// In-memory SQLite backend with the schema already applied.
    fn create_test_backend() -> SqliteBackend {
        let backend = SqliteBackend::in_memory().unwrap();
        backend.init_schema().unwrap();
        backend
    }

    /// Tenant context granting full access, as used by every test below.
    fn create_test_tenant() -> TenantContext {
        TenantContext::new(
            TenantId::new("test-tenant"),
            TenantPermissions::full_access(),
        )
    }

    #[tokio::test]
    async fn test_start_export() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        // Starting a typed system export yields a non-empty job id...
        let job_id = backend
            .start_export(
                &tenant,
                ExportRequest::system().with_types(vec!["Patient".to_string()]),
            )
            .await
            .unwrap();
        assert!(!job_id.as_str().is_empty());

        // ...and the job begins life in the Accepted state.
        let progress = backend.get_export_status(&tenant, &job_id).await.unwrap();
        assert_eq!(progress.status, ExportStatus::Accepted);
    }

    #[tokio::test]
    async fn test_cancel_export() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        let job_id = backend
            .start_export(&tenant, ExportRequest::system())
            .await
            .unwrap();

        // Cancelling moves the job to the Cancelled state.
        backend.cancel_export(&tenant, &job_id).await.unwrap();
        let progress = backend.get_export_status(&tenant, &job_id).await.unwrap();
        assert_eq!(progress.status, ExportStatus::Cancelled);
    }

    #[tokio::test]
    async fn test_list_exports() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        // One system-level and one patient-level export.
        let _job_id1 = backend
            .start_export(&tenant, ExportRequest::system())
            .await
            .unwrap();
        let _job_id2 = backend
            .start_export(&tenant, ExportRequest::patient())
            .await
            .unwrap();

        // Both should be visible in the listing.
        let exports = backend.list_exports(&tenant, false).await.unwrap();
        assert_eq!(exports.len(), 2);
    }

    #[tokio::test]
    async fn test_too_many_concurrent_exports() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        // Fill the per-tenant quota of 5 active exports.
        for _ in 1..=5 {
            backend
                .start_export(&tenant, ExportRequest::system())
                .await
                .unwrap();
        }

        // One more must be rejected with the concurrency error.
        let result = backend.start_export(&tenant, ExportRequest::system()).await;
        assert!(matches!(
            result,
            Err(StorageError::BulkExport(
                BulkExportError::TooManyConcurrentExports { .. }
            ))
        ));
    }

    #[tokio::test]
    async fn test_list_export_types() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        // Seed one resource of each of two distinct types.
        let seed = [
            (
                "Patient",
                json!({"resourceType": "Patient", "name": [{"family": "Test"}]}),
            ),
            (
                "Observation",
                json!({"resourceType": "Observation", "status": "final"}),
            ),
        ];
        for (resource_type, body) in seed {
            backend
                .create(&tenant, resource_type, body, FhirVersion::default())
                .await
                .unwrap();
        }

        // A system export should report both types as available.
        let request = ExportRequest::system();
        let types = backend.list_export_types(&tenant, &request).await.unwrap();
        assert!(types.contains(&"Patient".to_string()));
        assert!(types.contains(&"Observation".to_string()));
    }

    #[tokio::test]
    async fn test_fetch_export_batch() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        // Five patients so a batch size of 3 leaves a partial second page.
        for i in 0..5 {
            backend
                .create(
                    &tenant,
                    "Patient",
                    json!({"resourceType": "Patient", "name": [{"family": format!("Patient{}", i)}]}),
                    FhirVersion::default(),
                )
                .await
                .unwrap();
        }

        let request = ExportRequest::system();

        // First page: full batch, not last, cursor present.
        let first = backend
            .fetch_export_batch(&tenant, &request, "Patient", None, 3)
            .await
            .unwrap();
        assert_eq!(first.lines.len(), 3);
        assert!(!first.is_last);
        assert!(first.next_cursor.is_some());

        // Second page: the remaining two, flagged as the final batch.
        let second = backend
            .fetch_export_batch(
                &tenant,
                &request,
                "Patient",
                first.next_cursor.as_deref(),
                3,
            )
            .await
            .unwrap();
        assert_eq!(second.lines.len(), 2);
        assert!(second.is_last);
    }

    #[tokio::test]
    async fn test_delete_export() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        let job_id = backend
            .start_export(&tenant, ExportRequest::system())
            .await
            .unwrap();

        backend.delete_export(&tenant, &job_id).await.unwrap();

        // After deletion the job id is unknown to the backend.
        let result = backend.get_export_status(&tenant, &job_id).await;
        assert!(matches!(
            result,
            Err(StorageError::BulkExport(
                BulkExportError::JobNotFound { .. }
            ))
        ));
    }
}