Skip to main content

helios_persistence/core/
bulk_export_output.rs

1//! Output storage for bulk export NDJSON files.
2//!
3//! The [`ExportOutputStore`] trait decouples *where the exported bytes go*
4//! (local filesystem, S3, …) from the job-state backend. The job-state
5//! backend stores keys; the output store turns keys into bytes and URLs.
6
7use std::time::Duration;
8
9use async_trait::async_trait;
10use tokio::io::{AsyncRead, AsyncWrite};
11
12use crate::core::bulk_export::ExportJobId;
13use crate::error::StorageResult;
14use crate::tenant::TenantContext;
15
16/// A stable identifier for a single output part.
17///
18/// The `fencing_token` is embedded so a zombie worker (one that lost its
19/// lease) writes to a *different* key than the live worker holding the
20/// reclaimed job, preventing output corruption.
21#[derive(Debug, Clone, PartialEq, Eq, Hash)]
22pub struct ExportPartKey {
23    /// The tenant the job belongs to.
24    pub tenant_id: String,
25    /// The export job this part belongs to.
26    pub job_id: ExportJobId,
27    /// The FHIR resource type contained in the part.
28    pub resource_type: String,
29    /// `"output"` or `"error"`.
30    pub file_type: String,
31    /// The zero-based part index within `(job, file_type, resource_type)`.
32    pub part_index: u32,
33    /// The fencing token of the worker that produced the part.
34    pub fencing_token: u64,
35}
36
37impl ExportPartKey {
38    /// Creates a new output part key (`file_type = "output"`).
39    pub fn output(
40        tenant_id: impl Into<String>,
41        job_id: ExportJobId,
42        resource_type: impl Into<String>,
43        part_index: u32,
44        fencing_token: u64,
45    ) -> Self {
46        Self {
47            tenant_id: tenant_id.into(),
48            job_id,
49            resource_type: resource_type.into(),
50            file_type: "output".to_string(),
51            part_index,
52            fencing_token,
53        }
54    }
55
56    /// Creates a new error part key (`file_type = "error"`).
57    pub fn error(
58        tenant_id: impl Into<String>,
59        job_id: ExportJobId,
60        resource_type: impl Into<String>,
61        part_index: u32,
62        fencing_token: u64,
63    ) -> Self {
64        Self {
65            tenant_id: tenant_id.into(),
66            job_id,
67            resource_type: resource_type.into(),
68            file_type: "error".to_string(),
69            part_index,
70            fencing_token,
71        }
72    }
73
74    /// The `{resource_type}-{part_index}` segment used in download routes.
75    pub fn part_segment(&self) -> String {
76        format!("{}-{}", self.resource_type, self.part_index)
77    }
78}
79
80/// An open writer for a single export output part.
81///
82/// Wraps a boxed async writer plus a line counter. Callers push NDJSON lines
83/// with [`write_line`](ExportPartWriter::write_line) and then hand the writer
84/// to [`ExportOutputStore::finalize_part`].
85pub struct ExportPartWriter {
86    /// The underlying async byte sink.
87    pub writer: std::pin::Pin<Box<dyn AsyncWrite + Send>>,
88    /// Number of lines written so far.
89    pub line_count: u64,
90    /// Number of bytes written so far.
91    pub byte_count: u64,
92}
93
94impl ExportPartWriter {
95    /// Creates a new part writer over the given async sink.
96    pub fn new(writer: std::pin::Pin<Box<dyn AsyncWrite + Send>>) -> Self {
97        Self {
98            writer,
99            line_count: 0,
100            byte_count: 0,
101        }
102    }
103
104    /// Writes one NDJSON line (a trailing newline is appended).
105    pub async fn write_line(&mut self, line: &str) -> std::io::Result<()> {
106        use tokio::io::AsyncWriteExt;
107        self.writer.write_all(line.as_bytes()).await?;
108        self.writer.write_all(b"\n").await?;
109        self.line_count += 1;
110        self.byte_count += line.len() as u64 + 1;
111        Ok(())
112    }
113
114    /// Flushes the underlying writer.
115    pub async fn flush(&mut self) -> std::io::Result<()> {
116        use tokio::io::AsyncWriteExt;
117        self.writer.flush().await
118    }
119}
120
121impl std::fmt::Debug for ExportPartWriter {
122    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
123        f.debug_struct("ExportPartWriter")
124            .field("line_count", &self.line_count)
125            .field("byte_count", &self.byte_count)
126            .finish()
127    }
128}
129
130/// A finalized, immutable output part as it will appear in the manifest.
131#[derive(Debug, Clone)]
132pub struct FinalizedPart {
133    /// The part's stable key.
134    pub key: ExportPartKey,
135    /// The resource type contained in the part.
136    pub resource_type: String,
137    /// Number of resources (lines) in the part.
138    pub line_count: u64,
139    /// Total byte size of the part.
140    pub size_bytes: u64,
141}
142
/// Where a client can fetch a part, together with the access posture the
/// manifest should advertise for that URL.
#[derive(Clone, Debug)]
pub struct DownloadUrl {
    /// URL the client should fetch.
    pub url: String,
    /// `true` when the URL is HFS-served and requires the kickoff Bearer
    /// token; `false` when it is pre-signed, in which case the client must
    /// NOT send a token.
    pub requires_access_token: bool,
}
152
153/// Pluggable backend for bulk export output files.
154///
155/// Implementations decide where NDJSON output physically lives (local FS, S3,
156/// …) and how download URLs are minted. The job-state backend is unaware of
157/// this — it stores keys; the output store turns keys into bytes and URLs.
158#[async_trait]
159pub trait ExportOutputStore: Send + Sync {
160    /// Opens an async writer for a new (or re-finalized) output part.
161    async fn open_writer(&self, key: &ExportPartKey) -> StorageResult<ExportPartWriter>;
162
163    /// Marks a part as finalized and immutable.
164    ///
165    /// For object stores this completes the multipart upload; for the local
166    /// filesystem this fsyncs and renames `.tmp` → final.
167    async fn finalize_part(
168        &self,
169        key: &ExportPartKey,
170        writer: ExportPartWriter,
171    ) -> StorageResult<FinalizedPart>;
172
173    /// Produces a download URL for a finalized part.
174    async fn download_url(&self, key: &ExportPartKey, ttl: Duration) -> StorageResult<DownloadUrl>;
175
176    /// Opens an async reader over a finalized part (HFS-served download path).
177    async fn open_reader(
178        &self,
179        key: &ExportPartKey,
180    ) -> StorageResult<std::pin::Pin<Box<dyn AsyncRead + Send>>>;
181
182    /// Deletes all output parts for a job. Idempotent — a missing job is `Ok`.
183    async fn delete_job_outputs(
184        &self,
185        tenant: &TenantContext,
186        job_id: &ExportJobId,
187    ) -> StorageResult<()>;
188}