helios_persistence/core/bulk_export_output.rs
1//! Output storage for bulk export NDJSON files.
2//!
3//! The [`ExportOutputStore`] trait decouples *where the exported bytes go*
4//! (local filesystem, S3, …) from the job-state backend. The job-state
5//! backend stores keys; the output store turns keys into bytes and URLs.
6
7use std::time::Duration;
8
9use async_trait::async_trait;
10use tokio::io::{AsyncRead, AsyncWrite};
11
12use crate::core::bulk_export::ExportJobId;
13use crate::error::StorageResult;
14use crate::tenant::TenantContext;
15
16/// A stable identifier for a single output part.
17///
18/// The `fencing_token` is embedded so a zombie worker (one that lost its
19/// lease) writes to a *different* key than the live worker holding the
20/// reclaimed job, preventing output corruption.
21#[derive(Debug, Clone, PartialEq, Eq, Hash)]
22pub struct ExportPartKey {
23 /// The tenant the job belongs to.
24 pub tenant_id: String,
25 /// The export job this part belongs to.
26 pub job_id: ExportJobId,
27 /// The FHIR resource type contained in the part.
28 pub resource_type: String,
29 /// `"output"` or `"error"`.
30 pub file_type: String,
31 /// The zero-based part index within `(job, file_type, resource_type)`.
32 pub part_index: u32,
33 /// The fencing token of the worker that produced the part.
34 pub fencing_token: u64,
35}
36
37impl ExportPartKey {
38 /// Creates a new output part key (`file_type = "output"`).
39 pub fn output(
40 tenant_id: impl Into<String>,
41 job_id: ExportJobId,
42 resource_type: impl Into<String>,
43 part_index: u32,
44 fencing_token: u64,
45 ) -> Self {
46 Self {
47 tenant_id: tenant_id.into(),
48 job_id,
49 resource_type: resource_type.into(),
50 file_type: "output".to_string(),
51 part_index,
52 fencing_token,
53 }
54 }
55
56 /// Creates a new error part key (`file_type = "error"`).
57 pub fn error(
58 tenant_id: impl Into<String>,
59 job_id: ExportJobId,
60 resource_type: impl Into<String>,
61 part_index: u32,
62 fencing_token: u64,
63 ) -> Self {
64 Self {
65 tenant_id: tenant_id.into(),
66 job_id,
67 resource_type: resource_type.into(),
68 file_type: "error".to_string(),
69 part_index,
70 fencing_token,
71 }
72 }
73
74 /// The `{resource_type}-{part_index}` segment used in download routes.
75 pub fn part_segment(&self) -> String {
76 format!("{}-{}", self.resource_type, self.part_index)
77 }
78}
79
80/// An open writer for a single export output part.
81///
82/// Wraps a boxed async writer plus a line counter. Callers push NDJSON lines
83/// with [`write_line`](ExportPartWriter::write_line) and then hand the writer
84/// to [`ExportOutputStore::finalize_part`].
85pub struct ExportPartWriter {
86 /// The underlying async byte sink.
87 pub writer: std::pin::Pin<Box<dyn AsyncWrite + Send>>,
88 /// Number of lines written so far.
89 pub line_count: u64,
90 /// Number of bytes written so far.
91 pub byte_count: u64,
92}
93
94impl ExportPartWriter {
95 /// Creates a new part writer over the given async sink.
96 pub fn new(writer: std::pin::Pin<Box<dyn AsyncWrite + Send>>) -> Self {
97 Self {
98 writer,
99 line_count: 0,
100 byte_count: 0,
101 }
102 }
103
104 /// Writes one NDJSON line (a trailing newline is appended).
105 pub async fn write_line(&mut self, line: &str) -> std::io::Result<()> {
106 use tokio::io::AsyncWriteExt;
107 self.writer.write_all(line.as_bytes()).await?;
108 self.writer.write_all(b"\n").await?;
109 self.line_count += 1;
110 self.byte_count += line.len() as u64 + 1;
111 Ok(())
112 }
113
114 /// Flushes the underlying writer.
115 pub async fn flush(&mut self) -> std::io::Result<()> {
116 use tokio::io::AsyncWriteExt;
117 self.writer.flush().await
118 }
119}
120
121impl std::fmt::Debug for ExportPartWriter {
122 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
123 f.debug_struct("ExportPartWriter")
124 .field("line_count", &self.line_count)
125 .field("byte_count", &self.byte_count)
126 .finish()
127 }
128}
129
130/// A finalized, immutable output part as it will appear in the manifest.
131#[derive(Debug, Clone)]
132pub struct FinalizedPart {
133 /// The part's stable key.
134 pub key: ExportPartKey,
135 /// The resource type contained in the part.
136 pub resource_type: String,
137 /// Number of resources (lines) in the part.
138 pub line_count: u64,
139 /// Total byte size of the part.
140 pub size_bytes: u64,
141}
142
/// A download URL plus the access posture the manifest should advertise.
///
/// Derives `PartialEq`/`Eq` so URLs can be compared directly, e.g. in
/// assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DownloadUrl {
    /// The URL the client should fetch.
    pub url: String,
    /// `true` if the URL requires the kickoff Bearer token (HFS-served);
    /// `false` if it is pre-signed and the client must NOT send a token.
    pub requires_access_token: bool,
}
152
153/// Pluggable backend for bulk export output files.
154///
155/// Implementations decide where NDJSON output physically lives (local FS, S3,
156/// …) and how download URLs are minted. The job-state backend is unaware of
157/// this — it stores keys; the output store turns keys into bytes and URLs.
158#[async_trait]
159pub trait ExportOutputStore: Send + Sync {
160 /// Opens an async writer for a new (or re-finalized) output part.
161 async fn open_writer(&self, key: &ExportPartKey) -> StorageResult<ExportPartWriter>;
162
163 /// Marks a part as finalized and immutable.
164 ///
165 /// For object stores this completes the multipart upload; for the local
166 /// filesystem this fsyncs and renames `.tmp` → final.
167 async fn finalize_part(
168 &self,
169 key: &ExportPartKey,
170 writer: ExportPartWriter,
171 ) -> StorageResult<FinalizedPart>;
172
173 /// Produces a download URL for a finalized part.
174 async fn download_url(&self, key: &ExportPartKey, ttl: Duration) -> StorageResult<DownloadUrl>;
175
176 /// Opens an async reader over a finalized part (HFS-served download path).
177 async fn open_reader(
178 &self,
179 key: &ExportPartKey,
180 ) -> StorageResult<std::pin::Pin<Box<dyn AsyncRead + Send>>>;
181
182 /// Deletes all output parts for a job. Idempotent — a missing job is `Ok`.
183 async fn delete_job_outputs(
184 &self,
185 tenant: &TenantContext,
186 job_id: &ExportJobId,
187 ) -> StorageResult<()>;
188}