Skip to main content

helios_persistence/backends/sqlite/
bulk_export.rs

1//! Bulk export implementation for SQLite backend.
2
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use rusqlite::params;
6use serde_json::Value;
7use std::time::Duration as StdDuration;
8use tokio::sync::Mutex;
9
10use crate::core::bulk_export::{
11    BulkExportStorage, ExpiredExportRef, ExportDataProvider, ExportFileMetadata, ExportJobId,
12    ExportJobMetadata, ExportLevel, ExportProgress, ExportRequest, ExportStatus,
13    GroupExportProvider, NdjsonBatch, PatientExportProvider, RawExportManifest, RawManifestEntry,
14    StartExportInput, TypeExportProgress,
15};
16use crate::core::bulk_export_output::{ExportPartKey, FinalizedPart};
17use crate::core::bulk_export_worker::{
18    ExportClaimStrategy, ExportJobLease, ExportWorkerStorage, LeaseError, WorkerId, WorkerJobView,
19};
20use crate::error::{BackendError, BulkExportError, StorageError, StorageResult};
21use crate::tenant::{TenantContext, TenantId, TenantPermissions};
22
23use super::SqliteBackend;
24
/// Process-local lock serializing `claim_next` for the single-instance
/// SQLite job store (SQLite has no `SELECT … FOR UPDATE SKIP LOCKED`).
///
/// Because this is a `static`, one lock is shared by every `SqliteBackend`
/// in the process. It does not coordinate with other processes that open the
/// same database file. Other writers in this module (`cancel_export`,
/// `release`, `heartbeat`, …) never acquire it, so it only orders `claim_next`
/// calls against each other.
static CLAIM_LOCK: Mutex<()> = Mutex::const_new(());
28
29/// Parses an RFC3339 timestamp column into a UTC `DateTime`.
30fn parse_dt(s: &str) -> StorageResult<DateTime<Utc>> {
31    DateTime::parse_from_rfc3339(s)
32        .map(|dt| dt.with_timezone(&Utc))
33        .map_err(|e| internal_error(format!("invalid timestamp '{s}': {e}")))
34}
35
36/// Parses an optional RFC3339 timestamp column.
37fn parse_dt_opt(s: Option<String>) -> Option<DateTime<Utc>> {
38    s.and_then(|s| {
39        DateTime::parse_from_rfc3339(&s)
40            .ok()
41            .map(|dt| dt.with_timezone(&Utc))
42    })
43}
44
/// Splits a `{resource_type}-{part_index}` download segment.
///
/// The split happens at the *last* `-`, so everything before it is the
/// resource type. Returns `None` when there is no `-`, when the resource type
/// is empty, or when the suffix does not parse as a `u32`.
fn parse_part_segment(part: &str) -> Option<(String, u32)> {
    let (resource_type, index_text) = part.rsplit_once('-')?;
    if resource_type.is_empty() {
        return None;
    }
    let part_index = index_text.parse::<u32>().ok()?;
    Some((resource_type.to_owned(), part_index))
}
55
56fn internal_error(message: String) -> StorageError {
57    StorageError::Backend(BackendError::Internal {
58        backend_name: "sqlite".to_string(),
59        message,
60        source: None,
61    })
62}
63
64#[async_trait]
65impl BulkExportStorage for SqliteBackend {
66    async fn start_export(
67        &self,
68        tenant: &TenantContext,
69        input: StartExportInput,
70    ) -> StorageResult<ExportJobId> {
71        let conn = self.get_connection()?;
72        let tenant_id = tenant.tenant_id().as_str();
73
74        let job_id = ExportJobId::new();
75        let now = Utc::now().to_rfc3339();
76        let transaction_time = input.transaction_time.to_rfc3339();
77
78        let level_str = match &input.request.level {
79            ExportLevel::System => "system".to_string(),
80            ExportLevel::Patient => "patient".to_string(),
81            ExportLevel::Group { .. } => "group".to_string(),
82        };
83
84        let group_id = input.request.group_id().map(|s| s.to_string());
85
86        let request_json = serde_json::to_string(&input.request)
87            .map_err(|e| internal_error(format!("Failed to serialize request: {}", e)))?;
88
89        conn.execute(
90            "INSERT INTO bulk_export_jobs
91             (id, tenant_id, status, level, group_id, request_json, transaction_time,
92              created_at, owner_subject, request_url, fhir_version, fencing_token)
93             VALUES (?1, ?2, 'accepted', ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, 0)",
94            params![
95                job_id.as_str(),
96                tenant_id,
97                level_str,
98                group_id,
99                request_json,
100                transaction_time,
101                now,
102                input.owner_subject,
103                input.request_url,
104                input.fhir_version.as_mime_param(),
105            ],
106        )
107        .map_err(|e| internal_error(format!("Failed to create export job: {}", e)))?;
108
109        Ok(job_id)
110    }
111
112    async fn get_export_status(
113        &self,
114        tenant: &TenantContext,
115        job_id: &ExportJobId,
116    ) -> StorageResult<ExportProgress> {
117        let conn = self.get_connection()?;
118        let tenant_id = tenant.tenant_id().as_str();
119
120        let (status_str, level_str, group_id, transaction_time, started_at, completed_at, error_message, current_type):
121            (String, String, Option<String>, String, Option<String>, Option<String>, Option<String>, Option<String>) = conn
122            .query_row(
123                "SELECT status, level, group_id, transaction_time, started_at, completed_at, error_message, current_type
124                 FROM bulk_export_jobs
125                 WHERE id = ?1 AND tenant_id = ?2",
126                params![job_id.as_str(), tenant_id],
127                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?,
128                          row.get(4)?, row.get(5)?, row.get(6)?, row.get(7)?)),
129            )
130            .map_err(|e| {
131                if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
132                    StorageError::BulkExport(BulkExportError::JobNotFound {
133                        job_id: job_id.to_string(),
134                    })
135                } else {
136                    internal_error(format!("Failed to get export status: {}", e))
137                }
138            })?;
139
140        let status: ExportStatus = status_str
141            .parse()
142            .map_err(|_| internal_error(format!("Invalid status in database: {}", status_str)))?;
143
144        let level = match level_str.as_str() {
145            "system" => ExportLevel::System,
146            "patient" => ExportLevel::Patient,
147            "group" => ExportLevel::Group {
148                group_id: group_id.unwrap_or_default(),
149            },
150            _ => {
151                return Err(internal_error(format!(
152                    "Invalid level in database: {}",
153                    level_str
154                )));
155            }
156        };
157
158        let transaction_time = chrono::DateTime::parse_from_rfc3339(&transaction_time)
159            .map_err(|e| internal_error(format!("Invalid transaction_time: {}", e)))?
160            .with_timezone(&Utc);
161
162        let started_at = started_at.and_then(|s| {
163            chrono::DateTime::parse_from_rfc3339(&s)
164                .ok()
165                .map(|dt| dt.with_timezone(&Utc))
166        });
167
168        let completed_at = completed_at.and_then(|s| {
169            chrono::DateTime::parse_from_rfc3339(&s)
170                .ok()
171                .map(|dt| dt.with_timezone(&Utc))
172        });
173
174        // Get per-type progress
175        let mut stmt = conn
176            .prepare(
177                "SELECT resource_type, total_count, exported_count, error_count, cursor_state
178                 FROM bulk_export_progress
179                 WHERE job_id = ?1",
180            )
181            .map_err(|e| internal_error(format!("Failed to prepare progress query: {}", e)))?;
182
183        let type_progress: Vec<TypeExportProgress> = stmt
184            .query_map(params![job_id.as_str()], |row| {
185                Ok(TypeExportProgress {
186                    resource_type: row.get(0)?,
187                    total_count: row.get::<_, Option<i64>>(1)?.map(|v| v as u64),
188                    exported_count: row.get::<_, i64>(2)? as u64,
189                    error_count: row.get::<_, i64>(3)? as u64,
190                    cursor_state: row.get(4)?,
191                })
192            })
193            .map_err(|e| internal_error(format!("Failed to query progress: {}", e)))?
194            .filter_map(|r| r.ok())
195            .collect();
196
197        Ok(ExportProgress {
198            job_id: job_id.clone(),
199            status,
200            level,
201            transaction_time,
202            started_at,
203            completed_at,
204            type_progress,
205            current_type,
206            error_message,
207        })
208    }
209
210    async fn cancel_export(
211        &self,
212        tenant: &TenantContext,
213        job_id: &ExportJobId,
214    ) -> StorageResult<()> {
215        let conn = self.get_connection()?;
216        let tenant_id = tenant.tenant_id().as_str();
217
218        // Check current status
219        let current_status: String = conn
220            .query_row(
221                "SELECT status FROM bulk_export_jobs WHERE id = ?1 AND tenant_id = ?2",
222                params![job_id.as_str(), tenant_id],
223                |row| row.get(0),
224            )
225            .map_err(|e| {
226                if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
227                    StorageError::BulkExport(BulkExportError::JobNotFound {
228                        job_id: job_id.to_string(),
229                    })
230                } else {
231                    internal_error(format!("Failed to get export status: {}", e))
232                }
233            })?;
234
235        let status: ExportStatus = current_status.parse().map_err(|_| {
236            internal_error(format!("Invalid status in database: {}", current_status))
237        })?;
238
239        if status.is_terminal() {
240            return Err(StorageError::BulkExport(BulkExportError::InvalidJobState {
241                job_id: job_id.to_string(),
242                expected: "accepted or in-progress".to_string(),
243                actual: current_status,
244            }));
245        }
246
247        let now = Utc::now().to_rfc3339();
248        conn.execute(
249            "UPDATE bulk_export_jobs SET status = 'cancelled', completed_at = ?1 WHERE id = ?2",
250            params![now, job_id.as_str()],
251        )
252        .map_err(|e| internal_error(format!("Failed to cancel export: {}", e)))?;
253
254        Ok(())
255    }
256
257    async fn delete_export(
258        &self,
259        tenant: &TenantContext,
260        job_id: &ExportJobId,
261    ) -> StorageResult<()> {
262        let conn = self.get_connection()?;
263        let tenant_id = tenant.tenant_id().as_str();
264
265        // Check exists
266        let exists: bool = conn
267            .query_row(
268                "SELECT 1 FROM bulk_export_jobs WHERE id = ?1 AND tenant_id = ?2",
269                params![job_id.as_str(), tenant_id],
270                |_| Ok(true),
271            )
272            .unwrap_or(false);
273
274        if !exists {
275            return Err(StorageError::BulkExport(BulkExportError::JobNotFound {
276                job_id: job_id.to_string(),
277            }));
278        }
279
280        // Delete job (cascades to progress and files due to foreign keys)
281        conn.execute(
282            "DELETE FROM bulk_export_jobs WHERE id = ?1 AND tenant_id = ?2",
283            params![job_id.as_str(), tenant_id],
284        )
285        .map_err(|e| internal_error(format!("Failed to delete export: {}", e)))?;
286
287        Ok(())
288    }
289
290    async fn get_export_manifest(
291        &self,
292        tenant: &TenantContext,
293        job_id: &ExportJobId,
294    ) -> StorageResult<RawExportManifest> {
295        let conn = self.get_connection()?;
296        let tenant_id = tenant.tenant_id().as_str();
297
298        let (status_str, transaction_time, request_url, error_message, completed_at): (
299            String,
300            String,
301            String,
302            Option<String>,
303            Option<String>,
304        ) = conn
305            .query_row(
306                "SELECT status, transaction_time, request_url, error_message, completed_at
307                 FROM bulk_export_jobs WHERE id = ?1 AND tenant_id = ?2",
308                params![job_id.as_str(), tenant_id],
309                |row| {
310                    Ok((
311                        row.get(0)?,
312                        row.get(1)?,
313                        row.get(2)?,
314                        row.get(3)?,
315                        row.get(4)?,
316                    ))
317                },
318            )
319            .map_err(|e| {
320                if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
321                    StorageError::BulkExport(BulkExportError::JobNotFound {
322                        job_id: job_id.to_string(),
323                    })
324                } else {
325                    internal_error(format!("Failed to get export job: {}", e))
326                }
327            })?;
328
329        let status: ExportStatus = status_str
330            .parse()
331            .map_err(|_| internal_error(format!("Invalid status in database: {}", status_str)))?;
332
333        // Get output/error files.
334        let mut stmt = conn
335            .prepare(
336                "SELECT resource_type, resource_count, file_type, part_index, fencing_token
337                 FROM bulk_export_files
338                 WHERE job_id = ?1
339                 ORDER BY file_type, resource_type, part_index",
340            )
341            .map_err(|e| internal_error(format!("Failed to prepare files query: {}", e)))?;
342
343        let rows: Vec<(String, i64, String, i64, i64)> = stmt
344            .query_map(params![job_id.as_str()], |row| {
345                Ok((
346                    row.get(0)?,
347                    row.get::<_, Option<i64>>(1)?.unwrap_or(0),
348                    row.get(2)?,
349                    row.get(3)?,
350                    row.get(4)?,
351                ))
352            })
353            .map_err(|e| internal_error(format!("Failed to query files: {}", e)))?
354            .filter_map(|r| r.ok())
355            .collect();
356
357        let mut output = Vec::new();
358        let mut errors = Vec::new();
359        for (resource_type, count, file_type, part_index, fencing_token) in rows {
360            let key = ExportPartKey {
361                tenant_id: tenant_id.to_string(),
362                job_id: job_id.clone(),
363                resource_type: resource_type.clone(),
364                file_type: file_type.clone(),
365                part_index: part_index as u32,
366                fencing_token: fencing_token as u64,
367            };
368            let entry = RawManifestEntry {
369                resource_type,
370                key,
371                count: count as u64,
372            };
373            if file_type == "error" {
374                errors.push(entry);
375            } else {
376                output.push(entry);
377            }
378        }
379
380        Ok(RawExportManifest {
381            transaction_time: parse_dt(&transaction_time)?,
382            request_url,
383            status,
384            error_message,
385            completed_at: parse_dt_opt(completed_at),
386            output,
387            errors,
388        })
389    }
390
391    async fn list_exports(
392        &self,
393        tenant: &TenantContext,
394        include_completed: bool,
395    ) -> StorageResult<Vec<ExportProgress>> {
396        // Collect IDs first, then drop the connection before calling async methods
397        let job_ids: Vec<String> = {
398            let conn = self.get_connection()?;
399            let tenant_id = tenant.tenant_id().as_str();
400
401            let query = if include_completed {
402                "SELECT id FROM bulk_export_jobs WHERE tenant_id = ?1 ORDER BY created_at DESC"
403            } else {
404                "SELECT id FROM bulk_export_jobs WHERE tenant_id = ?1 AND status IN ('accepted', 'in-progress') ORDER BY created_at DESC"
405            };
406
407            let mut stmt = conn
408                .prepare(query)
409                .map_err(|e| internal_error(format!("Failed to prepare list query: {}", e)))?;
410
411            stmt.query_map(params![tenant_id], |row| row.get(0))
412                .map_err(|e| internal_error(format!("Failed to query exports: {}", e)))?
413                .filter_map(|r| r.ok())
414                .collect()
415        };
416
417        let mut results = Vec::new();
418        for id in job_ids {
419            let job_id = ExportJobId::from_string(id);
420            if let Ok(progress) = self.get_export_status(tenant, &job_id).await {
421                results.push(progress);
422            }
423        }
424
425        Ok(results)
426    }
427
428    async fn get_export_job_metadata(
429        &self,
430        tenant: &TenantContext,
431        job_id: &ExportJobId,
432    ) -> StorageResult<ExportJobMetadata> {
433        let conn = self.get_connection()?;
434        let tenant_id = tenant.tenant_id().as_str();
435
436        let (status_str, level_str, group_id, owner_subject, transaction_time, completed_at, request_url): (
437            String,
438            String,
439            Option<String>,
440            Option<String>,
441            String,
442            Option<String>,
443            String,
444        ) = conn
445            .query_row(
446                "SELECT status, level, group_id, owner_subject, transaction_time, completed_at, request_url
447                 FROM bulk_export_jobs WHERE id = ?1 AND tenant_id = ?2",
448                params![job_id.as_str(), tenant_id],
449                |row| {
450                    Ok((
451                        row.get(0)?,
452                        row.get(1)?,
453                        row.get(2)?,
454                        row.get(3)?,
455                        row.get(4)?,
456                        row.get(5)?,
457                        row.get(6)?,
458                    ))
459                },
460            )
461            .map_err(|e| {
462                if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
463                    StorageError::BulkExport(BulkExportError::JobNotFound {
464                        job_id: job_id.to_string(),
465                    })
466                } else {
467                    internal_error(format!("Failed to get export job metadata: {}", e))
468                }
469            })?;
470
471        let status: ExportStatus = status_str
472            .parse()
473            .map_err(|_| internal_error(format!("Invalid status in database: {}", status_str)))?;
474        let level = match level_str.as_str() {
475            "system" => ExportLevel::System,
476            "patient" => ExportLevel::Patient,
477            "group" => ExportLevel::Group {
478                group_id: group_id.unwrap_or_default(),
479            },
480            _ => return Err(internal_error(format!("Invalid level: {}", level_str))),
481        };
482
483        Ok(ExportJobMetadata {
484            job_id: job_id.clone(),
485            status,
486            level,
487            owner_subject,
488            transaction_time: parse_dt(&transaction_time)?,
489            completed_at: parse_dt_opt(completed_at),
490            request_url,
491        })
492    }
493
494    async fn get_export_file_metadata(
495        &self,
496        tenant: &TenantContext,
497        job_id: &ExportJobId,
498        part: &str,
499    ) -> StorageResult<ExportFileMetadata> {
500        let (resource_type, part_index) = parse_part_segment(part).ok_or_else(|| {
501            StorageError::BulkExport(BulkExportError::JobNotFound {
502                job_id: format!("{job_id}/{part}"),
503            })
504        })?;
505
506        let conn = self.get_connection()?;
507        let tenant_id = tenant.tenant_id().as_str();
508
509        let (file_type, resource_count, fencing_token, owner_subject): (
510            String,
511            i64,
512            i64,
513            Option<String>,
514        ) = conn
515            .query_row(
516                "SELECT f.file_type, f.resource_count, f.fencing_token, j.owner_subject
517                 FROM bulk_export_files f
518                 JOIN bulk_export_jobs j ON j.id = f.job_id
519                 WHERE f.job_id = ?1 AND j.tenant_id = ?2
520                   AND f.resource_type = ?3 AND f.part_index = ?4",
521                params![job_id.as_str(), tenant_id, resource_type, part_index as i64],
522                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
523            )
524            .map_err(|e| {
525                if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
526                    StorageError::BulkExport(BulkExportError::JobNotFound {
527                        job_id: format!("{job_id}/{part}"),
528                    })
529                } else {
530                    internal_error(format!("Failed to get export file metadata: {}", e))
531                }
532            })?;
533
534        let key = ExportPartKey {
535            tenant_id: tenant_id.to_string(),
536            job_id: job_id.clone(),
537            resource_type: resource_type.clone(),
538            file_type: file_type.clone(),
539            part_index,
540            fencing_token: fencing_token as u64,
541        };
542
543        Ok(ExportFileMetadata {
544            key,
545            resource_type,
546            file_type,
547            line_count: resource_count as u64,
548            job_owner_subject: owner_subject,
549        })
550    }
551
552    async fn count_active_exports(&self, tenant: &TenantContext) -> StorageResult<u64> {
553        let conn = self.get_connection()?;
554        let tenant_id = tenant.tenant_id().as_str();
555        let count: i64 = conn
556            .query_row(
557                "SELECT COUNT(*) FROM bulk_export_jobs
558                 WHERE tenant_id = ?1 AND status IN ('accepted', 'in-progress')",
559                params![tenant_id],
560                |row| row.get(0),
561            )
562            .map_err(|e| internal_error(format!("Failed to count active exports: {}", e)))?;
563        Ok(count as u64)
564    }
565
566    async fn list_expired_exports(
567        &self,
568        now: DateTime<Utc>,
569        output_ttl: StdDuration,
570        limit: u32,
571    ) -> StorageResult<Vec<ExpiredExportRef>> {
572        let conn = self.get_connection()?;
573        let cutoff = (now
574            - chrono::Duration::from_std(output_ttl)
575                .unwrap_or_else(|_| chrono::Duration::seconds(0)))
576        .to_rfc3339();
577
578        let mut stmt = conn
579            .prepare(
580                "SELECT tenant_id, id FROM bulk_export_jobs
581                 WHERE status IN ('complete', 'error', 'cancelled')
582                   AND completed_at IS NOT NULL AND completed_at < ?1
583                 ORDER BY completed_at LIMIT ?2",
584            )
585            .map_err(|e| internal_error(format!("Failed to prepare expired query: {}", e)))?;
586
587        let rows: Vec<(String, String)> = stmt
588            .query_map(params![cutoff, limit], |row| Ok((row.get(0)?, row.get(1)?)))
589            .map_err(|e| internal_error(format!("Failed to query expired exports: {}", e)))?
590            .filter_map(|r| r.ok())
591            .collect();
592
593        Ok(rows
594            .into_iter()
595            .map(|(tenant_id, id)| ExpiredExportRef {
596                tenant: TenantContext::new(
597                    TenantId::new(tenant_id),
598                    TenantPermissions::full_access(),
599                ),
600                job_id: ExportJobId::from_string(id),
601            })
602            .collect())
603    }
604}
605
606/// Encodes an [`ExportPartKey`] into the `file_path` column.
607fn encode_part_path(key: &ExportPartKey) -> String {
608    format!(
609        "{}/{}/{}/{}-{}-{}",
610        key.tenant_id,
611        key.job_id,
612        key.file_type,
613        key.resource_type,
614        key.part_index,
615        key.fencing_token
616    )
617}
618
619#[async_trait]
620impl ExportClaimStrategy for SqliteBackend {
621    async fn claim_next(
622        &self,
623        worker_id: &WorkerId,
624        lease_duration: StdDuration,
625    ) -> StorageResult<Option<ExportJobLease>> {
626        let _guard = CLAIM_LOCK.lock().await;
627        let conn = self.get_connection()?;
628        let now = Utc::now();
629        let now_str = now.to_rfc3339();
630        let lease_expiry = now
631            + chrono::Duration::from_std(lease_duration)
632                .unwrap_or_else(|_| chrono::Duration::seconds(60));
633        let lease_expiry_str = lease_expiry.to_rfc3339();
634
635        // Find one eligible job: accepted, or in-progress with an expired lease.
636        let row: Option<(String, String, i64)> = conn
637            .query_row(
638                "SELECT id, tenant_id, fencing_token FROM bulk_export_jobs
639                 WHERE status = 'accepted'
640                    OR (status = 'in-progress' AND (lease_expiry IS NULL OR lease_expiry < ?1))
641                 ORDER BY created_at LIMIT 1",
642                params![now_str],
643                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
644            )
645            .ok();
646
647        let Some((job_id, tenant_id, fencing_token)) = row else {
648            return Ok(None);
649        };
650        let new_token = fencing_token + 1;
651
652        conn.execute(
653            "UPDATE bulk_export_jobs
654             SET status = 'in-progress', worker_id = ?1, lease_expiry = ?2,
655                 heartbeat_at = ?3, fencing_token = ?4,
656                 started_at = COALESCE(started_at, ?3)
657             WHERE id = ?5",
658            params![
659                worker_id.as_str(),
660                lease_expiry_str,
661                now_str,
662                new_token,
663                job_id
664            ],
665        )
666        .map_err(|e| internal_error(format!("Failed to claim export job: {}", e)))?;
667
668        Ok(Some(ExportJobLease {
669            job_id: ExportJobId::from_string(job_id),
670            tenant: TenantContext::new(TenantId::new(tenant_id), TenantPermissions::full_access()),
671            worker_id: worker_id.clone(),
672            lease_expiry,
673            fencing_token: new_token as u64,
674        }))
675    }
676
677    async fn heartbeat(&self, lease: &ExportJobLease) -> Result<DateTime<Utc>, LeaseError> {
678        let conn = self.get_connection().map_err(LeaseError::Storage)?;
679        let now = Utc::now();
680        let new_expiry = now + chrono::Duration::seconds(60);
681        let affected = conn
682            .execute(
683                "UPDATE bulk_export_jobs
684                 SET lease_expiry = ?1, heartbeat_at = ?2
685                 WHERE id = ?3 AND worker_id = ?4 AND fencing_token = ?5",
686                params![
687                    new_expiry.to_rfc3339(),
688                    now.to_rfc3339(),
689                    lease.job_id.as_str(),
690                    lease.worker_id.as_str(),
691                    lease.fencing_token as i64
692                ],
693            )
694            .map_err(|e| LeaseError::Storage(internal_error(format!("heartbeat failed: {e}"))))?;
695        if affected == 0 {
696            Err(LeaseError::LeaseLost {
697                job_id: lease.job_id.clone(),
698            })
699        } else {
700            Ok(new_expiry)
701        }
702    }
703
704    async fn release(&self, lease: ExportJobLease) -> StorageResult<()> {
705        let conn = self.get_connection()?;
706        conn.execute(
707            "UPDATE bulk_export_jobs
708             SET status = 'accepted', worker_id = NULL, lease_expiry = NULL
709             WHERE id = ?1 AND worker_id = ?2 AND fencing_token = ?3
710               AND status = 'in-progress'",
711            params![
712                lease.job_id.as_str(),
713                lease.worker_id.as_str(),
714                lease.fencing_token as i64
715            ],
716        )
717        .map_err(|e| internal_error(format!("Failed to release lease: {}", e)))?;
718        Ok(())
719    }
720}
721
722#[async_trait]
723impl ExportWorkerStorage for SqliteBackend {
724    async fn get_export_job_for_worker(
725        &self,
726        tenant: &TenantContext,
727        job_id: &ExportJobId,
728        worker_id: &WorkerId,
729        fencing_token: u64,
730    ) -> Result<WorkerJobView, LeaseError> {
731        let conn = self.get_connection().map_err(LeaseError::Storage)?;
732        let tenant_id = tenant.tenant_id().as_str();
733
734        let (request_json, level_str, group_id, transaction_time, fhir_version): (
735            String,
736            String,
737            Option<String>,
738            String,
739            String,
740        ) = conn
741            .query_row(
742                "SELECT request_json, level, group_id, transaction_time, fhir_version
743                 FROM bulk_export_jobs
744                 WHERE id = ?1 AND tenant_id = ?2 AND worker_id = ?3 AND fencing_token = ?4",
745                params![
746                    job_id.as_str(),
747                    tenant_id,
748                    worker_id.as_str(),
749                    fencing_token as i64
750                ],
751                |row| {
752                    Ok((
753                        row.get(0)?,
754                        row.get(1)?,
755                        row.get(2)?,
756                        row.get(3)?,
757                        row.get(4)?,
758                    ))
759                },
760            )
761            .map_err(|e| match e {
762                rusqlite::Error::QueryReturnedNoRows => LeaseError::LeaseLost {
763                    job_id: job_id.clone(),
764                },
765                other => LeaseError::Storage(internal_error(format!(
766                    "Failed to load worker job: {other}"
767                ))),
768            })?;
769
770        let request: ExportRequest = serde_json::from_str(&request_json).map_err(|e| {
771            LeaseError::Storage(internal_error(format!("Failed to parse request_json: {e}")))
772        })?;
773        let level = match level_str.as_str() {
774            "system" => ExportLevel::System,
775            "patient" => ExportLevel::Patient,
776            "group" => ExportLevel::Group {
777                group_id: group_id.unwrap_or_default(),
778            },
779            _ => {
780                return Err(LeaseError::Storage(internal_error(format!(
781                    "Invalid level: {level_str}"
782                ))));
783            }
784        };
785        let fhir_version = helios_fhir::FhirVersion::from_mime_param(&fhir_version)
786            .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
787        let transaction_time = parse_dt(&transaction_time).map_err(LeaseError::Storage)?;
788
789        // Load persisted per-type progress for resume.
790        let mut stmt = conn
791            .prepare(
792                "SELECT resource_type, total_count, exported_count, error_count, cursor_state
793                 FROM bulk_export_progress WHERE job_id = ?1",
794            )
795            .map_err(|e| LeaseError::Storage(internal_error(format!("prepare progress: {e}"))))?;
796        let type_progress: Vec<TypeExportProgress> = stmt
797            .query_map(params![job_id.as_str()], |row| {
798                Ok(TypeExportProgress {
799                    resource_type: row.get(0)?,
800                    total_count: row.get::<_, Option<i64>>(1)?.map(|v| v as u64),
801                    exported_count: row.get::<_, i64>(2)? as u64,
802                    error_count: row.get::<_, i64>(3)? as u64,
803                    cursor_state: row.get(4)?,
804                })
805            })
806            .map_err(|e| LeaseError::Storage(internal_error(format!("query progress: {e}"))))?
807            .filter_map(|r| r.ok())
808            .collect();
809
810        Ok(WorkerJobView {
811            request,
812            level,
813            transaction_time,
814            fhir_version,
815            type_progress,
816        })
817    }
818
819    async fn mark_export_in_progress(
820        &self,
821        tenant: &TenantContext,
822        job_id: &ExportJobId,
823        worker_id: &WorkerId,
824        fencing_token: u64,
825    ) -> Result<(), LeaseError> {
826        let conn = self.get_connection().map_err(LeaseError::Storage)?;
827        let now = Utc::now().to_rfc3339();
828        let affected = conn
829            .execute(
830                "UPDATE bulk_export_jobs
831                 SET status = 'in-progress', started_at = COALESCE(started_at, ?1)
832                 WHERE id = ?2 AND tenant_id = ?3 AND worker_id = ?4 AND fencing_token = ?5",
833                params![
834                    now,
835                    job_id.as_str(),
836                    tenant.tenant_id().as_str(),
837                    worker_id.as_str(),
838                    fencing_token as i64
839                ],
840            )
841            .map_err(|e| LeaseError::Storage(internal_error(format!("mark_in_progress: {e}"))))?;
842        if affected == 0 {
843            Err(LeaseError::LeaseLost {
844                job_id: job_id.clone(),
845            })
846        } else {
847            Ok(())
848        }
849    }
850
851    async fn update_export_type_progress(
852        &self,
853        tenant: &TenantContext,
854        job_id: &ExportJobId,
855        worker_id: &WorkerId,
856        fencing_token: u64,
857        progress: &TypeExportProgress,
858    ) -> Result<(), LeaseError> {
859        let conn = self.get_connection().map_err(LeaseError::Storage)?;
860        let affected = conn
861            .execute(
862                "INSERT INTO bulk_export_progress
863                   (job_id, resource_type, total_count, exported_count, error_count, cursor_state)
864                 SELECT ?1, ?2, ?3, ?4, ?5, ?6
865                 WHERE EXISTS (
866                     SELECT 1 FROM bulk_export_jobs
867                     WHERE id = ?1 AND tenant_id = ?7 AND worker_id = ?8 AND fencing_token = ?9
868                 )
869                 ON CONFLICT(job_id, resource_type) DO UPDATE SET
870                   total_count = excluded.total_count,
871                   exported_count = excluded.exported_count,
872                   error_count = excluded.error_count,
873                   cursor_state = excluded.cursor_state",
874                params![
875                    job_id.as_str(),
876                    progress.resource_type,
877                    progress.total_count.map(|v| v as i64),
878                    progress.exported_count as i64,
879                    progress.error_count as i64,
880                    progress.cursor_state,
881                    tenant.tenant_id().as_str(),
882                    worker_id.as_str(),
883                    fencing_token as i64,
884                ],
885            )
886            .map_err(|e| {
887                LeaseError::Storage(internal_error(format!("update_type_progress: {e}")))
888            })?;
889        if affected == 0 {
890            Err(LeaseError::LeaseLost {
891                job_id: job_id.clone(),
892            })
893        } else {
894            Ok(())
895        }
896    }
897
898    async fn record_export_file(
899        &self,
900        tenant: &TenantContext,
901        job_id: &ExportJobId,
902        worker_id: &WorkerId,
903        fencing_token: u64,
904        part: &FinalizedPart,
905        file_type: &str,
906    ) -> Result<(), LeaseError> {
907        let conn = self.get_connection().map_err(LeaseError::Storage)?;
908        let file_path = encode_part_path(&part.key);
909        let affected = conn
910            .execute(
911                "INSERT INTO bulk_export_files
912                   (job_id, resource_type, file_type, file_path, resource_count, byte_count,
913                    part_index, fencing_token)
914                 SELECT ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8
915                 WHERE EXISTS (
916                     SELECT 1 FROM bulk_export_jobs
917                     WHERE id = ?1 AND tenant_id = ?9 AND worker_id = ?10 AND fencing_token = ?11
918                 )
919                 ON CONFLICT(job_id, file_type, resource_type, part_index) DO UPDATE SET
920                   file_path = excluded.file_path,
921                   resource_count = excluded.resource_count,
922                   byte_count = excluded.byte_count,
923                   fencing_token = excluded.fencing_token",
924                params![
925                    job_id.as_str(),
926                    part.resource_type,
927                    file_type,
928                    file_path,
929                    part.line_count as i64,
930                    part.size_bytes as i64,
931                    part.key.part_index as i64,
932                    part.key.fencing_token as i64,
933                    tenant.tenant_id().as_str(),
934                    worker_id.as_str(),
935                    fencing_token as i64,
936                ],
937            )
938            .map_err(|e| LeaseError::Storage(internal_error(format!("record_export_file: {e}"))))?;
939        if affected == 0 {
940            Err(LeaseError::LeaseLost {
941                job_id: job_id.clone(),
942            })
943        } else {
944            Ok(())
945        }
946    }
947
948    async fn finish_export_job(
949        &self,
950        tenant: &TenantContext,
951        job_id: &ExportJobId,
952        worker_id: &WorkerId,
953        fencing_token: u64,
954    ) -> Result<(), LeaseError> {
955        let conn = self.get_connection().map_err(LeaseError::Storage)?;
956        let now = Utc::now().to_rfc3339();
957        let affected = conn
958            .execute(
959                "UPDATE bulk_export_jobs
960                 SET status = 'complete', completed_at = ?1
961                 WHERE id = ?2 AND tenant_id = ?3 AND worker_id = ?4 AND fencing_token = ?5",
962                params![
963                    now,
964                    job_id.as_str(),
965                    tenant.tenant_id().as_str(),
966                    worker_id.as_str(),
967                    fencing_token as i64
968                ],
969            )
970            .map_err(|e| LeaseError::Storage(internal_error(format!("finish_job: {e}"))))?;
971        if affected == 0 {
972            Err(LeaseError::LeaseLost {
973                job_id: job_id.clone(),
974            })
975        } else {
976            Ok(())
977        }
978    }
979
980    async fn fail_export_job(
981        &self,
982        tenant: &TenantContext,
983        job_id: &ExportJobId,
984        worker_id: &WorkerId,
985        fencing_token: u64,
986        error_message: &str,
987    ) -> Result<(), LeaseError> {
988        let conn = self.get_connection().map_err(LeaseError::Storage)?;
989        let now = Utc::now().to_rfc3339();
990        let affected = conn
991            .execute(
992                "UPDATE bulk_export_jobs
993                 SET status = 'error', error_message = ?1, completed_at = ?2
994                 WHERE id = ?3 AND tenant_id = ?4 AND worker_id = ?5 AND fencing_token = ?6",
995                params![
996                    error_message,
997                    now,
998                    job_id.as_str(),
999                    tenant.tenant_id().as_str(),
1000                    worker_id.as_str(),
1001                    fencing_token as i64
1002                ],
1003            )
1004            .map_err(|e| LeaseError::Storage(internal_error(format!("fail_job: {e}"))))?;
1005        if affected == 0 {
1006            Err(LeaseError::LeaseLost {
1007                job_id: job_id.clone(),
1008            })
1009        } else {
1010            Ok(())
1011        }
1012    }
1013}
1014
1015#[async_trait]
1016impl ExportDataProvider for SqliteBackend {
1017    async fn list_export_types(
1018        &self,
1019        tenant: &TenantContext,
1020        request: &ExportRequest,
1021    ) -> StorageResult<Vec<String>> {
1022        let conn = self.get_connection()?;
1023        let tenant_id = tenant.tenant_id().as_str();
1024
1025        // If specific types are requested, validate and return them
1026        if !request.resource_types.is_empty() {
1027            // Verify the types exist in the database
1028            let mut valid_types = Vec::new();
1029            for rt in &request.resource_types {
1030                let exists: bool = conn
1031                    .query_row(
1032                        "SELECT 1 FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND is_deleted = 0 LIMIT 1",
1033                        params![tenant_id, rt],
1034                        |_| Ok(true),
1035                    )
1036                    .unwrap_or(false);
1037                if exists {
1038                    valid_types.push(rt.clone());
1039                }
1040            }
1041            return Ok(valid_types);
1042        }
1043
1044        // Otherwise, get all types with data
1045        let mut stmt = conn
1046            .prepare(
1047                "SELECT DISTINCT resource_type FROM resources
1048                 WHERE tenant_id = ?1 AND is_deleted = 0
1049                 ORDER BY resource_type",
1050            )
1051            .map_err(|e| internal_error(format!("Failed to prepare types query: {}", e)))?;
1052
1053        let types: Vec<String> = stmt
1054            .query_map(params![tenant_id], |row| row.get(0))
1055            .map_err(|e| internal_error(format!("Failed to query types: {}", e)))?
1056            .filter_map(|r| r.ok())
1057            .collect();
1058
1059        Ok(types)
1060    }
1061
1062    async fn count_export_resources(
1063        &self,
1064        tenant: &TenantContext,
1065        request: &ExportRequest,
1066        resource_type: &str,
1067    ) -> StorageResult<u64> {
1068        let conn = self.get_connection()?;
1069        let tenant_id = tenant.tenant_id().as_str();
1070
1071        let mut query = "SELECT COUNT(*) FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND is_deleted = 0".to_string();
1072        let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![
1073            Box::new(tenant_id.to_string()),
1074            Box::new(resource_type.to_string()),
1075        ];
1076
1077        // Apply _since filter if present
1078        if let Some(since) = request.since {
1079            query.push_str(" AND last_updated >= ?3");
1080            params_vec.push(Box::new(since.to_rfc3339()));
1081        }
1082
1083        let params_slice: Vec<&dyn rusqlite::ToSql> =
1084            params_vec.iter().map(|p| p.as_ref()).collect();
1085
1086        let count: i64 = conn
1087            .query_row(&query, params_slice.as_slice(), |row| row.get(0))
1088            .map_err(|e| internal_error(format!("Failed to count resources: {}", e)))?;
1089
1090        Ok(count as u64)
1091    }
1092
1093    async fn fetch_export_batch(
1094        &self,
1095        tenant: &TenantContext,
1096        request: &ExportRequest,
1097        resource_type: &str,
1098        cursor: Option<&str>,
1099        batch_size: u32,
1100    ) -> StorageResult<NdjsonBatch> {
1101        let conn = self.get_connection()?;
1102        let tenant_id = tenant.tenant_id().as_str();
1103
1104        let mut query = "SELECT id, data, last_updated FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND is_deleted = 0".to_string();
1105        let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![
1106            Box::new(tenant_id.to_string()),
1107            Box::new(resource_type.to_string()),
1108        ];
1109
1110        // Apply _since filter if present
1111        if let Some(since) = request.since {
1112            query.push_str(" AND last_updated >= ?");
1113            params_vec.push(Box::new(since.to_rfc3339()));
1114        }
1115
1116        // Apply cursor (keyset pagination)
1117        if let Some(cursor) = cursor {
1118            // Cursor format: "last_updated|id"
1119            let parts: Vec<&str> = cursor.splitn(2, '|').collect();
1120            if parts.len() == 2 {
1121                query.push_str(" AND (last_updated, id) > (?, ?)");
1122                params_vec.push(Box::new(parts[0].to_string()));
1123                params_vec.push(Box::new(parts[1].to_string()));
1124            }
1125        }
1126
1127        query.push_str(" ORDER BY last_updated, id");
1128        query.push_str(&format!(" LIMIT {}", batch_size + 1)); // Fetch one extra to detect if there's more
1129
1130        let params_slice: Vec<&dyn rusqlite::ToSql> =
1131            params_vec.iter().map(|p| p.as_ref()).collect();
1132
1133        let mut stmt = conn
1134            .prepare(&query)
1135            .map_err(|e| internal_error(format!("Failed to prepare batch query: {}", e)))?;
1136
1137        let rows: Vec<(String, Vec<u8>, String)> = stmt
1138            .query_map(params_slice.as_slice(), |row| {
1139                Ok((
1140                    row.get::<_, String>(0)?,
1141                    row.get::<_, Vec<u8>>(1)?,
1142                    row.get::<_, String>(2)?,
1143                ))
1144            })
1145            .map_err(|e| internal_error(format!("Failed to query batch: {}", e)))?
1146            .filter_map(|r| r.ok())
1147            .collect();
1148
1149        let has_more = rows.len() > batch_size as usize;
1150        let rows = if has_more {
1151            &rows[..batch_size as usize]
1152        } else {
1153            &rows[..]
1154        };
1155
1156        let mut lines = Vec::new();
1157        let mut last_cursor = None;
1158
1159        for (id, data, last_updated) in rows {
1160            let resource: Value = serde_json::from_slice(data)
1161                .map_err(|e| internal_error(format!("Failed to parse resource: {}", e)))?;
1162            let line = serde_json::to_string(&resource)
1163                .map_err(|e| internal_error(format!("Failed to serialize resource: {}", e)))?;
1164            lines.push(line);
1165            last_cursor = Some(format!("{}|{}", last_updated, id));
1166        }
1167
1168        Ok(NdjsonBatch {
1169            lines,
1170            next_cursor: if has_more { last_cursor } else { None },
1171            is_last: !has_more,
1172        })
1173    }
1174}
1175
1176#[async_trait]
1177impl PatientExportProvider for SqliteBackend {
1178    async fn list_patient_ids(
1179        &self,
1180        tenant: &TenantContext,
1181        request: &ExportRequest,
1182        cursor: Option<&str>,
1183        batch_size: u32,
1184    ) -> StorageResult<(Vec<String>, Option<String>)> {
1185        let conn = self.get_connection()?;
1186        let tenant_id = tenant.tenant_id().as_str();
1187
1188        let mut query = "SELECT id FROM resources WHERE tenant_id = ?1 AND resource_type = 'Patient' AND is_deleted = 0".to_string();
1189        let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(tenant_id.to_string())];
1190
1191        if let Some(since) = request.since {
1192            query.push_str(" AND last_updated >= ?");
1193            params_vec.push(Box::new(since.to_rfc3339()));
1194        }
1195
1196        if let Some(cursor) = cursor {
1197            query.push_str(" AND id > ?");
1198            params_vec.push(Box::new(cursor.to_string()));
1199        }
1200
1201        query.push_str(" ORDER BY id");
1202        query.push_str(&format!(" LIMIT {}", batch_size + 1));
1203
1204        let params_slice: Vec<&dyn rusqlite::ToSql> =
1205            params_vec.iter().map(|p| p.as_ref()).collect();
1206
1207        let mut stmt = conn
1208            .prepare(&query)
1209            .map_err(|e| internal_error(format!("Failed to prepare patient ids query: {}", e)))?;
1210
1211        let ids: Vec<String> = stmt
1212            .query_map(params_slice.as_slice(), |row| row.get(0))
1213            .map_err(|e| internal_error(format!("Failed to query patient ids: {}", e)))?
1214            .filter_map(|r| r.ok())
1215            .collect();
1216
1217        let has_more = ids.len() > batch_size as usize;
1218        let ids = if has_more {
1219            ids[..batch_size as usize].to_vec()
1220        } else {
1221            ids
1222        };
1223
1224        let next_cursor = if has_more { ids.last().cloned() } else { None };
1225
1226        Ok((ids, next_cursor))
1227    }
1228
1229    async fn fetch_patient_compartment_batch(
1230        &self,
1231        tenant: &TenantContext,
1232        request: &ExportRequest,
1233        resource_type: &str,
1234        patient_ids: &[String],
1235        cursor: Option<&str>,
1236        batch_size: u32,
1237    ) -> StorageResult<NdjsonBatch> {
1238        if patient_ids.is_empty() {
1239            return Ok(NdjsonBatch::empty());
1240        }
1241
1242        let conn = self.get_connection()?;
1243        let tenant_id = tenant.tenant_id().as_str();
1244
1245        // For Patient resources, just filter by the IDs
1246        if resource_type == "Patient" {
1247            let placeholders: Vec<String> = (0..patient_ids.len())
1248                .map(|i| format!("?{}", i + 3))
1249                .collect();
1250            let mut query = format!(
1251                "SELECT id, data, last_updated FROM resources
1252                 WHERE tenant_id = ?1 AND resource_type = ?2 AND id IN ({}) AND is_deleted = 0",
1253                placeholders.join(",")
1254            );
1255
1256            let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![
1257                Box::new(tenant_id.to_string()),
1258                Box::new(resource_type.to_string()),
1259            ];
1260            for id in patient_ids {
1261                params_vec.push(Box::new(id.clone()));
1262            }
1263
1264            if let Some(cursor) = cursor {
1265                let parts: Vec<&str> = cursor.splitn(2, '|').collect();
1266                if parts.len() == 2 {
1267                    query.push_str(" AND (last_updated, id) > (?, ?)");
1268                    params_vec.push(Box::new(parts[0].to_string()));
1269                    params_vec.push(Box::new(parts[1].to_string()));
1270                }
1271            }
1272
1273            query.push_str(" ORDER BY last_updated, id");
1274            query.push_str(&format!(" LIMIT {}", batch_size + 1));
1275
1276            let params_slice: Vec<&dyn rusqlite::ToSql> =
1277                params_vec.iter().map(|p| p.as_ref()).collect();
1278
1279            let mut stmt = conn.prepare(&query).map_err(|e| {
1280                internal_error(format!("Failed to prepare compartment query: {}", e))
1281            })?;
1282
1283            let rows: Vec<(String, Vec<u8>, String)> = stmt
1284                .query_map(params_slice.as_slice(), |row| {
1285                    Ok((
1286                        row.get::<_, String>(0)?,
1287                        row.get::<_, Vec<u8>>(1)?,
1288                        row.get::<_, String>(2)?,
1289                    ))
1290                })
1291                .map_err(|e| internal_error(format!("Failed to query compartment: {}", e)))?
1292                .filter_map(|r| r.ok())
1293                .collect();
1294
1295            let has_more = rows.len() > batch_size as usize;
1296            let rows = if has_more {
1297                &rows[..batch_size as usize]
1298            } else {
1299                &rows[..]
1300            };
1301
1302            let mut lines = Vec::new();
1303            let mut last_cursor = None;
1304
1305            for (id, data, last_updated) in rows {
1306                let resource: Value = serde_json::from_slice(data)
1307                    .map_err(|e| internal_error(format!("Failed to parse resource: {}", e)))?;
1308                let line = serde_json::to_string(&resource)
1309                    .map_err(|e| internal_error(format!("Failed to serialize resource: {}", e)))?;
1310                lines.push(line);
1311                last_cursor = Some(format!("{}|{}", last_updated, id));
1312            }
1313
1314            return Ok(NdjsonBatch {
1315                lines,
1316                next_cursor: if has_more { last_cursor } else { None },
1317                is_last: !has_more,
1318            });
1319        }
1320
1321        // For other resource types, find resources whose payload references one
1322        // of the patients via `subject.reference` or `patient.reference`. We
1323        // read the JSON payload directly (json_extract over the `data` column)
1324        // rather than the search_index, so this is correct even when search is
1325        // offloaded to a secondary backend (sqlite-elasticsearch), which leaves
1326        // the local search_index empty.
1327        let patient_refs: Vec<String> = patient_ids
1328            .iter()
1329            .map(|id| format!("Patient/{}", id))
1330            .collect();
1331
1332        let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![
1333            Box::new(tenant_id.to_string()),
1334            Box::new(resource_type.to_string()),
1335        ];
1336        let mut query = "SELECT id, data, last_updated FROM resources \
1337             WHERE tenant_id = ? AND resource_type = ? AND is_deleted = 0"
1338            .to_string();
1339
1340        if let Some(since) = request.since {
1341            query.push_str(" AND last_updated >= ?");
1342            params_vec.push(Box::new(since.to_rfc3339()));
1343        }
1344
1345        let placeholders: Vec<&str> = patient_refs.iter().map(|_| "?").collect();
1346        let in_list = placeholders.join(",");
1347        query.push_str(&format!(
1348            " AND (json_extract(data, '$.subject.reference') IN ({in_list}) \
1349               OR json_extract(data, '$.patient.reference') IN ({in_list}))"
1350        ));
1351        // The IN-list params appear twice (subject + patient), so bind twice.
1352        for patient_ref in &patient_refs {
1353            params_vec.push(Box::new(patient_ref.clone()));
1354        }
1355        for patient_ref in &patient_refs {
1356            params_vec.push(Box::new(patient_ref.clone()));
1357        }
1358
1359        if let Some(cursor) = cursor {
1360            let parts: Vec<&str> = cursor.splitn(2, '|').collect();
1361            if parts.len() == 2 {
1362                query.push_str(" AND (last_updated, id) > (?, ?)");
1363                params_vec.push(Box::new(parts[0].to_string()));
1364                params_vec.push(Box::new(parts[1].to_string()));
1365            }
1366        }
1367
1368        query.push_str(" ORDER BY last_updated, id");
1369        query.push_str(&format!(" LIMIT {}", batch_size + 1));
1370
1371        let params_slice: Vec<&dyn rusqlite::ToSql> =
1372            params_vec.iter().map(|p| p.as_ref()).collect();
1373
1374        let mut stmt = conn
1375            .prepare(&query)
1376            .map_err(|e| internal_error(format!("Failed to prepare compartment query: {}", e)))?;
1377
1378        let rows: Vec<(String, Vec<u8>, String)> = stmt
1379            .query_map(params_slice.as_slice(), |row| {
1380                Ok((
1381                    row.get::<_, String>(0)?,
1382                    row.get::<_, Vec<u8>>(1)?,
1383                    row.get::<_, String>(2)?,
1384                ))
1385            })
1386            .map_err(|e| internal_error(format!("Failed to query compartment: {}", e)))?
1387            .filter_map(|r| r.ok())
1388            .collect();
1389
1390        let has_more = rows.len() > batch_size as usize;
1391        let rows = if has_more {
1392            &rows[..batch_size as usize]
1393        } else {
1394            &rows[..]
1395        };
1396
1397        let mut lines = Vec::new();
1398        let mut last_cursor = None;
1399
1400        for (id, data, last_updated) in rows {
1401            let resource: Value = serde_json::from_slice(data)
1402                .map_err(|e| internal_error(format!("Failed to parse resource: {}", e)))?;
1403            let line = serde_json::to_string(&resource)
1404                .map_err(|e| internal_error(format!("Failed to serialize resource: {}", e)))?;
1405            lines.push(line);
1406            last_cursor = Some(format!("{}|{}", last_updated, id));
1407        }
1408
1409        Ok(NdjsonBatch {
1410            lines,
1411            next_cursor: if has_more { last_cursor } else { None },
1412            is_last: !has_more,
1413        })
1414    }
1415}
1416
1417#[async_trait]
1418impl GroupExportProvider for SqliteBackend {
1419    async fn get_group_members(
1420        &self,
1421        tenant: &TenantContext,
1422        group_id: &str,
1423    ) -> StorageResult<Vec<String>> {
1424        let conn = self.get_connection()?;
1425        let tenant_id = tenant.tenant_id().as_str();
1426
1427        // Get the Group resource
1428        let data: Vec<u8> = conn
1429            .query_row(
1430                "SELECT data FROM resources WHERE tenant_id = ?1 AND resource_type = 'Group' AND id = ?2 AND is_deleted = 0",
1431                params![tenant_id, group_id],
1432                |row| row.get(0),
1433            )
1434            .map_err(|e| {
1435                if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
1436                    StorageError::BulkExport(BulkExportError::GroupNotFound {
1437                        group_id: group_id.to_string(),
1438                    })
1439                } else {
1440                    internal_error(format!("Failed to get group: {}", e))
1441                }
1442            })?;
1443
1444        let group: Value = serde_json::from_slice(&data)
1445            .map_err(|e| internal_error(format!("Failed to parse group: {}", e)))?;
1446
1447        // Extract member references from Group.member[].entity.reference
1448        let mut members = Vec::new();
1449        if let Some(member_array) = group.get("member").and_then(|m| m.as_array()) {
1450            for member in member_array {
1451                if let Some(entity) = member.get("entity") {
1452                    if let Some(reference) = entity.get("reference").and_then(|r| r.as_str()) {
1453                        members.push(reference.to_string());
1454                    }
1455                }
1456            }
1457        }
1458
1459        Ok(members)
1460    }
1461
1462    async fn resolve_group_patient_ids(
1463        &self,
1464        tenant: &TenantContext,
1465        group_id: &str,
1466    ) -> StorageResult<Vec<String>> {
1467        // Flatten nested Groups iteratively, guarding against membership
1468        // cycles with a visited set.
1469        use std::collections::HashSet;
1470        let mut visited_groups: HashSet<String> = HashSet::new();
1471        let mut seen_patients: HashSet<String> = HashSet::new();
1472        let mut patient_ids: Vec<String> = Vec::new();
1473        let mut worklist: Vec<String> = vec![group_id.to_string()];
1474
1475        while let Some(gid) = worklist.pop() {
1476            if !visited_groups.insert(gid.clone()) {
1477                continue; // cycle / already processed
1478            }
1479            let members = self.get_group_members(tenant, &gid).await?;
1480            for reference in members {
1481                if let Some(pid) = reference.strip_prefix("Patient/") {
1482                    if seen_patients.insert(pid.to_string()) {
1483                        patient_ids.push(pid.to_string());
1484                    }
1485                } else if let Some(nested) = reference.strip_prefix("Group/") {
1486                    worklist.push(nested.to_string());
1487                }
1488            }
1489        }
1490
1491        Ok(patient_ids)
1492    }
1493
1494    async fn get_group_members_with_periods(
1495        &self,
1496        tenant: &TenantContext,
1497        group_id: &str,
1498    ) -> StorageResult<Vec<(String, Option<DateTime<Utc>>)>> {
1499        let conn = self.get_connection()?;
1500        let tenant_id = tenant.tenant_id().as_str();
1501        let data: Vec<u8> = conn
1502            .query_row(
1503                "SELECT data FROM resources
1504                 WHERE tenant_id = ?1 AND resource_type = 'Group'
1505                   AND id = ?2 AND is_deleted = 0",
1506                params![tenant_id, group_id],
1507                |row| row.get(0),
1508            )
1509            .map_err(|e| {
1510                if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
1511                    StorageError::BulkExport(BulkExportError::GroupNotFound {
1512                        group_id: group_id.to_string(),
1513                    })
1514                } else {
1515                    internal_error(format!("Failed to get group: {}", e))
1516                }
1517            })?;
1518        let group: Value = serde_json::from_slice(&data)
1519            .map_err(|e| internal_error(format!("Failed to parse group: {}", e)))?;
1520        let mut out = Vec::new();
1521        if let Some(arr) = group.get("member").and_then(|m| m.as_array()) {
1522            for member in arr {
1523                let Some(reference) = member
1524                    .get("entity")
1525                    .and_then(|e| e.get("reference"))
1526                    .and_then(|r| r.as_str())
1527                else {
1528                    continue;
1529                };
1530                let period_start = member
1531                    .get("period")
1532                    .and_then(|p| p.get("start"))
1533                    .and_then(|s| s.as_str())
1534                    .and_then(|s| {
1535                        DateTime::parse_from_rfc3339(s)
1536                            .ok()
1537                            .map(|dt| dt.with_timezone(&Utc))
1538                    });
1539                out.push((reference.to_string(), period_start));
1540            }
1541        }
1542        Ok(out)
1543    }
1544}
1545
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::ResourceStorage;
    use crate::tenant::{TenantId, TenantPermissions};
    use helios_fhir::FhirVersion;
    use serde_json::json;

    /// Builds an in-memory SQLite backend with its schema initialised.
    fn create_test_backend() -> SqliteBackend {
        let backend = SqliteBackend::in_memory().unwrap();
        backend.init_schema().unwrap();
        backend
    }

    /// Builds a `test-tenant` context with full access permissions.
    fn create_test_tenant() -> TenantContext {
        TenantContext::new(
            TenantId::new("test-tenant"),
            TenantPermissions::full_access(),
        )
    }

    /// Wraps an `ExportRequest` in a `StartExportInput` with default kickoff metadata.
    fn test_input(request: ExportRequest) -> StartExportInput {
        StartExportInput {
            request,
            transaction_time: Utc::now(),
            request_url: "http://localhost/$export".to_string(),
            owner_subject: Some("test-subject".to_string()),
            fhir_version: FhirVersion::default(),
        }
    }

    /// A freshly started export gets a non-empty job id and reports `Accepted`.
    #[tokio::test]
    async fn test_start_export() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        let request = ExportRequest::system().with_types(vec!["Patient".to_string()]);
        let job_id = backend
            .start_export(&tenant, test_input(request))
            .await
            .unwrap();

        assert!(!job_id.as_str().is_empty());

        let progress = backend.get_export_status(&tenant, &job_id).await.unwrap();
        assert_eq!(progress.status, ExportStatus::Accepted);
    }

    /// Cancelling an export moves its status to `Cancelled`.
    #[tokio::test]
    async fn test_cancel_export() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        let job_id = backend
            .start_export(&tenant, test_input(ExportRequest::system()))
            .await
            .unwrap();

        backend.cancel_export(&tenant, &job_id).await.unwrap();

        let progress = backend.get_export_status(&tenant, &job_id).await.unwrap();
        assert_eq!(progress.status, ExportStatus::Cancelled);
    }

    /// Exports of different levels started by one tenant are all listed.
    #[tokio::test]
    async fn test_list_exports() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        let _job_id1 = backend
            .start_export(&tenant, test_input(ExportRequest::system()))
            .await
            .unwrap();
        let _job_id2 = backend
            .start_export(&tenant, test_input(ExportRequest::patient()))
            .await
            .unwrap();

        let exports = backend.list_exports(&tenant, false).await.unwrap();
        assert_eq!(exports.len(), 2);
    }

    /// Three freshly started exports are all counted as active.
    #[tokio::test]
    async fn test_count_active_exports() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        for _ in 0..3 {
            backend
                .start_export(&tenant, test_input(ExportRequest::system()))
                .await
                .unwrap();
        }
        assert_eq!(backend.count_active_exports(&tenant).await.unwrap(), 3);
    }

    /// Job metadata reflects the kickoff input; an unknown job id is an error.
    #[tokio::test]
    async fn test_get_export_job_metadata() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        let job_id = backend
            .start_export(&tenant, test_input(ExportRequest::patient()))
            .await
            .unwrap();

        let meta = backend
            .get_export_job_metadata(&tenant, &job_id)
            .await
            .unwrap();
        assert_eq!(meta.status, ExportStatus::Accepted);
        assert_eq!(meta.owner_subject.as_deref(), Some("test-subject"));
        assert!(matches!(meta.level, ExportLevel::Patient));

        let missing = backend
            .get_export_job_metadata(&tenant, &ExportJobId::from_string("nope"))
            .await;
        assert!(missing.is_err());
    }

    /// A worker claims the only pending job (fencing token 1), marks it in
    /// progress, records per-type progress, and finishes it as `Complete`.
    #[tokio::test]
    async fn test_claim_and_worker_lifecycle() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        let job_id = backend
            .start_export(&tenant, test_input(ExportRequest::system()))
            .await
            .unwrap();

        let worker = WorkerId::new("worker-1");
        let lease = backend
            .claim_next(&worker, StdDuration::from_secs(60))
            .await
            .unwrap()
            .expect("a job should be claimable");
        assert_eq!(lease.job_id, job_id);
        assert_eq!(lease.fencing_token, 1);

        // A second claim finds nothing (the only job is now leased).
        assert!(
            backend
                .claim_next(&worker, StdDuration::from_secs(60))
                .await
                .unwrap()
                .is_none()
        );

        // The lease holder can mark in-progress, record progress, and finish.
        backend
            .mark_export_in_progress(&tenant, &job_id, &worker, lease.fencing_token)
            .await
            .unwrap();
        backend
            .update_export_type_progress(
                &tenant,
                &job_id,
                &worker,
                lease.fencing_token,
                &TypeExportProgress::new("Patient"),
            )
            .await
            .unwrap();
        backend
            .finish_export_job(&tenant, &job_id, &worker, lease.fencing_token)
            .await
            .unwrap();

        let progress = backend.get_export_status(&tenant, &job_id).await.unwrap();
        assert_eq!(progress.status, ExportStatus::Complete);
    }

    /// Once a lease expires and another worker reclaims the job, every
    /// mutation carrying the old fencing token is rejected as `LeaseLost`.
    #[tokio::test]
    async fn test_stale_worker_fenced_out() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        let job_id = backend
            .start_export(&tenant, test_input(ExportRequest::system()))
            .await
            .unwrap();

        let worker_a = WorkerId::new("worker-a");
        let lease_a = backend
            .claim_next(&worker_a, StdDuration::from_millis(1))
            .await
            .unwrap()
            .unwrap();

        // Lease expires; worker B reclaims, bumping the fencing token.
        tokio::time::sleep(StdDuration::from_millis(5)).await;
        let worker_b = WorkerId::new("worker-b");
        let lease_b = backend
            .claim_next(&worker_b, StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        assert!(lease_b.fencing_token > lease_a.fencing_token);

        // Worker A's stale mutations are all rejected as LeaseLost.
        assert!(matches!(
            backend
                .mark_export_in_progress(&tenant, &job_id, &worker_a, lease_a.fencing_token)
                .await,
            Err(LeaseError::LeaseLost { .. })
        ));
        assert!(matches!(
            backend
                .update_export_type_progress(
                    &tenant,
                    &job_id,
                    &worker_a,
                    lease_a.fencing_token,
                    &TypeExportProgress::new("Patient"),
                )
                .await,
            Err(LeaseError::LeaseLost { .. })
        ));
        assert!(matches!(
            backend
                .finish_export_job(&tenant, &job_id, &worker_a, lease_a.fencing_token)
                .await,
            Err(LeaseError::LeaseLost { .. })
        ));

        // Worker B can still operate.
        backend
            .finish_export_job(&tenant, &job_id, &worker_b, lease_b.fencing_token)
            .await
            .unwrap();
    }

    /// Group member periods are surfaced, and a member whose `period.start`
    /// is after `_since` is dropped by the "exclude newly added" rule.
    #[tokio::test]
    async fn test_since_newly_added_exclude_filters_late_joiners() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        // A Group with two members: one joined before _since (period.start =
        // 2024-01-01), one joined after (period.start = 2026-06-01).
        backend
            .create(
                &tenant,
                "Group",
                json!({
                    "resourceType": "Group", "id": "g-cohort",
                    "member": [
                        {
                            "entity": {"reference": "Patient/p-old"},
                            "period": {"start": "2024-01-01T00:00:00Z"}
                        },
                        {
                            "entity": {"reference": "Patient/p-new"},
                            "period": {"start": "2026-06-01T00:00:00Z"}
                        }
                    ]
                }),
                FhirVersion::default(),
            )
            .await
            .unwrap();

        let members = backend
            .get_group_members_with_periods(&tenant, "g-cohort")
            .await
            .unwrap();
        assert_eq!(members.len(), 2);
        assert!(members.iter().all(|(_, p)| p.is_some()));

        // With exclude=true and _since=2025, p-new (joined 2026) should be
        // filtered out and p-old kept.
        // NOTE(review): this applies a local copy of the exclusion rule to the
        // backend's output; it does not exercise the worker's own filter code.
        let since = DateTime::parse_from_rfc3339("2025-01-01T00:00:00Z")
            .unwrap()
            .with_timezone(&Utc);
        let kept: Vec<String> = members
            .iter()
            .filter_map(|(reference, period_start)| {
                let pid = reference.strip_prefix("Patient/")?;
                match period_start {
                    Some(start) if *start > since => None,
                    _ => Some(pid.to_string()),
                }
            })
            .collect();
        assert_eq!(kept, vec!["p-old".to_string()]);
    }

    /// Patient-compartment batches are found even when no search-index rows exist.
    #[tokio::test]
    async fn test_patient_compartment_uses_resource_payload_not_search_index() {
        // Regression: when search is offloaded (sqlite-elasticsearch), the local
        // search_index is empty, so compartment lookups must read the resource
        // payload directly. Here we force-offload to guarantee no search_index
        // rows exist, then confirm the Observation is still found via its
        // subject.reference.
        let mut backend = create_test_backend();
        backend.set_search_offloaded(true);
        let tenant = create_test_tenant();

        backend
            .create(
                &tenant,
                "Patient",
                json!({"resourceType": "Patient", "id": "p1"}),
                FhirVersion::default(),
            )
            .await
            .unwrap();
        backend
            .create(
                &tenant,
                "Observation",
                json!({
                    "resourceType": "Observation", "id": "o1", "status": "final",
                    "subject": {"reference": "Patient/p1"}
                }),
                FhirVersion::default(),
            )
            .await
            .unwrap();

        let request = ExportRequest::patient();
        let batch = backend
            .fetch_patient_compartment_batch(
                &tenant,
                &request,
                "Observation",
                &["p1".to_string()],
                None,
                100,
            )
            .await
            .unwrap();
        assert_eq!(
            batch.lines.len(),
            1,
            "Observation should be found via subject.reference"
        );
        assert!(batch.lines[0].contains("\"o1\""));
    }

    /// Nested groups are flattened into patient ids, and a membership cycle
    /// (g1 -> g2 -> g1) terminates with each patient reported once.
    #[tokio::test]
    async fn test_resolve_nested_groups_with_cycle_guard() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        // g1 -> [Patient/p1, Group/g2]; g2 -> [Patient/p2, Group/g1 (cycle)]
        backend
            .create(
                &tenant,
                "Group",
                json!({
                    "resourceType": "Group", "id": "g1",
                    "member": [
                        {"entity": {"reference": "Patient/p1"}},
                        {"entity": {"reference": "Group/g2"}}
                    ]
                }),
                FhirVersion::default(),
            )
            .await
            .unwrap();
        backend
            .create(
                &tenant,
                "Group",
                json!({
                    "resourceType": "Group", "id": "g2",
                    "member": [
                        {"entity": {"reference": "Patient/p2"}},
                        {"entity": {"reference": "Group/g1"}}
                    ]
                }),
                FhirVersion::default(),
            )
            .await
            .unwrap();

        let mut ids = backend
            .resolve_group_patient_ids(&tenant, "g1")
            .await
            .unwrap();
        ids.sort();
        // Both patients resolved exactly once; the cycle did not loop forever.
        assert_eq!(ids, vec!["p1".to_string(), "p2".to_string()]);
    }

    /// A system-level export lists every resource type that has stored data.
    #[tokio::test]
    async fn test_list_export_types() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        // Create some resources
        backend
            .create(
                &tenant,
                "Patient",
                json!({"resourceType": "Patient", "name": [{"family": "Test"}]}),
                FhirVersion::default(),
            )
            .await
            .unwrap();

        backend
            .create(
                &tenant,
                "Observation",
                json!({"resourceType": "Observation", "status": "final"}),
                FhirVersion::default(),
            )
            .await
            .unwrap();

        let request = ExportRequest::system();
        let types = backend.list_export_types(&tenant, &request).await.unwrap();

        assert!(types.contains(&"Patient".to_string()));
        assert!(types.contains(&"Observation".to_string()));
    }

    /// Batches are paginated by cursor: 5 resources with a page size of 3
    /// yield a full first page with a cursor, then a final page of 2.
    #[tokio::test]
    async fn test_fetch_export_batch() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        // Create some resources
        for i in 0..5 {
            backend
                .create(
                    &tenant,
                    "Patient",
                    json!({"resourceType": "Patient", "name": [{"family": format!("Patient{}", i)}]}),
                    FhirVersion::default(),
                )
                .await
                .unwrap();
        }

        let request = ExportRequest::system();
        let batch = backend
            .fetch_export_batch(&tenant, &request, "Patient", None, 3)
            .await
            .unwrap();

        assert_eq!(batch.lines.len(), 3);
        assert!(!batch.is_last);
        assert!(batch.next_cursor.is_some());

        // Fetch next batch
        let batch2 = backend
            .fetch_export_batch(
                &tenant,
                &request,
                "Patient",
                batch.next_cursor.as_deref(),
                3,
            )
            .await
            .unwrap();

        assert_eq!(batch2.lines.len(), 2);
        assert!(batch2.is_last);
    }

    /// After deletion, status lookups for the job fail with `JobNotFound`.
    #[tokio::test]
    async fn test_delete_export() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        let job_id = backend
            .start_export(&tenant, test_input(ExportRequest::system()))
            .await
            .unwrap();

        backend.delete_export(&tenant, &job_id).await.unwrap();

        // Should fail to get status now
        let result = backend.get_export_status(&tenant, &job_id).await;
        assert!(matches!(
            result,
            Err(StorageError::BulkExport(
                BulkExportError::JobNotFound { .. }
            ))
        ));
    }
}