helios_sof/lib.rs
//! # SQL-on-FHIR Implementation
//!
//! This crate provides a complete implementation of the [SQL-on-FHIR
//! specification](https://sql-on-fhir.org/ig/latest),
//! enabling the transformation of FHIR resources into tabular data using declarative
//! ViewDefinitions. It supports all major FHIR versions (R4, R4B, R5, R6) through
//! a version-agnostic abstraction layer.
//!
//! There are three consumers of this crate:
//! - [sof_cli](../sof_cli/index.html) - A command-line interface for the SQL-on-FHIR implementation,
//!   allowing users to execute ViewDefinition transformations on FHIR Bundle resources
//!   and output the results in various formats.
//! - [sof_server](../sof_server/index.html) - A stateless HTTP server implementation for the SQL-on-FHIR specification,
//!   enabling HTTP-based access to ViewDefinition transformation capabilities.
//! - [hfs](../hfs/index.html) - The full-featured Helios FHIR Server.
//!
//! ## Architecture
//!
//! The SOF crate is organized around these key components:
//! - **Version-agnostic enums** ([`SofViewDefinition`], [`SofBundle`]): Multi-version containers
//! - **Processing engine** ([`run_view_definition`]): Core transformation logic
//! - **Output formats** ([`ContentType`]): Support for CSV, JSON, NDJSON, and Parquet
//! - **Trait abstractions** ([`ViewDefinitionTrait`], [`BundleTrait`]): Version independence
//!
//! ## Key Features
//!
//! - **Multi-version FHIR support**: Works with R4, R4B, R5, and R6 resources
//! - **FHIRPath evaluation**: Complex path expressions for data extraction
//! - **forEach iteration**: Supports flattening of nested FHIR structures
//! - **unionAll operations**: Combines multiple select statements
//! - **Collection handling**: Proper array serialization for multi-valued fields
//! - **Output formats**: CSV (with/without headers), JSON, NDJSON, Parquet
//!
//! ## Usage Example
//!
//! ```rust
//! # #[cfg(not(target_os = "windows"))]
//! # {
//! use helios_sof::{SofViewDefinition, SofBundle, ContentType, run_view_definition};
//! use helios_fhir::FhirVersion;
//!
//! # #[cfg(feature = "R4")]
//! # {
//! // Parse a ViewDefinition and Bundle from JSON
//! let view_definition_json = r#"{
//!     "resourceType": "ViewDefinition",
//!     "status": "active",
//!     "resource": "Patient",
//!     "select": [{
//!         "column": [{
//!             "name": "id",
//!             "path": "id"
//!         }, {
//!             "name": "name",
//!             "path": "name.family"
//!         }]
//!     }]
//! }"#;
//!
//! let bundle_json = r#"{
//!     "resourceType": "Bundle",
//!     "type": "collection",
//!     "entry": [{
//!         "resource": {
//!             "resourceType": "Patient",
//!             "id": "example",
//!             "name": [{
//!                 "family": "Doe",
//!                 "given": ["John"]
//!             }]
//!         }
//!     }]
//! }"#;
//!
//! let view_definition: helios_fhir::r4::ViewDefinition = serde_json::from_str(view_definition_json)?;
//! let bundle: helios_fhir::r4::Bundle = serde_json::from_str(bundle_json)?;
//!
//! // Wrap in version-agnostic containers
//! let sof_view = SofViewDefinition::R4(view_definition);
//! let sof_bundle = SofBundle::R4(bundle);
//!
//! // Transform to CSV with headers
//! let csv_output = run_view_definition(
//!     sof_view,
//!     sof_bundle,
//!     ContentType::CsvWithHeader
//! )?;
//!
//! // Check the CSV output
//! let csv_string = String::from_utf8(csv_output)?;
//! assert!(csv_string.contains("id,name"));
//! // Values are present in the output regardless of how the CSV writer quotes them
//! assert!(csv_string.contains("example") && csv_string.contains("Doe"));
//! # }
//! # }
//! # Ok::<(), Box<dyn std::error::Error>>(())
//! ```
//!
//! ## Advanced Features
//!
//! ### forEach Iteration
//!
//! ViewDefinitions can iterate over collections using `forEach` and `forEachOrNull`:
//!
//! ```json
//! {
//!     "select": [{
//!         "forEach": "name",
//!         "column": [{
//!             "name": "family_name",
//!             "path": "family"
//!         }]
//!     }]
//! }
//! ```
//!
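//! With `forEach`, each element of the matched collection becomes its own output row.
//! For a Patient with two `name` entries, the view above would yield two rows
//! (illustrative output):
//!
//! ```json
//! [
//!   { "family_name": "Doe" },
//!   { "family_name": "Smith" }
//! ]
//! ```
//!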
//! ### Constants and Variables
//!
//! Define reusable values once and reference them from FHIRPath expressions
//! using the `%name` syntax:
//!
//! ```json
//! {
//!     "constant": [{
//!         "name": "system",
//!         "valueString": "http://loinc.org"
//!     }],
//!     "where": [{
//!         "path": "code.coding.system = %system"
//!     }]
//! }
//! ```
//!
//! ### Where Clauses
//!
//! Filter resources using FHIRPath expressions. Every `where` expression must
//! evaluate to true for a resource to be included in the output:
//!
//! ```json
//! {
//!     "where": [{
//!         "path": "active = true"
//!     }, {
//!         "path": "birthDate.exists()"
//!     }]
//! }
//! ```
//!
//! ## Error Handling
//!
//! The crate provides comprehensive error handling through [`SofError`]:
//!
//! ```rust,no_run
//! use helios_sof::{SofError, SofViewDefinition, SofBundle, ContentType, run_view_definition};
//!
//! # let view = SofViewDefinition::R4(helios_fhir::r4::ViewDefinition::default());
//! # let bundle = SofBundle::R4(helios_fhir::r4::Bundle::default());
//! # let content_type = ContentType::Json;
//! match run_view_definition(view, bundle, content_type) {
//!     Ok(output) => {
//!         // Process successful transformation
//!     },
//!     Err(SofError::InvalidViewDefinition(msg)) => {
//!         eprintln!("ViewDefinition validation failed: {}", msg);
//!     },
//!     Err(SofError::FhirPathError(msg)) => {
//!         eprintln!("FHIRPath evaluation failed: {}", msg);
//!     },
//!     Err(e) => {
//!         eprintln!("Other error: {}", e);
//!     }
//! }
//! ```
//!
//! ## Feature Flags
//!
//! Enable support for specific FHIR versions:
//! - `R4`: FHIR 4.0.1 support
//! - `R4B`: FHIR 4.3.0 support
//! - `R5`: FHIR 5.0.0 support
//! - `R6`: FHIR 6.0.0 support

pub mod data_source;
pub mod parquet_schema;
pub mod traits;

use chrono::{DateTime, Utc};
use helios_fhirpath::{EvaluationContext, EvaluationResult, evaluate_expression};
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::{BufRead, Write};
use thiserror::Error;
use traits::*;

// Re-export commonly used types and traits for easier access
pub use helios_fhir::FhirVersion;
pub use traits::{BundleTrait, ResourceTrait, ViewDefinitionTrait};
199
200/// Multi-version ViewDefinition container supporting version-agnostic operations.
201///
202/// This enum provides a unified interface for working with ViewDefinition resources
203/// across different FHIR specification versions. It enables applications to handle
204/// multiple FHIR versions simultaneously while maintaining type safety.
205///
206/// # Supported Versions
207///
208/// - **R4**: FHIR 4.0.1 ViewDefinition (normative)
209/// - **R4B**: FHIR 4.3.0 ViewDefinition (ballot)
210/// - **R5**: FHIR 5.0.0 ViewDefinition (ballot)
211/// - **R6**: FHIR 6.0.0 ViewDefinition (draft)
212///
213/// # Examples
214///
215/// ```rust
216/// use helios_sof::{SofViewDefinition, ContentType};
217/// # #[cfg(feature = "R4")]
218/// use helios_fhir::r4::ViewDefinition;
219///
220/// # #[cfg(feature = "R4")]
221/// # {
222/// // Parse from JSON
223/// let json = r#"{
224/// "resourceType": "ViewDefinition",
225/// "resource": "Patient",
226/// "select": [{
227/// "column": [{
228/// "name": "id",
229/// "path": "id"
230/// }]
231/// }]
232/// }"#;
233///
234/// let view_def: ViewDefinition = serde_json::from_str(json)?;
235/// let sof_view = SofViewDefinition::R4(view_def);
236///
237/// // Check version
238/// assert_eq!(sof_view.version(), helios_fhir::FhirVersion::R4);
239/// # }
240/// # Ok::<(), Box<dyn std::error::Error>>(())
241/// ```
#[derive(Debug, Clone)]
pub enum SofViewDefinition {
    #[cfg(feature = "R4")]
    R4(helios_fhir::r4::ViewDefinition),
    #[cfg(feature = "R4B")]
    R4B(helios_fhir::r4b::ViewDefinition),
    #[cfg(feature = "R5")]
    R5(helios_fhir::r5::ViewDefinition),
    #[cfg(feature = "R6")]
    R6(helios_fhir::r6::ViewDefinition),
}

impl SofViewDefinition {
    /// Returns the FHIR specification version of this ViewDefinition.
    ///
    /// This method provides version detection for multi-version applications,
    /// enabling version-specific processing logic and compatibility checks.
    ///
    /// # Returns
    ///
    /// The `FhirVersion` enum variant corresponding to this ViewDefinition's specification.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use helios_sof::SofViewDefinition;
    /// use helios_fhir::FhirVersion;
    ///
    /// # #[cfg(feature = "R5")]
    /// # {
    /// # let view_def = helios_fhir::r5::ViewDefinition::default();
    /// let sof_view = SofViewDefinition::R5(view_def);
    /// assert_eq!(sof_view.version(), helios_fhir::FhirVersion::R5);
    /// # }
    /// ```
    pub fn version(&self) -> helios_fhir::FhirVersion {
        match self {
            #[cfg(feature = "R4")]
            SofViewDefinition::R4(_) => helios_fhir::FhirVersion::R4,
            #[cfg(feature = "R4B")]
            SofViewDefinition::R4B(_) => helios_fhir::FhirVersion::R4B,
            #[cfg(feature = "R5")]
            SofViewDefinition::R5(_) => helios_fhir::FhirVersion::R5,
            #[cfg(feature = "R6")]
            SofViewDefinition::R6(_) => helios_fhir::FhirVersion::R6,
        }
    }
}

/// Multi-version Bundle container supporting version-agnostic operations.
///
/// This enum provides a unified interface for working with FHIR Bundle resources
/// across different FHIR specification versions. Bundles contain the actual FHIR
/// resources that will be processed by ViewDefinitions.
///
/// # Supported Versions
///
/// - **R4**: FHIR 4.0.1 Bundle (normative release)
/// - **R4B**: FHIR 4.3.0 Bundle (release)
/// - **R5**: FHIR 5.0.0 Bundle (release)
/// - **R6**: FHIR 6.0.0 Bundle (draft)
///
/// # Examples
///
/// ```rust
/// # #[cfg(not(target_os = "windows"))]
/// # {
/// use helios_sof::SofBundle;
/// # #[cfg(feature = "R4")]
/// use helios_fhir::r4::Bundle;
///
/// # #[cfg(feature = "R4")]
/// # {
/// // Parse from JSON
/// let json = r#"{
///     "resourceType": "Bundle",
///     "type": "collection",
///     "entry": [{
///         "resource": {
///             "resourceType": "Patient",
///             "id": "example"
///         }
///     }]
/// }"#;
///
/// let bundle: Bundle = serde_json::from_str(json)?;
/// let sof_bundle = SofBundle::R4(bundle);
///
/// // Check version compatibility
/// assert_eq!(sof_bundle.version(), helios_fhir::FhirVersion::R4);
/// # }
/// # }
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
#[derive(Debug, Clone)]
pub enum SofBundle {
    #[cfg(feature = "R4")]
    R4(helios_fhir::r4::Bundle),
    #[cfg(feature = "R4B")]
    R4B(helios_fhir::r4b::Bundle),
    #[cfg(feature = "R5")]
    R5(helios_fhir::r5::Bundle),
    #[cfg(feature = "R6")]
    R6(helios_fhir::r6::Bundle),
}

impl SofBundle {
    /// Returns the FHIR specification version of this Bundle.
    ///
    /// This method provides version detection for multi-version applications,
    /// ensuring that ViewDefinitions and Bundles use compatible FHIR versions.
    ///
    /// # Returns
    ///
    /// The `FhirVersion` enum variant corresponding to this Bundle's specification.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use helios_sof::SofBundle;
    /// use helios_fhir::FhirVersion;
    ///
    /// # #[cfg(feature = "R4")]
    /// # {
    /// # let bundle = helios_fhir::r4::Bundle::default();
    /// let sof_bundle = SofBundle::R4(bundle);
    /// assert_eq!(sof_bundle.version(), helios_fhir::FhirVersion::R4);
    /// # }
    /// ```
    pub fn version(&self) -> helios_fhir::FhirVersion {
        match self {
            #[cfg(feature = "R4")]
            SofBundle::R4(_) => helios_fhir::FhirVersion::R4,
            #[cfg(feature = "R4B")]
            SofBundle::R4B(_) => helios_fhir::FhirVersion::R4B,
            #[cfg(feature = "R5")]
            SofBundle::R5(_) => helios_fhir::FhirVersion::R5,
            #[cfg(feature = "R6")]
            SofBundle::R6(_) => helios_fhir::FhirVersion::R6,
        }
    }
}

/// Multi-version CapabilityStatement container supporting version-agnostic operations.
///
/// This enum provides a unified interface for working with CapabilityStatement resources
/// across different FHIR specification versions. It enables applications to handle
/// multiple FHIR versions simultaneously while maintaining type safety.
///
/// # Supported Versions
///
/// - **R4**: FHIR 4.0.1 CapabilityStatement (normative release)
/// - **R4B**: FHIR 4.3.0 CapabilityStatement (release)
/// - **R5**: FHIR 5.0.0 CapabilityStatement (release)
/// - **R6**: FHIR 6.0.0 CapabilityStatement (draft)
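///
/// # Examples
///
/// A minimal sketch, assuming `CapabilityStatement` implements `Default` like the
/// other resource types used in these docs:
///
/// ```rust
/// use helios_sof::SofCapabilityStatement;
///
/// # #[cfg(feature = "R4")]
/// # {
/// # let cs = helios_fhir::r4::CapabilityStatement::default(); // assumes Default
/// let sof_cs = SofCapabilityStatement::R4(cs);
/// assert_eq!(sof_cs.version(), helios_fhir::FhirVersion::R4);
/// # }
/// ```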
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum SofCapabilityStatement {
    #[cfg(feature = "R4")]
    R4(helios_fhir::r4::CapabilityStatement),
    #[cfg(feature = "R4B")]
    R4B(helios_fhir::r4b::CapabilityStatement),
    #[cfg(feature = "R5")]
    R5(helios_fhir::r5::CapabilityStatement),
    #[cfg(feature = "R6")]
    R6(helios_fhir::r6::CapabilityStatement),
}

impl SofCapabilityStatement {
    /// Returns the FHIR specification version of this CapabilityStatement.
    pub fn version(&self) -> helios_fhir::FhirVersion {
        match self {
            #[cfg(feature = "R4")]
            SofCapabilityStatement::R4(_) => helios_fhir::FhirVersion::R4,
            #[cfg(feature = "R4B")]
            SofCapabilityStatement::R4B(_) => helios_fhir::FhirVersion::R4B,
            #[cfg(feature = "R5")]
            SofCapabilityStatement::R5(_) => helios_fhir::FhirVersion::R5,
            #[cfg(feature = "R6")]
            SofCapabilityStatement::R6(_) => helios_fhir::FhirVersion::R6,
        }
    }
}

/// Type alias for the version-independent Parameters container.
///
/// This alias provides backward compatibility while using the unified
/// `VersionIndependentParameters` from the `helios_fhir` crate.
pub type SofParameters = helios_fhir::VersionIndependentParameters;

/// Comprehensive error type for SQL-on-FHIR operations.
///
/// This enum covers all possible error conditions that can occur during
/// ViewDefinition processing, from validation failures to output formatting issues.
/// Each variant provides specific context about the error to aid in debugging.
///
/// # Error Categories
///
/// - **Validation**: ViewDefinition structure and logic validation
/// - **Evaluation**: FHIRPath expression evaluation failures
/// - **I/O**: File and serialization operations
/// - **Format**: Output format conversion issues
///
/// # Examples
///
/// ```rust,no_run
/// use helios_sof::{SofError, SofViewDefinition, SofBundle, ContentType, run_view_definition};
///
/// # let view = SofViewDefinition::R4(helios_fhir::r4::ViewDefinition::default());
/// # let bundle = SofBundle::R4(helios_fhir::r4::Bundle::default());
/// # let content_type = ContentType::Json;
/// match run_view_definition(view, bundle, content_type) {
///     Ok(output) => {
///         println!("Transformation successful");
///     },
///     Err(SofError::InvalidViewDefinition(msg)) => {
///         eprintln!("ViewDefinition validation failed: {}", msg);
///     },
///     Err(SofError::FhirPathError(msg)) => {
///         eprintln!("FHIRPath evaluation error: {}", msg);
///     },
///     Err(SofError::UnsupportedContentType(format)) => {
///         eprintln!("Unsupported output format: {}", format);
///     },
///     Err(e) => {
///         eprintln!("Other error: {}", e);
///     }
/// }
/// ```
#[derive(Debug, Error)]
pub enum SofError {
    /// ViewDefinition structure or logic validation failed.
    ///
    /// This error occurs when a ViewDefinition contains invalid or inconsistent
    /// configuration, such as missing required fields, invalid FHIRPath expressions,
    /// or incompatible select/unionAll structures.
    #[error("Invalid ViewDefinition: {0}")]
    InvalidViewDefinition(String),

    /// FHIRPath expression evaluation failed.
    ///
    /// This error occurs when a FHIRPath expression in a ViewDefinition cannot
    /// be evaluated, either due to syntax errors or runtime evaluation issues.
    #[error("FHIRPath evaluation error: {0}")]
    FhirPathError(String),

    /// JSON serialization/deserialization failed.
    ///
    /// This error occurs when parsing input JSON or serializing output data fails,
    /// typically due to malformed JSON or incompatible data structures.
    #[error("Serialization error: {0}")]
    SerializationError(#[from] serde_json::Error),

    /// CSV processing failed.
    ///
    /// This error occurs during CSV output generation, such as when writing
    /// headers or data rows to the CSV format.
    #[error("CSV error: {0}")]
    CsvError(#[from] csv::Error),

    /// File I/O operation failed.
    ///
    /// This error occurs when reading input files or writing output files fails,
    /// typically due to permission issues or missing files.
    #[error("IO error: {0}")]
    IoError(#[from] std::io::Error),

    /// Unsupported output content type requested.
    ///
    /// This error occurs when an invalid or unimplemented content type is
    /// specified for output formatting.
    #[error("Unsupported content type: {0}")]
    UnsupportedContentType(String),

    /// CSV writer internal error.
    ///
    /// This error occurs when the CSV writer encounters an internal issue
    /// that prevents successful output generation.
    #[error("CSV writer error: {0}")]
    CsvWriterError(String),

    /// Invalid source parameter value.
    ///
    /// This error occurs when the source parameter contains an invalid URL or path.
    #[error("Invalid source: {0}")]
    InvalidSource(String),

    /// Source not found.
    ///
    /// This error occurs when the specified source file or URL cannot be found.
    #[error("Source not found: {0}")]
    SourceNotFound(String),

    /// Failed to fetch data from source.
    ///
    /// This error occurs when fetching data from a remote source fails.
    #[error("Failed to fetch source: {0}")]
    SourceFetchError(String),

    /// Failed to read source data.
    ///
    /// This error occurs when reading data from the source fails.
    #[error("Failed to read source: {0}")]
    SourceReadError(String),

    /// Invalid content in source.
    ///
    /// This error occurs when the source content is not valid FHIR data.
    #[error("Invalid source content: {0}")]
    InvalidSourceContent(String),

    /// Unsupported source protocol.
    ///
    /// This error occurs when the source URL uses an unsupported protocol.
    #[error("Unsupported source protocol: {0}")]
    UnsupportedSourceProtocol(String),

    /// Parquet conversion error.
    ///
    /// This error occurs when converting data to Parquet format fails.
    #[error("Parquet conversion error: {0}")]
    ParquetConversionError(String),
}

/// Supported output content types for ViewDefinition transformations.
///
/// This enum defines the available output formats for transformed FHIR data.
/// Each format has specific characteristics and use cases for different
/// integration scenarios.
///
/// # Format Descriptions
///
/// - **CSV**: Comma-separated values without headers
/// - **CSV with Headers**: Comma-separated values with column headers
/// - **JSON**: Pretty-printed JSON array of objects
/// - **NDJSON**: Newline-delimited JSON (one object per line)
/// - **Parquet**: Apache Parquet columnar format (batch output only; not available
///   for streaming)
///
/// # Examples
///
/// ```rust
/// use helios_sof::ContentType;
///
/// // Parse from string
/// let csv_type = ContentType::from_string("text/csv")?;
/// assert_eq!(csv_type, ContentType::CsvWithHeader); // Default includes headers
///
/// let json_type = ContentType::from_string("application/json")?;
/// assert_eq!(json_type, ContentType::Json);
///
/// // CSV without headers
/// let csv_no_headers = ContentType::from_string("text/csv;header=false")?;
/// assert_eq!(csv_no_headers, ContentType::Csv);
/// # Ok::<(), helios_sof::SofError>(())
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContentType {
    /// Comma-separated values format without headers
    Csv,
    /// Comma-separated values format with column headers
    CsvWithHeader,
    /// Pretty-printed JSON array format
    Json,
    /// Newline-delimited JSON format (NDJSON)
    NdJson,
    /// Apache Parquet columnar format (batch output only; not supported for streaming)
    Parquet,
}

impl ContentType {
    /// Parse a content type from its MIME type string representation.
    ///
    /// This method converts standard MIME type strings to the corresponding
    /// ContentType enum variants. It supports the SQL-on-FHIR specification's
    /// recommended content types.
    ///
    /// # Supported MIME Types
    ///
    /// - `"text/csv"` → [`ContentType::CsvWithHeader`] (headers included by default)
    /// - `"text/csv;header=true"` → [`ContentType::CsvWithHeader`]
    /// - `"text/csv;header=false"` → [`ContentType::Csv`]
    /// - `"application/json"` → [`ContentType::Json`]
    /// - `"application/ndjson"` → [`ContentType::NdJson`]
    /// - `"application/x-ndjson"` → [`ContentType::NdJson`]
    /// - `"application/parquet"` → [`ContentType::Parquet`]
    ///
    /// # Arguments
    ///
    /// * `s` - The MIME type string to parse
    ///
    /// # Returns
    ///
    /// * `Ok(ContentType)` - Successfully parsed content type
    /// * `Err(SofError::UnsupportedContentType)` - Unknown or unsupported MIME type
    ///
    /// # Examples
    ///
    /// ```rust
    /// use helios_sof::ContentType;
    ///
    /// // Shortened format names
    /// let csv = ContentType::from_string("csv")?;
    /// assert_eq!(csv, ContentType::CsvWithHeader);
    ///
    /// let json = ContentType::from_string("json")?;
    /// assert_eq!(json, ContentType::Json);
    ///
    /// let ndjson = ContentType::from_string("ndjson")?;
    /// assert_eq!(ndjson, ContentType::NdJson);
    ///
    /// // Full MIME types still supported
    /// let csv_mime = ContentType::from_string("text/csv")?;
    /// assert_eq!(csv_mime, ContentType::CsvWithHeader);
    ///
    /// // CSV with headers explicitly
    /// let csv_headers = ContentType::from_string("text/csv;header=true")?;
    /// assert_eq!(csv_headers, ContentType::CsvWithHeader);
    ///
    /// // CSV without headers
    /// let csv_no_headers = ContentType::from_string("text/csv;header=false")?;
    /// assert_eq!(csv_no_headers, ContentType::Csv);
    ///
    /// // JSON format
    /// let json_mime = ContentType::from_string("application/json")?;
    /// assert_eq!(json_mime, ContentType::Json);
    ///
    /// // Error for unsupported type
    /// assert!(ContentType::from_string("text/plain").is_err());
    /// # Ok::<(), helios_sof::SofError>(())
    /// ```
    pub fn from_string(s: &str) -> Result<Self, SofError> {
        match s {
            // Shortened format names
            "csv" => Ok(ContentType::CsvWithHeader),
            "json" => Ok(ContentType::Json),
            "ndjson" => Ok(ContentType::NdJson),
            "parquet" => Ok(ContentType::Parquet),
            // Full MIME types (for Accept header compatibility)
            "text/csv;header=false" => Ok(ContentType::Csv),
            "text/csv" | "text/csv;header=true" => Ok(ContentType::CsvWithHeader),
            "application/json" => Ok(ContentType::Json),
            "application/ndjson" | "application/x-ndjson" => Ok(ContentType::NdJson),
            "application/parquet" => Ok(ContentType::Parquet),
            _ => Err(SofError::UnsupportedContentType(s.to_string())),
        }
    }
}

/// Returns the FHIR version string for the newest enabled version.
///
/// This function provides the version string that should be used in CapabilityStatements
/// and other FHIR resources that need to specify their version.
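///
/// # Examples
///
/// A minimal sketch; the exact string depends on which version features are enabled:
///
/// ```rust
/// let version = helios_sof::get_fhir_version_string();
/// // e.g. "4.0.1" when R4 is the newest enabled version
/// assert!(!version.is_empty());
/// ```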
pub fn get_fhir_version_string() -> &'static str {
    let newest_version = get_newest_enabled_fhir_version();

    match newest_version {
        #[cfg(feature = "R4")]
        helios_fhir::FhirVersion::R4 => "4.0.1",
        #[cfg(feature = "R4B")]
        helios_fhir::FhirVersion::R4B => "4.3.0",
        #[cfg(feature = "R5")]
        helios_fhir::FhirVersion::R5 => "5.0.0",
        #[cfg(feature = "R6")]
        helios_fhir::FhirVersion::R6 => "6.0.0",
    }
}

/// Returns the newest FHIR version that is enabled at compile time.
///
/// This function uses compile-time feature detection to determine which FHIR
/// version should be used when multiple versions are enabled. The priority order
/// is: R6 > R5 > R4B > R4, where newer versions take precedence.
///
/// # Examples
///
/// ```rust
/// use helios_sof::{get_newest_enabled_fhir_version, FhirVersion};
///
/// # #[cfg(any(feature = "R4", feature = "R4B", feature = "R5", feature = "R6"))]
/// # {
/// let version = get_newest_enabled_fhir_version();
/// // If R5 and R4 are both enabled, this returns R5
/// # }
/// ```
///
/// # Panics
///
/// This function panics at runtime if no FHIR version features were enabled at compile time.
pub fn get_newest_enabled_fhir_version() -> helios_fhir::FhirVersion {
    #[cfg(feature = "R6")]
    return helios_fhir::FhirVersion::R6;

    #[cfg(all(feature = "R5", not(feature = "R6")))]
    return helios_fhir::FhirVersion::R5;

    #[cfg(all(feature = "R4B", not(feature = "R5"), not(feature = "R6")))]
    return helios_fhir::FhirVersion::R4B;

    #[cfg(all(
        feature = "R4",
        not(feature = "R4B"),
        not(feature = "R5"),
        not(feature = "R6")
    ))]
    return helios_fhir::FhirVersion::R4;

    #[cfg(not(any(feature = "R4", feature = "R4B", feature = "R5", feature = "R6")))]
    panic!("At least one FHIR version feature must be enabled");
}

/// A single row of processed tabular data from ViewDefinition transformation.
///
/// This struct represents one row in the output table, containing values for
/// each column defined in the ViewDefinition. Values are stored as optional
/// JSON values to handle nullable fields and diverse FHIR data types.
///
/// # Structure
///
/// Each `ProcessedRow` contains a vector of optional JSON values, where:
/// - `Some(value)` represents a non-null column value
/// - `None` represents a null/missing column value
/// - The order matches the column order in [`ProcessedResult::columns`]
///
/// # Examples
///
/// ```rust
/// use helios_sof::ProcessedRow;
/// use serde_json::Value;
///
/// let row = ProcessedRow {
///     values: vec![
///         Some(Value::String("patient-123".to_string())),
///         Some(Value::String("Doe".to_string())),
///         None, // Missing birth date
///         Some(Value::Bool(true)),
///     ]
/// };
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessedRow {
    /// Column values for this row, ordered according to ProcessedResult::columns
    pub values: Vec<Option<serde_json::Value>>,
}

/// Complete result of ViewDefinition transformation containing columns and data rows.
///
/// This struct represents the tabular output from processing a ViewDefinition
/// against a Bundle of FHIR resources. It contains both the column definitions
/// and the actual data rows in a format ready for serialization to various
/// output formats.
///
/// # Structure
///
/// - [`columns`](Self::columns): Ordered list of column names from the ViewDefinition
/// - [`rows`](Self::rows): Data rows where each row contains values in column order
///
/// # Examples
///
/// ```rust
/// use helios_sof::{ProcessedResult, ProcessedRow};
/// use serde_json::Value;
///
/// let result = ProcessedResult {
///     columns: vec![
///         "patient_id".to_string(),
///         "family_name".to_string(),
///         "given_name".to_string(),
///     ],
///     rows: vec![
///         ProcessedRow {
///             values: vec![
///                 Some(Value::String("patient-1".to_string())),
///                 Some(Value::String("Smith".to_string())),
///                 Some(Value::String("John".to_string())),
///             ]
///         },
///         ProcessedRow {
///             values: vec![
///                 Some(Value::String("patient-2".to_string())),
///                 Some(Value::String("Doe".to_string())),
///                 None, // Missing given name
///             ]
///         },
///     ]
/// };
///
/// assert_eq!(result.columns.len(), 3);
/// assert_eq!(result.rows.len(), 2);
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessedResult {
    /// Ordered list of column names as defined in the ViewDefinition
    pub columns: Vec<String>,
    /// Data rows containing values for each column
    pub rows: Vec<ProcessedRow>,
}

/// Execute a SQL-on-FHIR ViewDefinition transformation on a FHIR Bundle.
///
/// This is the main entry point for SQL-on-FHIR transformations. It processes
/// a ViewDefinition against a Bundle of FHIR resources and produces output in
/// the specified format. The function handles version compatibility, validation,
/// FHIRPath evaluation, and output formatting.
///
/// # Arguments
///
/// * `view_definition` - The ViewDefinition containing transformation logic
/// * `bundle` - The Bundle containing FHIR resources to process
/// * `content_type` - The desired output format
///
/// # Returns
///
/// * `Ok(Vec<u8>)` - Formatted output bytes ready for writing to file or stdout
/// * `Err(SofError)` - Detailed error information about what went wrong
///
/// # Validation
///
/// The function performs comprehensive validation:
/// - FHIR version compatibility between ViewDefinition and Bundle
/// - ViewDefinition structure and logic validation
/// - FHIRPath expression syntax and evaluation
/// - Output format compatibility
///
/// # Examples
///
/// ```rust
/// use helios_sof::{SofViewDefinition, SofBundle, ContentType, run_view_definition};
///
/// # #[cfg(feature = "R4")]
/// # {
/// // Create a simple ViewDefinition
/// let view_json = serde_json::json!({
///     "resourceType": "ViewDefinition",
///     "status": "active",
///     "resource": "Patient",
///     "select": [{
///         "column": [{
///             "name": "id",
///             "path": "id"
///         }]
///     }]
/// });
/// let view_def: helios_fhir::r4::ViewDefinition = serde_json::from_value(view_json)?;
///
/// // Create a simple Bundle
/// let bundle_json = serde_json::json!({
///     "resourceType": "Bundle",
///     "type": "collection",
///     "entry": []
/// });
/// let bundle: helios_fhir::r4::Bundle = serde_json::from_value(bundle_json)?;
///
/// let sof_view = SofViewDefinition::R4(view_def);
/// let sof_bundle = SofBundle::R4(bundle);
///
/// // Generate CSV with headers
/// let csv_output = run_view_definition(
///     sof_view,
///     sof_bundle,
///     ContentType::CsvWithHeader
/// )?;
///
/// // Write to file or stdout
/// std::fs::write("output.csv", csv_output)?;
/// # }
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
///
/// # Error Handling
///
/// Common error scenarios:
///
/// ```rust,no_run
/// use helios_sof::{SofError, SofViewDefinition, SofBundle, ContentType, run_view_definition};
///
/// # let view = SofViewDefinition::R4(helios_fhir::r4::ViewDefinition::default());
/// # let bundle = SofBundle::R4(helios_fhir::r4::Bundle::default());
/// # let content_type = ContentType::Json;
/// match run_view_definition(view, bundle, content_type) {
///     Ok(output) => {
///         println!("Success: {} bytes generated", output.len());
///     },
///     Err(SofError::InvalidViewDefinition(msg)) => {
///         eprintln!("ViewDefinition error: {}", msg);
///     },
///     Err(SofError::FhirPathError(msg)) => {
///         eprintln!("FHIRPath error: {}", msg);
///     },
///     Err(e) => {
///         eprintln!("Other error: {}", e);
///     }
/// }
/// ```
pub fn run_view_definition(
    view_definition: SofViewDefinition,
    bundle: SofBundle,
    content_type: ContentType,
) -> Result<Vec<u8>, SofError> {
    run_view_definition_with_options(view_definition, bundle, content_type, RunOptions::default())
}

/// Configuration options for Parquet file generation.
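///
/// # Examples
///
/// A minimal sketch; the non-default values here are illustrative:
///
/// ```rust
/// use helios_sof::ParquetOptions;
///
/// let opts = ParquetOptions {
///     compression: "zstd".to_string(),
///     max_file_size_mb: Some(512),
///     ..ParquetOptions::default()
/// };
/// // Unspecified fields keep their defaults
/// assert_eq!(opts.row_group_size_mb, 256);
/// ```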
#[derive(Debug, Clone)]
pub struct ParquetOptions {
    /// Target row group size in MB (64-1024)
    pub row_group_size_mb: u32,
    /// Target page size in KB (64-8192)
    pub page_size_kb: u32,
    /// Compression algorithm (none, snappy, gzip, lz4, brotli, zstd)
    pub compression: String,
    /// Maximum file size in MB (splits output when exceeded)
    pub max_file_size_mb: Option<u32>,
}

impl Default for ParquetOptions {
    fn default() -> Self {
        Self {
            row_group_size_mb: 256,
            page_size_kb: 1024,
            compression: "snappy".to_string(),
            max_file_size_mb: None,
        }
    }
}

/// Options for filtering and controlling ViewDefinition execution.
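///
/// # Examples
///
/// A minimal sketch of limiting a run to the first 100 rows:
///
/// ```rust
/// use helios_sof::RunOptions;
///
/// let options = RunOptions {
///     limit: Some(100),
///     ..RunOptions::default()
/// };
/// assert!(options.since.is_none());
/// ```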
#[derive(Debug, Clone, Default)]
pub struct RunOptions {
    /// Filter resources modified after this time
    pub since: Option<DateTime<Utc>>,
    /// Limit the number of results
    pub limit: Option<usize>,
    /// Page number for pagination (1-based)
    pub page: Option<usize>,
    /// Parquet-specific configuration options
    pub parquet_options: Option<ParquetOptions>,
}

// =============================================================================
// Streaming/Chunked Processing Types
// =============================================================================

/// Configuration for chunked NDJSON processing.
///
/// Controls how NDJSON files are read and processed in chunks to reduce
/// memory usage when handling large files.
///
/// # Examples
///
/// ```rust
/// use helios_sof::ChunkConfig;
///
/// // Default configuration (1000 resources per chunk)
/// let config = ChunkConfig::default();
///
/// // Custom configuration for memory-constrained environments
/// let config = ChunkConfig {
///     chunk_size: 100,
///     skip_invalid_lines: true,
/// };
/// ```
#[derive(Debug, Clone)]
pub struct ChunkConfig {
    /// Number of resources to process per chunk.
    /// Default: 1000 (approximately 10MB memory usage per chunk)
    pub chunk_size: usize,
    /// If true, skip lines that fail to parse as valid JSON.
    /// If false (default), return an error on the first invalid line.
    pub skip_invalid_lines: bool,
}

impl Default for ChunkConfig {
    fn default() -> Self {
        Self {
            chunk_size: 1000,
            skip_invalid_lines: false,
        }
    }
}

/// A chunk of parsed FHIR resources from an NDJSON file.
///
/// Represents a batch of resources that have been read and parsed,
/// ready for processing through a ViewDefinition.
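///
/// # Examples
///
/// A minimal sketch with a single inline resource:
///
/// ```rust
/// use helios_sof::ResourceChunk;
///
/// let chunk = ResourceChunk {
///     resources: vec![serde_json::json!({"resourceType": "Patient", "id": "a"})],
///     chunk_index: 0,
///     is_last: true,
/// };
/// assert_eq!(chunk.resources.len(), 1);
/// ```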
#[derive(Debug)]
pub struct ResourceChunk {
    /// The parsed FHIR resources in this chunk
    pub resources: Vec<serde_json::Value>,
    /// Zero-based index of this chunk (0, 1, 2, ...)
    pub chunk_index: usize,
    /// True if this is the last chunk in the file
    pub is_last: bool,
}

/// Result from processing a single chunk of resources.
///
/// Contains the output rows generated from processing one chunk,
/// along with metadata about the chunk position.
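///
/// # Examples
///
/// A minimal sketch of consuming a result; in practice `chunk_result` comes from
/// [`PreparedViewDefinition::process_chunk`]:
///
/// ```rust
/// # use helios_sof::ChunkedResult;
/// # let chunk_result = ChunkedResult {
/// #     columns: vec!["id".to_string()],
/// #     rows: Vec::new(),
/// #     chunk_index: 0,
/// #     is_last: true,
/// #     resources_in_chunk: 0,
/// # };
/// for row in &chunk_result.rows {
///     println!("{:?}", row.values);
/// }
/// if chunk_result.is_last {
///     println!("finished after chunk {}", chunk_result.chunk_index);
/// }
/// ```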
#[derive(Debug, Clone)]
pub struct ChunkedResult {
    /// Column names (same for all chunks)
    pub columns: Vec<String>,
    /// Processed rows from this chunk
    pub rows: Vec<ProcessedRow>,
    /// Zero-based index of this chunk
    pub chunk_index: usize,
    /// True if this is the last chunk
    pub is_last: bool,
    /// Number of resources that were in the input chunk
    pub resources_in_chunk: usize,
}

/// Statistics from chunked processing.
///
/// Provides summary information about a completed chunked processing run.
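///
/// # Examples
///
/// A minimal sketch of reporting a completed run:
///
/// ```rust
/// use helios_sof::ProcessingStats;
///
/// let stats = ProcessingStats::default();
/// println!(
///     "{} chunks, {} resources -> {} rows ({} lines skipped)",
///     stats.chunks_processed, stats.resources_processed,
///     stats.output_rows, stats.skipped_lines
/// );
/// ```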
#[derive(Debug, Clone, Default)]
pub struct ProcessingStats {
    /// Total number of lines read from the NDJSON file
    pub total_lines_read: usize,
    /// Number of FHIR resources successfully processed
    pub resources_processed: usize,
    /// Number of output rows generated
    pub output_rows: usize,
    /// Number of lines skipped due to parse errors (when skip_invalid_lines is true)
    pub skipped_lines: usize,
    /// Number of chunks processed
    pub chunks_processed: usize,
}

/// Reads NDJSON files in chunks, yielding parsed resources.
///
/// This iterator reads an NDJSON file line by line, collecting resources
/// into chunks of the configured size. Each iteration yields a `ResourceChunk`
/// containing up to `chunk_size` parsed FHIR resources.
///
/// # Examples
///
/// ```rust,no_run
/// use helios_sof::{NdjsonChunkReader, ChunkConfig};
/// use std::io::BufReader;
/// use std::fs::File;
///
/// let file = File::open("patients.ndjson").unwrap();
/// let reader = BufReader::new(file);
/// let config = ChunkConfig::default();
///
/// let mut chunk_reader = NdjsonChunkReader::new(reader, config);
///
/// while let Some(result) = chunk_reader.next() {
///     match result {
///         Ok(chunk) => {
///             println!("Chunk {}: {} resources", chunk.chunk_index, chunk.resources.len());
///         }
///         Err(e) => {
///             eprintln!("Error reading chunk: {}", e);
///             break;
///         }
///     }
/// }
/// ```
pub struct NdjsonChunkReader<R: BufRead> {
    reader: R,
    config: ChunkConfig,
    current_chunk: usize,
    finished: bool,
    line_buffer: String,
    line_number: usize,
    /// Resource type filter - only include resources of this type
    resource_type_filter: Option<String>,
    /// Number of lines skipped due to invalid JSON
    skipped_lines: usize,
}

impl<R: BufRead> NdjsonChunkReader<R> {
    /// Create a new NDJSON chunk reader with the given configuration.
    pub fn new(reader: R, config: ChunkConfig) -> Self {
        Self {
            reader,
            config,
            current_chunk: 0,
            finished: false,
            line_buffer: String::new(),
            line_number: 0,
            resource_type_filter: None,
            skipped_lines: 0,
        }
    }

    /// Set a resource type filter to only include resources of a specific type.
    ///
    /// This is useful when processing NDJSON files that contain multiple resource types.
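    ///
    /// # Examples
    ///
    /// A minimal sketch; the two inline resources are illustrative:
    ///
    /// ```rust
    /// use helios_sof::{ChunkConfig, NdjsonChunkReader};
    /// use std::io::Cursor;
    ///
    /// let ndjson = "{\"resourceType\":\"Patient\",\"id\":\"a\"}\n\
    ///               {\"resourceType\":\"Observation\",\"id\":\"b\"}\n";
    /// let mut reader = NdjsonChunkReader::new(Cursor::new(ndjson), ChunkConfig::default())
    ///     .with_resource_type_filter(Some("Patient".to_string()));
    ///
    /// let chunk = reader.next().unwrap().unwrap();
    /// // Only the Patient line passes the filter
    /// assert_eq!(chunk.resources.len(), 1);
    /// ```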
    pub fn with_resource_type_filter(mut self, resource_type: Option<String>) -> Self {
        self.resource_type_filter = resource_type;
        self
    }

    /// Get the total number of lines read so far.
    pub fn lines_read(&self) -> usize {
        self.line_number
    }

    /// Get the number of lines skipped due to invalid JSON.
    pub fn skipped_lines(&self) -> usize {
        self.skipped_lines
    }
}

impl<R: BufRead> Iterator for NdjsonChunkReader<R> {
    type Item = Result<ResourceChunk, SofError>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.finished {
            return None;
        }

        let mut resources = Vec::with_capacity(self.config.chunk_size);

        while resources.len() < self.config.chunk_size {
            self.line_buffer.clear();
            match self.reader.read_line(&mut self.line_buffer) {
                Ok(0) => {
                    // EOF reached
                    self.finished = true;
                    break;
                }
                Ok(_) => {
                    self.line_number += 1;
                    let line = self.line_buffer.trim();

                    // Skip empty lines
                    if line.is_empty() {
                        continue;
                    }

                    // Parse the JSON
                    match serde_json::from_str::<serde_json::Value>(line) {
                        Ok(value) => {
                            // Apply resource type filter if set
                            if let Some(ref filter) = self.resource_type_filter {
                                let resource_type =
                                    value.get("resourceType").and_then(|v| v.as_str());
                                if resource_type != Some(filter.as_str()) {
                                    continue;
                                }
                            }
                            resources.push(value);
                        }
                        Err(e) => {
                            if self.config.skip_invalid_lines {
                                // Skip this line and continue
                                self.skipped_lines += 1;
                                continue;
                            } else {
                                return Some(Err(SofError::InvalidSourceContent(format!(
                                    "Invalid JSON at line {}: {}",
                                    self.line_number, e
                                ))));
                            }
                        }
                    }
                }
                Err(e) => {
                    return Some(Err(SofError::IoError(e)));
                }
            }
        }

        // If we have no resources and we're finished, don't return an empty chunk
        if resources.is_empty() && self.finished {
            return None;
        }

        let chunk = ResourceChunk {
            resources,
            chunk_index: self.current_chunk,
            is_last: self.finished,
        };
        self.current_chunk += 1;

        Some(Ok(chunk))
    }
}

/// Pre-validated ViewDefinition for efficient reuse across multiple chunks.
///
/// This struct caches the validation and constant extraction from a ViewDefinition,
/// allowing efficient processing of multiple chunks without re-validating each time.
///
/// # Examples
///
/// ```rust,no_run
/// use helios_sof::{PreparedViewDefinition, SofViewDefinition, ResourceChunk};
///
/// # #[cfg(feature = "R4")]
/// # {
/// // Parse and prepare ViewDefinition once
/// let view_json: serde_json::Value = serde_json::from_str(r#"{
///     "resourceType": "ViewDefinition",
///     "resource": "Patient",
///     "select": [{"column": [{"name": "id", "path": "id"}]}]
/// }"#).unwrap();
/// let view_def: helios_fhir::r4::ViewDefinition = serde_json::from_value(view_json).unwrap();
/// let sof_view = SofViewDefinition::R4(view_def);
///
/// let prepared = PreparedViewDefinition::new(sof_view).unwrap();
///
/// // Process multiple chunks efficiently
/// // for chunk in chunk_iterator {
/// //     let result = prepared.process_chunk(chunk)?;
/// //     // ... handle result
/// // }
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct PreparedViewDefinition {
    view_definition: SofViewDefinition,
    target_resource_type: String,
    variables: HashMap<String, EvaluationResult>,
    column_names: Vec<String>,
}

impl PreparedViewDefinition {
    /// Create a new PreparedViewDefinition by validating and extracting metadata.
    ///
    /// This performs all validation upfront so that chunk processing is efficient.
    pub fn new(view_definition: SofViewDefinition) -> Result<Self, SofError> {
        // Extract target resource type and column names based on version
        let (target_resource_type, variables, column_names) = match &view_definition {
            #[cfg(feature = "R4")]
            SofViewDefinition::R4(vd) => {
                validate_view_definition(vd)?;
                let vars = extract_view_definition_constants(vd)?;
                let resource_type = vd
                    .resource()
                    .ok_or_else(|| {
                        SofError::InvalidViewDefinition("Resource type is required".to_string())
                    })?
                    .to_string();
                let mut columns = Vec::new();
                if let Some(selects) = vd.select() {
                    collect_all_columns(selects, &mut columns)?;
                }
                (resource_type, vars, columns)
            }
            #[cfg(feature = "R4B")]
            SofViewDefinition::R4B(vd) => {
                validate_view_definition(vd)?;
                let vars = extract_view_definition_constants(vd)?;
                let resource_type = vd
                    .resource()
                    .ok_or_else(|| {
                        SofError::InvalidViewDefinition("Resource type is required".to_string())
                    })?
                    .to_string();
                let mut columns = Vec::new();
                if let Some(selects) = vd.select() {
                    collect_all_columns(selects, &mut columns)?;
                }
                (resource_type, vars, columns)
            }
            #[cfg(feature = "R5")]
            SofViewDefinition::R5(vd) => {
                validate_view_definition(vd)?;
                let vars = extract_view_definition_constants(vd)?;
                let resource_type = vd
                    .resource()
                    .ok_or_else(|| {
                        SofError::InvalidViewDefinition("Resource type is required".to_string())
                    })?
                    .to_string();
                let mut columns = Vec::new();
                if let Some(selects) = vd.select() {
                    collect_all_columns(selects, &mut columns)?;
                }
                (resource_type, vars, columns)
            }
            #[cfg(feature = "R6")]
            SofViewDefinition::R6(vd) => {
                validate_view_definition(vd)?;
                let vars = extract_view_definition_constants(vd)?;
                let resource_type = vd
                    .resource()
                    .ok_or_else(|| {
                        SofError::InvalidViewDefinition("Resource type is required".to_string())
                    })?
                    .to_string();
                let mut columns = Vec::new();
                if let Some(selects) = vd.select() {
                    collect_all_columns(selects, &mut columns)?;
                }
                (resource_type, vars, columns)
            }
        };

        Ok(Self {
            view_definition,
            target_resource_type,
            variables,
            column_names,
        })
    }

    /// Get the column names that will be produced by this ViewDefinition.
    pub fn columns(&self) -> &[String] {
        &self.column_names
    }

    /// Get the target resource type for this ViewDefinition.
    pub fn target_resource_type(&self) -> &str {
        &self.target_resource_type
    }

    /// Process a chunk of resources through this ViewDefinition.
    ///
    /// Returns a `ChunkedResult` containing the rows generated from the chunk.
    /// Uses parallel processing via rayon for improved throughput.
    pub fn process_chunk(&self, chunk: ResourceChunk) -> Result<ChunkedResult, SofError> {
        // Process resources in parallel using rayon
        let results: Result<Vec<Vec<ProcessedRow>>, SofError> = chunk
            .resources
            .par_iter()
            .filter_map(|resource_json| {
                // Check resource type matches
                let resource_type = resource_json
                    .get("resourceType")
                    .and_then(|v| v.as_str())
                    .unwrap_or("");

                if resource_type != self.target_resource_type {
                    None
                } else {
                    // Process single resource based on version
                    Some(self.process_single_resource(resource_json))
                }
            })
            .collect();

        // Flatten results from all resources
        let all_rows: Vec<ProcessedRow> = results?.into_iter().flatten().collect();

        Ok(ChunkedResult {
            columns: self.column_names.clone(),
            rows: all_rows,
            chunk_index: chunk.chunk_index,
            is_last: chunk.is_last,
            resources_in_chunk: chunk.resources.len(),
        })
    }

    /// Process a single resource JSON value through the ViewDefinition.
    fn process_single_resource(
        &self,
        resource_json: &serde_json::Value,
    ) -> Result<Vec<ProcessedRow>, SofError> {
        match &self.view_definition {
            #[cfg(feature = "R4")]
            SofViewDefinition::R4(vd) => self.process_single_resource_generic(vd, resource_json),
            #[cfg(feature = "R4B")]
            SofViewDefinition::R4B(vd) => self.process_single_resource_generic(vd, resource_json),
            #[cfg(feature = "R5")]
            SofViewDefinition::R5(vd) => self.process_single_resource_generic(vd, resource_json),
            #[cfg(feature = "R6")]
            SofViewDefinition::R6(vd) => self.process_single_resource_generic(vd, resource_json),
        }
    }

    fn process_single_resource_generic<VD>(
        &self,
        view_definition: &VD,
        resource_json: &serde_json::Value,
    ) -> Result<Vec<ProcessedRow>, SofError>
    where
        VD: ViewDefinitionTrait,
        VD::Select: ViewDefinitionSelectTrait,
    {
        // Create evaluation context from JSON by parsing into typed FhirResource
        let fhir_resource =
            parse_json_to_fhir_resource(resource_json.clone(), self.view_definition.version())?;
        let mut context = EvaluationContext::new(vec![fhir_resource]);

        // Add variables to the context
        for (name, value) in &self.variables {
            context.set_variable_result(name, value.clone());
        }

        // Apply where clauses
        if let Some(where_clauses) = view_definition.where_clauses() {
            for where_clause in where_clauses {
                let path = where_clause.path().ok_or_else(|| {
                    SofError::InvalidViewDefinition("Where clause path is required".to_string())
                })?;

                match evaluate_expression(path, &context) {
                    Ok(result) => {
                        if !can_be_coerced_to_boolean(&result) {
                            return Err(SofError::InvalidViewDefinition(format!(
                                "Where clause path '{}' returns type '{}' which cannot be used as a boolean condition.",
                                path,
                                result.type_name()
                            )));
                        }
                        if !is_truthy(&result) {
                            // Resource doesn't match where clause, return empty rows
                            return Ok(Vec::new());
                        }
                    }
                    Err(e) => {
                        return Err(SofError::FhirPathError(format!(
                            "Error evaluating where clause '{}': {}",
                            path, e
                        )));
                    }
                }
            }
        }

        // Generate rows
        let select_clauses = view_definition.select().ok_or_else(|| {
            SofError::InvalidViewDefinition("At least one select clause is required".to_string())
        })?;

        let mut all_columns = self.column_names.clone();
        generate_row_combinations(&context, select_clauses, &mut all_columns, &self.variables)
    }
}

/// Iterator that combines NDJSON reading with ViewDefinition processing.
///
/// This iterator reads chunks from an NDJSON file and processes them
/// through a ViewDefinition, yielding `ChunkedResult` for each chunk.
///
/// # Examples
///
/// ```rust,no_run
/// use helios_sof::{NdjsonChunkIterator, SofViewDefinition, ChunkConfig};
/// use std::io::BufReader;
/// use std::fs::File;
///
/// # #[cfg(feature = "R4")]
/// # {
/// // Set up ViewDefinition
/// let view_json: serde_json::Value = serde_json::from_str(r#"{
///     "resourceType": "ViewDefinition",
///     "resource": "Patient",
///     "select": [{"column": [{"name": "id", "path": "id"}]}]
/// }"#).unwrap();
/// let view_def: helios_fhir::r4::ViewDefinition = serde_json::from_value(view_json).unwrap();
/// let sof_view = SofViewDefinition::R4(view_def);
///
/// // Process file in chunks
/// let file = File::open("patients.ndjson").unwrap();
/// let reader = BufReader::new(file);
///
/// let iterator = NdjsonChunkIterator::new(sof_view, reader, ChunkConfig::default()).unwrap();
///
/// for result in iterator {
///     match result {
///         Ok(chunk_result) => {
///             println!("Chunk {}: {} rows", chunk_result.chunk_index, chunk_result.rows.len());
///         }
///         Err(e) => {
///             eprintln!("Error: {}", e);
///             break;
///         }
///     }
/// }
/// # }
/// ```
pub struct NdjsonChunkIterator<R: BufRead> {
    reader: NdjsonChunkReader<R>,
    prepared_vd: PreparedViewDefinition,
}

impl<R: BufRead> NdjsonChunkIterator<R> {
    /// Create a new chunk iterator from a ViewDefinition and NDJSON reader.
    pub fn new(
        view_definition: SofViewDefinition,
        reader: R,
        config: ChunkConfig,
    ) -> Result<Self, SofError> {
        let prepared_vd = PreparedViewDefinition::new(view_definition)?;
        let resource_type = prepared_vd.target_resource_type().to_string();
        let chunk_reader =
            NdjsonChunkReader::new(reader, config).with_resource_type_filter(Some(resource_type));

        Ok(Self {
            reader: chunk_reader,
            prepared_vd,
        })
    }

    /// Get the column names that will be produced by this iterator.
    pub fn columns(&self) -> &[String] {
        self.prepared_vd.columns()
    }

    /// Get the total number of lines read so far.
    pub fn lines_read(&self) -> usize {
        self.reader.lines_read()
    }

    /// Get the number of lines skipped due to invalid JSON.
    pub fn skipped_lines(&self) -> usize {
        self.reader.skipped_lines()
    }
}

impl<R: BufRead> Iterator for NdjsonChunkIterator<R> {
    type Item = Result<ChunkedResult, SofError>;

    fn next(&mut self) -> Option<Self::Item> {
        match self.reader.next()? {
            Ok(chunk) => Some(self.prepared_vd.process_chunk(chunk)),
            Err(e) => Some(Err(e)),
        }
    }
}

// =============================================================================
// Streaming Output Functions
// =============================================================================

/// Write CSV header row.
fn write_csv_header<W: Write>(columns: &[String], writer: &mut W) -> Result<(), SofError> {
    let mut wtr = csv::Writer::from_writer(writer);
    wtr.write_record(columns)?;
    wtr.flush()?;
    Ok(())
}

/// Write CSV rows from a chunk (no header).
fn write_csv_chunk<W: Write>(result: &ChunkedResult, writer: &mut W) -> Result<(), SofError> {
    let mut wtr = csv::Writer::from_writer(writer);

    for row in &result.rows {
        let record: Vec<String> = row
            .values
            .iter()
            .map(|v| match v {
                Some(val) => {
                    if let serde_json::Value::String(s) = val {
                        s.clone()
                    } else {
                        serde_json::to_string(val).unwrap_or_default()
                    }
                }
                None => String::new(),
            })
            .collect();
        wtr.write_record(&record)?;
    }

    wtr.flush()?;
    Ok(())
}

/// Write NDJSON rows from a chunk.
fn write_ndjson_chunk<W: Write>(result: &ChunkedResult, writer: &mut W) -> Result<(), SofError> {
    for row in &result.rows {
        let mut row_obj = serde_json::Map::new();
        for (i, column) in result.columns.iter().enumerate() {
            let value = row
                .values
                .get(i)
                .and_then(|v| v.as_ref())
                .cloned()
                .unwrap_or(serde_json::Value::Null);
            row_obj.insert(column.clone(), value);
        }
        let line = serde_json::to_string(&serde_json::Value::Object(row_obj))?;
        writer.write_all(line.as_bytes())?;
        writer.write_all(b"\n")?;
    }

    Ok(())
}

/// Process an NDJSON input stream and write output incrementally.
///
/// This is the main entry point for streaming/chunked NDJSON processing.
/// It reads the input in chunks, processes each chunk through the ViewDefinition,
/// and writes the output incrementally to the writer.
///
/// # Arguments
///
/// * `view_definition` - The ViewDefinition to execute
/// * `input` - A buffered reader for the NDJSON input
/// * `output` - A writer for the output (file, stdout, etc.)
/// * `content_type` - The desired output format (CSV, NDJSON, JSON)
/// * `config` - Configuration for chunk processing
///
/// # Returns
///
/// Statistics about the processing run, including row counts and chunk counts.
///
/// # Examples
///
/// ```rust,no_run
/// use helios_sof::{process_ndjson_chunked, SofViewDefinition, ContentType, ChunkConfig};
/// use std::io::{BufReader, BufWriter};
/// use std::fs::File;
///
/// # #[cfg(feature = "R4")]
/// # {
/// // Set up ViewDefinition
/// let view_json: serde_json::Value = serde_json::from_str(r#"{
///     "resourceType": "ViewDefinition",
///     "resource": "Patient",
///     "select": [{"column": [{"name": "id", "path": "id"}]}]
/// }"#).unwrap();
/// let view_def: helios_fhir::r4::ViewDefinition = serde_json::from_value(view_json).unwrap();
/// let sof_view = SofViewDefinition::R4(view_def);
///
/// // Process file
/// let input = BufReader::new(File::open("patients.ndjson").unwrap());
/// let mut output = BufWriter::new(File::create("output.csv").unwrap());
///
/// let stats = process_ndjson_chunked(
///     sof_view,
///     input,
///     &mut output,
///     ContentType::CsvWithHeader,
///     ChunkConfig::default(),
/// ).unwrap();
///
/// println!("Processed {} resources, {} output rows",
///     stats.resources_processed, stats.output_rows);
/// # }
/// ```
///
/// # Errors
///
/// Returns an error if:
/// - The ViewDefinition is invalid
/// - The input contains invalid JSON (when `skip_invalid_lines` is false)
/// - Writing to the output fails
/// - Parquet format is requested (not supported for streaming)
pub fn process_ndjson_chunked<R: BufRead, W: Write>(
    view_definition: SofViewDefinition,
    input: R,
    mut output: W,
    content_type: ContentType,
    config: ChunkConfig,
) -> Result<ProcessingStats, SofError> {
    // Validate content type supports streaming
    if content_type == ContentType::Parquet {
        return Err(SofError::UnsupportedContentType(
            "Parquet output is not supported for streaming. Use batch processing instead."
                .to_string(),
        ));
    }

    let mut iterator = NdjsonChunkIterator::new(view_definition, input, config)?;
    let columns = iterator.columns().to_vec();
1697
1698 let mut stats = ProcessingStats::default();
1699 let mut is_first_chunk = true;
1700
1701 // Write header if needed
1702 if content_type == ContentType::CsvWithHeader {
1703 write_csv_header(&columns, &mut output)?;
1704 }
1705
1706 // For JSON output, we need special handling to create a valid array
1707 if content_type == ContentType::Json {
1708 output.write_all(b"[\n")?;
1709 }
1710
1711 for result in iterator.by_ref() {
1712 let chunk_result = result?;
1713
1714 stats.resources_processed += chunk_result.resources_in_chunk;
1715 stats.output_rows += chunk_result.rows.len();
1716 stats.chunks_processed += 1;
1717
1718 // Write chunk output
1719 match content_type {
1720 ContentType::Csv | ContentType::CsvWithHeader => {
1721 write_csv_chunk(&chunk_result, &mut output)?;
1722 }
1723 ContentType::NdJson => {
1724 write_ndjson_chunk(&chunk_result, &mut output)?;
1725 }
1726 ContentType::Json => {
1727 // Write JSON rows with proper comma handling
                for row in &chunk_result.rows {
                    if wrote_first_row {
                        output.write_all(b",\n")?;
                    }
                    wrote_first_row = true;
1732
1733 let mut row_obj = serde_json::Map::new();
1734 for (j, column) in chunk_result.columns.iter().enumerate() {
1735 let value = row
1736 .values
1737 .get(j)
1738 .and_then(|v| v.as_ref())
1739 .cloned()
1740 .unwrap_or(serde_json::Value::Null);
1741 row_obj.insert(column.clone(), value);
1742 }
1743 let json = serde_json::to_string_pretty(&serde_json::Value::Object(row_obj))?;
1744 output.write_all(json.as_bytes())?;
1745 }
1746 }
1747 ContentType::Parquet => unreachable!(), // Already checked above
1748 }
1749
1750 output.flush()?;
1752 }
1753
1754 // Close JSON array if needed
1755 if content_type == ContentType::Json {
1756 output.write_all(b"\n]")?;
1757 }
1758
1759 output.flush()?;
1760
1761 // Update stats with line/skip counts from the iterator
1762 stats.total_lines_read = iterator.lines_read();
1763 stats.skipped_lines = iterator.skipped_lines();
1764
1765 Ok(stats)
1766}
1767
1768/// Create an iterator for chunked NDJSON processing.
1769///
/// This is a convenience constructor for an `NdjsonChunkIterator`.
/// Use it instead of [`process_ndjson_chunked`] when you need control over
/// how each chunk's rows are consumed.
1772///
1773/// # Arguments
1774///
1775/// * `view_definition` - The ViewDefinition to execute
1776/// * `reader` - A buffered reader for the NDJSON input
1777/// * `config` - Configuration for chunk processing
1778///
1779/// # Returns
1780///
1781/// An iterator that yields `ChunkedResult` for each processed chunk.
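///
/// # Examples
///
/// A minimal sketch of driving the iterator by hand, assuming the `rows`
/// field of `ChunkedResult` is publicly readable (the writer helpers in
/// this module access it the same way):
///
/// ```rust,ignore
/// use helios_sof::{iter_ndjson_chunks, SofViewDefinition, ChunkConfig};
/// use std::fs::File;
/// use std::io::BufReader;
///
/// let view_def: helios_fhir::r4::ViewDefinition = serde_json::from_str(r#"{
///     "resourceType": "ViewDefinition",
///     "status": "active",
///     "resource": "Patient",
///     "select": [{"column": [{"name": "id", "path": "id"}]}]
/// }"#).unwrap();
///
/// let reader = BufReader::new(File::open("patients.ndjson").unwrap());
/// let chunks = iter_ndjson_chunks(
///     SofViewDefinition::R4(view_def),
///     reader,
///     ChunkConfig::default(),
/// ).unwrap();
///
/// for chunk in chunks {
///     let chunk = chunk.unwrap();
///     println!("chunk produced {} rows", chunk.rows.len());
/// }
/// ```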
1782pub fn iter_ndjson_chunks<R: BufRead>(
1783 view_definition: SofViewDefinition,
1784 reader: R,
1785 config: ChunkConfig,
1786) -> Result<NdjsonChunkIterator<R>, SofError> {
1787 NdjsonChunkIterator::new(view_definition, reader, config)
1788}
1789
1790// =============================================================================
1791// End Streaming/Chunked Processing Types
1792// =============================================================================
1793
1794/// Parse a JSON value into a FhirResource for the given FHIR version.
1795///
1796/// This is used internally for streaming/chunked processing where we have
1797/// raw JSON that needs to be converted to typed resources for FHIRPath evaluation.
1798fn parse_json_to_fhir_resource(
1799 json: serde_json::Value,
1800 version: FhirVersion,
1801) -> Result<helios_fhir::FhirResource, SofError> {
1802 match version {
1803 #[cfg(feature = "R4")]
1804 FhirVersion::R4 => {
1805 let resource: helios_fhir::r4::Resource =
1806 serde_json::from_value(json).map_err(|e| {
1807 SofError::InvalidSourceContent(format!("Invalid R4 resource: {}", e))
1808 })?;
1809 Ok(helios_fhir::FhirResource::R4(Box::new(resource)))
1810 }
1811 #[cfg(feature = "R4B")]
1812 FhirVersion::R4B => {
1813 let resource: helios_fhir::r4b::Resource =
1814 serde_json::from_value(json).map_err(|e| {
1815 SofError::InvalidSourceContent(format!("Invalid R4B resource: {}", e))
1816 })?;
1817 Ok(helios_fhir::FhirResource::R4B(Box::new(resource)))
1818 }
1819 #[cfg(feature = "R5")]
1820 FhirVersion::R5 => {
1821 let resource: helios_fhir::r5::Resource =
1822 serde_json::from_value(json).map_err(|e| {
1823 SofError::InvalidSourceContent(format!("Invalid R5 resource: {}", e))
1824 })?;
1825 Ok(helios_fhir::FhirResource::R5(Box::new(resource)))
1826 }
1827 #[cfg(feature = "R6")]
1828 FhirVersion::R6 => {
1829 let resource: helios_fhir::r6::Resource =
1830 serde_json::from_value(json).map_err(|e| {
1831 SofError::InvalidSourceContent(format!("Invalid R6 resource: {}", e))
1832 })?;
1833 Ok(helios_fhir::FhirResource::R6(Box::new(resource)))
1834 }
1835 }
1836}
1837
1838/// Execute a ViewDefinition transformation with additional filtering options.
1839///
1840/// This function extends the basic `run_view_definition` with support for:
1841/// - Filtering resources by modification time (`since`)
1842/// - Limiting results (`limit`)
1843/// - Pagination (`page`)
1844///
1845/// # Arguments
1846///
1847/// * `view_definition` - The ViewDefinition to execute
1848/// * `bundle` - The Bundle containing resources to transform
1849/// * `content_type` - Desired output format
1850/// * `options` - Additional filtering and control options
1851///
1852/// # Returns
1853///
1854/// The transformed data in the requested format, with filtering applied.
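///
/// # Examples
///
/// A minimal sketch of page-based limiting, assuming `RunOptions`
/// implements `Default` and exposes public `limit` and `page` fields
/// (matching how they are read below):
///
/// ```rust,ignore
/// use helios_sof::{run_view_definition_with_options, ContentType, RunOptions};
///
/// let options = RunOptions {
///     limit: Some(100), // rows per page
///     page: Some(2),    // 1-based: selects rows 100..200
///     ..Default::default()
/// };
/// let bytes = run_view_definition_with_options(sof_view, sof_bundle,
///     ContentType::NdJson, options)?;
/// ```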
1855pub fn run_view_definition_with_options(
1856 view_definition: SofViewDefinition,
1857 bundle: SofBundle,
1858 content_type: ContentType,
1859 options: RunOptions,
1860) -> Result<Vec<u8>, SofError> {
1861 // Filter bundle resources by since parameter before processing
1862 let filtered_bundle = if let Some(since) = options.since {
1863 filter_bundle_by_since(bundle, since)?
1864 } else {
1865 bundle
1866 };
1867
1868 // Process the ViewDefinition to generate tabular data
1869 let processed_result = process_view_definition(view_definition, filtered_bundle)?;
1870
1871 // Apply pagination if needed
1872 let processed_result = if options.limit.is_some() || options.page.is_some() {
1873 apply_pagination_to_result(processed_result, options.limit, options.page)?
1874 } else {
1875 processed_result
1876 };
1877
1878 // Format the result according to the requested content type
1879 format_output(
1880 processed_result,
1881 content_type,
1882 options.parquet_options.as_ref(),
1883 )
1884}
1885
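/// Transform a Bundle's resources into tabular form using a ViewDefinition.
///
/// Validates that both inputs use the same FHIR version, then dispatches to
/// the generic trait-based implementation for that version. The returned
/// `ProcessedResult` carries the column names and rows prior to output
/// formatting, which is useful when the same result is formatted more than
/// once.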
1886pub fn process_view_definition(
1887 view_definition: SofViewDefinition,
1888 bundle: SofBundle,
1889) -> Result<ProcessedResult, SofError> {
1890 // Ensure both resources use the same FHIR version
1891 if view_definition.version() != bundle.version() {
1892 return Err(SofError::InvalidViewDefinition(
1893 "ViewDefinition and Bundle must use the same FHIR version".to_string(),
1894 ));
1895 }
1896
1897 match (view_definition, bundle) {
1898 #[cfg(feature = "R4")]
1899 (SofViewDefinition::R4(vd), SofBundle::R4(bundle)) => {
1900 process_view_definition_generic(vd, bundle)
1901 }
1902 #[cfg(feature = "R4B")]
1903 (SofViewDefinition::R4B(vd), SofBundle::R4B(bundle)) => {
1904 process_view_definition_generic(vd, bundle)
1905 }
1906 #[cfg(feature = "R5")]
1907 (SofViewDefinition::R5(vd), SofBundle::R5(bundle)) => {
1908 process_view_definition_generic(vd, bundle)
1909 }
1910 #[cfg(feature = "R6")]
1911 (SofViewDefinition::R6(vd), SofBundle::R6(bundle)) => {
1912 process_view_definition_generic(vd, bundle)
1913 }
1914 // This case should never happen due to the version check above,
1915 // but is needed for exhaustive pattern matching when multiple features are enabled
1916 #[cfg(any(
1917 all(feature = "R4", any(feature = "R4B", feature = "R5", feature = "R6")),
1918 all(feature = "R4B", any(feature = "R5", feature = "R6")),
1919 all(feature = "R5", feature = "R6")
1920 ))]
1921 _ => {
1922 unreachable!("Version mismatch should have been caught by the version check above")
1923 }
1924 }
1925}
1926
1927// Generic version-agnostic constant extraction
1928fn extract_view_definition_constants<VD: ViewDefinitionTrait>(
1929 view_definition: &VD,
1930) -> Result<HashMap<String, EvaluationResult>, SofError> {
1931 let mut variables = HashMap::new();
1932
1933 if let Some(constants) = view_definition.constants() {
1934 for constant in constants {
1935 let name = constant
1936 .name()
1937 .ok_or_else(|| {
1938 SofError::InvalidViewDefinition("Constant name is required".to_string())
1939 })?
1940 .to_string();
1941
1942 let eval_result = constant.to_evaluation_result()?;
1943 // Constants are referenced with % prefix in FHIRPath expressions
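            // e.g. a constant named "targetSystem" becomes available to
            // column, where, and forEach expressions as %targetSystem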
1944 variables.insert(format!("%{}", name), eval_result);
1945 }
1946 }
1947
1948 Ok(variables)
1949}
1950
1951// Generic version-agnostic ViewDefinition processing
1952fn process_view_definition_generic<VD, B>(
1953 view_definition: VD,
1954 bundle: B,
1955) -> Result<ProcessedResult, SofError>
1956where
1957 VD: ViewDefinitionTrait,
1958 B: BundleTrait,
1959 B::Resource: ResourceTrait + Sync,
1960 VD::Select: Sync,
1961{
1962 validate_view_definition(&view_definition)?;
1963
1964 // Step 1: Extract constants/variables from ViewDefinition
1965 let variables = extract_view_definition_constants(&view_definition)?;
1966
1967 // Step 2: Filter resources by type and profile
1968 let target_resource_type = view_definition
1969 .resource()
1970 .ok_or_else(|| SofError::InvalidViewDefinition("Resource type is required".to_string()))?;
1971
1972 let filtered_resources = filter_resources(&bundle, target_resource_type)?;
1973
1974 // Step 3: Apply where clauses to filter resources
1975 let filtered_resources = apply_where_clauses(
1976 filtered_resources,
1977 view_definition.where_clauses(),
1978 &variables,
1979 )?;
1980
1981 // Step 4: Process all select clauses to generate rows with forEach support
1982 let select_clauses = view_definition.select().ok_or_else(|| {
1983 SofError::InvalidViewDefinition("At least one select clause is required".to_string())
1984 })?;
1985
1986 // Generate rows for each resource using the forEach-aware approach
1987 let (all_columns, rows) =
1988 generate_rows_from_selects(&filtered_resources, select_clauses, &variables)?;
1989
1990 Ok(ProcessedResult {
1991 columns: all_columns,
1992 rows,
1993 })
1994}
1995
1996// Generic version-agnostic validation
1997fn validate_view_definition<VD: ViewDefinitionTrait>(view_def: &VD) -> Result<(), SofError> {
1998 // Basic validation
1999 if view_def.resource().is_none_or(|s| s.is_empty()) {
2000 return Err(SofError::InvalidViewDefinition(
2001 "ViewDefinition must specify a resource type".to_string(),
2002 ));
2003 }
2004
2005 if view_def.select().is_none_or(|s| s.is_empty()) {
2006 return Err(SofError::InvalidViewDefinition(
2007 "ViewDefinition must have at least one select".to_string(),
2008 ));
2009 }
2010
2011 // Validate where clauses
2012 if let Some(where_clauses) = view_def.where_clauses() {
2013 validate_where_clauses(where_clauses)?;
2014 }
2015
2016 // Validate selects
2017 if let Some(selects) = view_def.select() {
2018 for select in selects {
2019 validate_select(select)?;
2020 }
2021 }
2022
2023 Ok(())
2024}
2025
2026// Generic where clause validation
2027fn validate_where_clauses<W: ViewDefinitionWhereTrait>(
2028 where_clauses: &[W],
2029) -> Result<(), SofError> {
2030 // Basic validation - just ensure paths are provided
2031 // Type checking will be done during actual evaluation
2032 for where_clause in where_clauses {
2033 if where_clause.path().is_none() {
2034 return Err(SofError::InvalidViewDefinition(
2035 "Where clause must have a path specified".to_string(),
2036 ));
2037 }
2038 }
2039 Ok(())
2040}
2041
2042// Generic helper - no longer needs to be version-specific
2043fn can_be_coerced_to_boolean(result: &EvaluationResult) -> bool {
2044 // Check if the result can be meaningfully used as a boolean in a where clause
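    // e.g. a where path like "name.exists()" (Boolean) is accepted, while
    // "name.family" (String) is rejected with an InvalidViewDefinition error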
2045 match result {
2046 // Boolean values are obviously OK
2047 EvaluationResult::Boolean(_, _) => true,
2048
2049 // Empty is OK (evaluates to false)
2050 EvaluationResult::Empty => true,
2051
2052 // Collections are OK - they evaluate based on whether they're empty or not
2053 EvaluationResult::Collection { .. } => true,
2054
2055 // Other types cannot be meaningfully coerced to boolean for where clauses
2056 // This includes: String, Integer, Decimal, Date, DateTime, Time, Quantity, Object
2057 _ => false,
2058 }
2059}
2060
2061// Generic select validation
2062fn validate_select<S: ViewDefinitionSelectTrait>(select: &S) -> Result<(), SofError> {
2063 validate_select_with_context(select, false)
2064}
2065
2066fn validate_select_with_context<S: ViewDefinitionSelectTrait>(
2067 select: &S,
2068 in_foreach_context: bool,
2069) -> Result<(), SofError>
2070where
2071 S::Select: ViewDefinitionSelectTrait,
2072{
2073 // Determine if we're entering a forEach context at this level
2074 let entering_foreach = select.for_each().is_some() || select.for_each_or_null().is_some();
2075 let current_foreach_context = in_foreach_context || entering_foreach;
2076
2077 // Validate collection attribute with the current forEach context
2078 if let Some(columns) = select.column() {
2079 for column in columns {
2080 if let Some(collection_value) = column.collection() {
2081 if !collection_value && !current_foreach_context {
2082 return Err(SofError::InvalidViewDefinition(
                        "Column 'collection' attribute must be true when specified outside of a forEach context".to_string(),
2084 ));
2085 }
2086 }
2087 }
2088 }
2089
2090 // Validate unionAll column consistency
2091 if let Some(union_selects) = select.union_all() {
2092 validate_union_all_columns(union_selects)?;
2093 }
2094
2095 // Recursively validate nested selects
2096 if let Some(nested_selects) = select.select() {
2097 for nested_select in nested_selects {
2098 validate_select_with_context(nested_select, current_foreach_context)?;
2099 }
2100 }
2101
2102 // Validate unionAll selects with forEach context
2103 if let Some(union_selects) = select.union_all() {
2104 for union_select in union_selects {
2105 validate_select_with_context(union_select, current_foreach_context)?;
2106 }
2107 }
2108
2109 Ok(())
2110}
2111
2112// Generic union validation
2113fn validate_union_all_columns<S: ViewDefinitionSelectTrait>(
2114 union_selects: &[S],
2115) -> Result<(), SofError> {
2116 if union_selects.len() < 2 {
2117 return Ok(());
2118 }
2119
2120 // Get column names and order from first select
2121 let first_select = &union_selects[0];
2122 let first_columns = get_column_names(first_select)?;
2123
2124 // Validate all other selects have the same column names in the same order
2125 for (index, union_select) in union_selects.iter().enumerate().skip(1) {
2126 let current_columns = get_column_names(union_select)?;
2127
2128 if current_columns != first_columns {
2129 if current_columns.len() != first_columns.len()
2130 || !current_columns
2131 .iter()
2132 .all(|name| first_columns.contains(name))
2133 {
2134 return Err(SofError::InvalidViewDefinition(format!(
2135 "UnionAll branch {} has different column names than first branch",
2136 index
2137 )));
2138 } else {
2139 return Err(SofError::InvalidViewDefinition(format!(
2140 "UnionAll branch {} has columns in different order than first branch",
2141 index
2142 )));
2143 }
2144 }
2145 }
2146
2147 Ok(())
2148}
2149
2150// Generic column name extraction
2151fn get_column_names<S: ViewDefinitionSelectTrait>(select: &S) -> Result<Vec<String>, SofError> {
2152 let mut column_names = Vec::new();
2153
2154 // Collect direct column names
2155 if let Some(columns) = select.column() {
2156 for column in columns {
2157 if let Some(name) = column.name() {
2158 column_names.push(name.to_string());
2159 }
2160 }
2161 }
2162
2163 // If this select has unionAll but no direct columns, get columns from first unionAll branch
2164 if column_names.is_empty() {
2165 if let Some(union_selects) = select.union_all() {
2166 if !union_selects.is_empty() {
2167 return get_column_names(&union_selects[0]);
2168 }
2169 }
2170 }
2171
2172 Ok(column_names)
2173}
2174
2175// Generic resource filtering
2176fn filter_resources<'a, B: BundleTrait>(
2177 bundle: &'a B,
2178 resource_type: &str,
2179) -> Result<Vec<&'a B::Resource>, SofError> {
2180 Ok(bundle
2181 .entries()
2182 .into_iter()
2183 .filter(|resource| resource.resource_name() == resource_type)
2184 .collect())
2185}
2186
2187// Generic where clause application
2188fn apply_where_clauses<'a, R, W>(
2189 resources: Vec<&'a R>,
2190 where_clauses: Option<&[W]>,
2191 variables: &HashMap<String, EvaluationResult>,
2192) -> Result<Vec<&'a R>, SofError>
2193where
2194 R: ResourceTrait,
2195 W: ViewDefinitionWhereTrait,
2196{
2197 if let Some(wheres) = where_clauses {
2198 let mut filtered = Vec::new();
2199
2200 for resource in resources {
2201 let mut include_resource = true;
2202
2203 // All where clauses must evaluate to true for the resource to be included
2204 for where_clause in wheres {
2205 let fhir_resource = resource.to_fhir_resource();
2206 let mut context = EvaluationContext::new(vec![fhir_resource]);
2207
2208 // Add variables to the context
2209 for (name, value) in variables {
2210 context.set_variable_result(name, value.clone());
2211 }
2212
2213 let path = where_clause.path().ok_or_else(|| {
2214 SofError::InvalidViewDefinition("Where clause path is required".to_string())
2215 })?;
2216
2217 match evaluate_expression(path, &context) {
2218 Ok(result) => {
2219 // Check if the result can be meaningfully used as a boolean
2220 if !can_be_coerced_to_boolean(&result) {
2221 return Err(SofError::InvalidViewDefinition(format!(
2222 "Where clause path '{}' returns type '{}' which cannot be used as a boolean condition. \
2223 Where clauses must return boolean values, collections, or empty results.",
2224 path,
2225 result.type_name()
2226 )));
2227 }
2228
2229 // Check if result is truthy (non-empty and not false)
2230 if !is_truthy(&result) {
2231 include_resource = false;
2232 break;
2233 }
2234 }
2235 Err(e) => {
2236 return Err(SofError::FhirPathError(format!(
2237 "Error evaluating where clause '{}': {}",
2238 path, e
2239 )));
2240 }
2241 }
2242 }
2243
2244 if include_resource {
2245 filtered.push(resource);
2246 }
2247 }
2248
2249 Ok(filtered)
2250 } else {
2251 Ok(resources)
2252 }
2253}
2254
2259// Helper functions for FHIRPath result processing
2260fn is_truthy(result: &EvaluationResult) -> bool {
2261 match result {
2262 EvaluationResult::Empty => false,
2263 EvaluationResult::Boolean(b, _) => *b,
2264 EvaluationResult::Collection { items, .. } => !items.is_empty(),
2265 _ => true, // Non-empty, non-false values are truthy
2266 }
2267}
2268
2269fn fhirpath_result_to_json_value_collection(result: EvaluationResult) -> Option<serde_json::Value> {
2270 match result {
2271 EvaluationResult::Empty => Some(serde_json::Value::Array(vec![])),
2272 EvaluationResult::Collection { items, .. } => {
2273 // Always return array for collection columns, even if empty
2274 let values: Vec<serde_json::Value> = items
2275 .into_iter()
2276 .filter_map(fhirpath_result_to_json_value)
2277 .collect();
2278 Some(serde_json::Value::Array(values))
2279 }
2280 // For non-collection results in collection columns, wrap in array
2281 single_result => {
2282 if let Some(json_val) = fhirpath_result_to_json_value(single_result) {
2283 Some(serde_json::Value::Array(vec![json_val]))
2284 } else {
2285 Some(serde_json::Value::Array(vec![]))
2286 }
2287 }
2288 }
2289}
2290
2291fn fhirpath_result_to_json_value(result: EvaluationResult) -> Option<serde_json::Value> {
2292 match result {
2293 EvaluationResult::Empty => None,
2294 EvaluationResult::Boolean(b, _) => Some(serde_json::Value::Bool(b)),
2295 EvaluationResult::Integer(i, _) => {
2296 Some(serde_json::Value::Number(serde_json::Number::from(i)))
2297 }
2298 EvaluationResult::Decimal(d, _) => {
2299 // Check if this Decimal represents a whole number
2300 if d.fract().is_zero() {
                // Convert to integer if no fractional part (normalize() strips
                // trailing zeros so that e.g. "5.0" parses as 5)
                if let Ok(i) = d.normalize().to_string().parse::<i64>() {
2303 Some(serde_json::Value::Number(serde_json::Number::from(i)))
2304 } else {
2305 // Handle very large numbers as strings
2306 Some(serde_json::Value::String(d.to_string()))
2307 }
2308 } else {
2309 // Convert Decimal to a float for fractional numbers
2310 if let Ok(f) = d.to_string().parse::<f64>() {
2311 if let Some(num) = serde_json::Number::from_f64(f) {
2312 Some(serde_json::Value::Number(num))
2313 } else {
2314 Some(serde_json::Value::String(d.to_string()))
2315 }
2316 } else {
2317 Some(serde_json::Value::String(d.to_string()))
2318 }
2319 }
2320 }
2321 EvaluationResult::String(s, _) => Some(serde_json::Value::String(s)),
2322 EvaluationResult::Date(s, _) => Some(serde_json::Value::String(s)),
2323 EvaluationResult::DateTime(s, _) => {
2324 // Remove "@" prefix from datetime strings if present
2325 let cleaned = s.strip_prefix("@").unwrap_or(&s);
2326 Some(serde_json::Value::String(cleaned.to_string()))
2327 }
2328 EvaluationResult::Time(s, _) => {
2329 // Remove "@T" prefix from time strings if present
2330 let cleaned = s.strip_prefix("@T").unwrap_or(&s);
2331 Some(serde_json::Value::String(cleaned.to_string()))
2332 }
2333 EvaluationResult::Collection { items, .. } => {
2334 if items.len() == 1 {
2335 // Single item collection - unwrap to the item itself
2336 fhirpath_result_to_json_value(items.into_iter().next().unwrap())
2337 } else if items.is_empty() {
2338 None
2339 } else {
2340 // Multiple items - convert to array
2341 let values: Vec<serde_json::Value> = items
2342 .into_iter()
2343 .filter_map(fhirpath_result_to_json_value)
2344 .collect();
2345 Some(serde_json::Value::Array(values))
2346 }
2347 }
2348 EvaluationResult::Object { map, .. } => {
2349 let mut json_map = serde_json::Map::new();
2350 for (k, v) in map {
2351 if let Some(json_val) = fhirpath_result_to_json_value(v) {
2352 json_map.insert(k, json_val);
2353 }
2354 }
2355 Some(serde_json::Value::Object(json_map))
2356 }
2357 // Handle other result types as strings
2358 _ => Some(serde_json::Value::String(format!("{:?}", result))),
2359 }
2360}
2361
2362fn extract_iteration_items(result: EvaluationResult) -> Vec<EvaluationResult> {
2363 match result {
2364 EvaluationResult::Collection { items, .. } => items,
2365 EvaluationResult::Empty => Vec::new(),
2366 single_item => vec![single_item],
2367 }
2368}
2369
2370// Generic row generation functions
2371
2372fn generate_rows_from_selects<R, S>(
2373 resources: &[&R],
2374 selects: &[S],
2375 variables: &HashMap<String, EvaluationResult>,
2376) -> Result<(Vec<String>, Vec<ProcessedRow>), SofError>
2377where
2378 R: ResourceTrait + Sync,
2379 S: ViewDefinitionSelectTrait + Sync,
2380 S::Select: ViewDefinitionSelectTrait,
2381{
2382 // Process resources in parallel
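    // (rayon's par_iter: each resource is evaluated independently, and the
    // per-thread column lists and rows are merged in order below)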
2383 let resource_results: Result<Vec<_>, _> = resources
2384 .par_iter()
2385 .map(|resource| {
2386 // Each thread gets its own local column vector
2387 let mut local_columns = Vec::new();
2388 let resource_rows =
2389 generate_rows_for_resource(*resource, selects, &mut local_columns, variables)?;
2390 Ok::<(Vec<String>, Vec<ProcessedRow>), SofError>((local_columns, resource_rows))
2391 })
2392 .collect();
2393
2394 // Handle errors from parallel processing
2395 let resource_results = resource_results?;
2396
2397 // Merge columns from all threads (maintaining order is important)
2398 let mut final_columns = Vec::new();
2399 let mut all_rows = Vec::new();
2400
2401 for (local_columns, resource_rows) in resource_results {
2402 // Merge columns, avoiding duplicates
2403 for col in local_columns {
2404 if !final_columns.contains(&col) {
2405 final_columns.push(col);
2406 }
2407 }
2408 all_rows.extend(resource_rows);
2409 }
2410
2411 Ok((final_columns, all_rows))
2412}
2413
2414fn generate_rows_for_resource<R, S>(
2415 resource: &R,
2416 selects: &[S],
2417 all_columns: &mut Vec<String>,
2418 variables: &HashMap<String, EvaluationResult>,
2419) -> Result<Vec<ProcessedRow>, SofError>
2420where
2421 R: ResourceTrait,
2422 S: ViewDefinitionSelectTrait,
2423 S::Select: ViewDefinitionSelectTrait,
2424{
2425 let fhir_resource = resource.to_fhir_resource();
2426 let mut context = EvaluationContext::new(vec![fhir_resource]);
2427
2428 // Add variables to the context
2429 for (name, value) in variables {
2430 context.set_variable_result(name, value.clone());
2431 }
2432
2433 // Generate all possible row combinations for this resource
2434 let row_combinations = generate_row_combinations(&context, selects, all_columns, variables)?;
2435
2436 Ok(row_combinations)
2437}
2438
2439#[derive(Debug, Clone)]
2440struct RowCombination {
2441 values: Vec<Option<serde_json::Value>>,
2442}
2443
2444fn generate_row_combinations<S>(
2445 context: &EvaluationContext,
2446 selects: &[S],
2447 all_columns: &mut Vec<String>,
2448 variables: &HashMap<String, EvaluationResult>,
2449) -> Result<Vec<ProcessedRow>, SofError>
2450where
2451 S: ViewDefinitionSelectTrait,
2452 S::Select: ViewDefinitionSelectTrait,
2453{
2454 // First pass: collect all column names to ensure consistent ordering
2455 collect_all_columns(selects, all_columns)?;
2456
2457 // Second pass: generate all row combinations
2458 let mut row_combinations = vec![RowCombination {
2459 values: vec![None; all_columns.len()],
2460 }];
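    // Each select expands the current set of combinations: plain columns fill
    // in values, while forEach and unionAll may multiply one combination into
    // many rows (or drop it entirely)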
2461
2462 for select in selects {
2463 row_combinations =
2464 expand_select_combinations(context, select, &row_combinations, all_columns, variables)?;
2465 }
2466
2467 // Convert to ProcessedRow format
2468 Ok(row_combinations
2469 .into_iter()
2470 .map(|combo| ProcessedRow {
2471 values: combo.values,
2472 })
2473 .collect())
2474}
2475
2476fn collect_all_columns<S>(selects: &[S], all_columns: &mut Vec<String>) -> Result<(), SofError>
2477where
2478 S: ViewDefinitionSelectTrait,
2479{
2480 for select in selects {
2481 // Add columns from this select
2482 if let Some(columns) = select.column() {
2483 for col in columns {
2484 if let Some(name) = col.name() {
2485 if !all_columns.contains(&name.to_string()) {
2486 all_columns.push(name.to_string());
2487 }
2488 }
2489 }
2490 }
2491
2492 // Recursively collect from nested selects
2493 if let Some(nested_selects) = select.select() {
2494 collect_all_columns(nested_selects, all_columns)?;
2495 }
2496
2497 // Collect from unionAll
2498 if let Some(union_selects) = select.union_all() {
2499 collect_all_columns(union_selects, all_columns)?;
2500 }
2501 }
2502 Ok(())
2503}
2504
2505fn expand_select_combinations<S>(
2506 context: &EvaluationContext,
2507 select: &S,
2508 existing_combinations: &[RowCombination],
2509 all_columns: &[String],
2510 variables: &HashMap<String, EvaluationResult>,
2511) -> Result<Vec<RowCombination>, SofError>
2512where
2513 S: ViewDefinitionSelectTrait,
2514 S::Select: ViewDefinitionSelectTrait,
2515{
2516 // Handle forEach and forEachOrNull
2517 if let Some(for_each_path) = select.for_each() {
2518 return expand_for_each_combinations(
2519 context,
2520 select,
2521 existing_combinations,
2522 all_columns,
2523 for_each_path,
2524 false,
2525 variables,
2526 );
2527 }
2528
2529 if let Some(for_each_or_null_path) = select.for_each_or_null() {
2530 return expand_for_each_combinations(
2531 context,
2532 select,
2533 existing_combinations,
2534 all_columns,
2535 for_each_or_null_path,
2536 true,
2537 variables,
2538 );
2539 }
2540
2541 // Handle repeat directive for recursive traversal
2542 if let Some(repeat_paths) = select.repeat() {
2543 return expand_repeat_combinations(
2544 context,
2545 select,
2546 existing_combinations,
2547 all_columns,
2548 &repeat_paths,
2549 variables,
2550 );
2551 }
2552
2553 // Handle regular columns (no forEach)
2554 let mut new_combinations = Vec::new();
2555
2556 for existing_combo in existing_combinations {
2557 let mut new_combo = existing_combo.clone();
2558
2559 // Add values from this select's columns
2560 if let Some(columns) = select.column() {
2561 for col in columns {
2562 if let Some(col_name) = col.name() {
2563 if let Some(col_index) = all_columns.iter().position(|name| name == col_name) {
2564 let path = col.path().ok_or_else(|| {
2565 SofError::InvalidViewDefinition("Column path is required".to_string())
2566 })?;
2567
2568 match evaluate_expression(path, context) {
2569 Ok(result) => {
2570 // Check if this column is marked as a collection
2571 let is_collection = col.collection().unwrap_or(false);
2572
2573 new_combo.values[col_index] = if is_collection {
2574 fhirpath_result_to_json_value_collection(result)
2575 } else {
2576 fhirpath_result_to_json_value(result)
2577 };
2578 }
2579 Err(e) => {
2580 return Err(SofError::FhirPathError(format!(
2581 "Error evaluating column '{}' with path '{}': {}",
2582 col_name, path, e
2583 )));
2584 }
2585 }
2586 }
2587 }
2588 }
2589 }
2590
2591 new_combinations.push(new_combo);
2592 }
2593
2594 // Handle nested selects
2595 if let Some(nested_selects) = select.select() {
2596 for nested_select in nested_selects {
2597 new_combinations = expand_select_combinations(
2598 context,
2599 nested_select,
2600 &new_combinations,
2601 all_columns,
2602 variables,
2603 )?;
2604 }
2605 }
2606
2607 // Handle unionAll
2608 if let Some(union_selects) = select.union_all() {
2609 let mut union_combinations = Vec::new();
2610
2611 // Process each unionAll select independently, using the combinations that already have
2612 // values from this select's columns and nested selects
2613 for union_select in union_selects {
2614 let select_combinations = expand_select_combinations(
2615 context,
2616 union_select,
2617 &new_combinations,
2618 all_columns,
2619 variables,
2620 )?;
2621 union_combinations.extend(select_combinations);
2622 }
2623
2624 // unionAll replaces new_combinations with the union results
2625 // If no union results, this resource should be filtered out (no rows for this resource)
2626 new_combinations = union_combinations;
2627 }
2628
2629 Ok(new_combinations)
2630}
2631
2632fn expand_for_each_combinations<S>(
2633 context: &EvaluationContext,
2634 select: &S,
2635 existing_combinations: &[RowCombination],
2636 all_columns: &[String],
2637 for_each_path: &str,
2638 allow_null: bool,
2639 variables: &HashMap<String, EvaluationResult>,
2640) -> Result<Vec<RowCombination>, SofError>
2641where
2642 S: ViewDefinitionSelectTrait,
2643 S::Select: ViewDefinitionSelectTrait,
2644{
2645 // Evaluate the forEach expression to get iteration items
2646 let for_each_result = evaluate_expression(for_each_path, context).map_err(|e| {
2647 SofError::FhirPathError(format!(
2648 "Error evaluating forEach expression '{}': {}",
2649 for_each_path, e
2650 ))
2651 })?;
2652
2653 let iteration_items = extract_iteration_items(for_each_result);
2654
2655 if iteration_items.is_empty() {
2656 if allow_null {
2657 // forEachOrNull: generate null rows
2658 let mut new_combinations = Vec::new();
2659 for existing_combo in existing_combinations {
2660 let mut new_combo = existing_combo.clone();
2661
2662 // Set column values to null for this forEach scope
2663 if let Some(columns) = select.column() {
2664 for col in columns {
2665 if let Some(col_name) = col.name() {
2666 if let Some(col_index) =
2667 all_columns.iter().position(|name| name == col_name)
2668 {
2669 new_combo.values[col_index] = None;
2670 }
2671 }
2672 }
2673 }
2674
2675 new_combinations.push(new_combo);
2676 }
2677 return Ok(new_combinations);
2678 } else {
2679 // forEach with empty collection: no rows
2680 return Ok(Vec::new());
2681 }
2682 }
2683
2684 let mut new_combinations = Vec::new();
2685
2686 // For each iteration item, create new combinations
2687 for item in &iteration_items {
2690
2691 for existing_combo in existing_combinations {
2692 let mut new_combo = existing_combo.clone();
2693
2694 // Evaluate columns in the context of the iteration item
2695 if let Some(columns) = select.column() {
2696 for col in columns {
2697 if let Some(col_name) = col.name() {
2698 if let Some(col_index) =
2699 all_columns.iter().position(|name| name == col_name)
2700 {
2701 let path = col.path().ok_or_else(|| {
2702 SofError::InvalidViewDefinition(
2703 "Column path is required".to_string(),
2704 )
2705 })?;
2706
2707 // Use the iteration item directly for path evaluation
2708 let result = if path == "$this" {
2709 // Special case: $this refers to the current iteration item
2710 item.clone()
2711 } else {
2712 // Evaluate the path on the iteration item
2713 evaluate_path_on_item(path, item, variables)?
2714 };
2715
2716 // Check if this column is marked as a collection
2717 let is_collection = col.collection().unwrap_or(false);
2718
2719 new_combo.values[col_index] = if is_collection {
2720 fhirpath_result_to_json_value_collection(result)
2721 } else {
2722 fhirpath_result_to_json_value(result)
2723 };
2724 }
2725 }
2726 }
2727 }
2728
2729 new_combinations.push(new_combo);
2730 }
2731 }
2732
2733 // Handle nested selects with the forEach context
2734 if let Some(nested_selects) = select.select() {
2735 let mut final_combinations = Vec::new();
2736
2737 for item in &iteration_items {
2738 let item_context = create_iteration_context(item, variables);
2739
2740 // For each iteration item, we need to start with the combinations that have
2741 // the correct column values for this forEach scope
2742 for existing_combo in existing_combinations {
                // Rebuild this forEach scope's column values on a copy of the
                // existing combination so nested selects start from the right base row
2745 let mut base_combo = existing_combo.clone();
2746
2747 // Update the base combination with column values for this iteration item
2748 if let Some(columns) = select.column() {
2749 for col in columns {
2750 if let Some(col_name) = col.name() {
2751 if let Some(col_index) =
2752 all_columns.iter().position(|name| name == col_name)
2753 {
2754 let path = col.path().ok_or_else(|| {
2755 SofError::InvalidViewDefinition(
2756 "Column path is required".to_string(),
2757 )
2758 })?;
2759
2760 let result = if path == "$this" {
2761 item.clone()
2762 } else {
2763 evaluate_path_on_item(path, item, variables)?
2764 };
2765
2766 // Check if this column is marked as a collection
2767 let is_collection = col.collection().unwrap_or(false);
2768
2769 base_combo.values[col_index] = if is_collection {
2770 fhirpath_result_to_json_value_collection(result)
2771 } else {
2772 fhirpath_result_to_json_value(result)
2773 };
2774 }
2775 }
2776 }
2777 }
2778
2779 // Start with this base combination for nested processing
2780 let mut item_combinations = vec![base_combo];
2781
2782 // Process nested selects
2783 for nested_select in nested_selects {
2784 item_combinations = expand_select_combinations(
2785 &item_context,
2786 nested_select,
2787 &item_combinations,
2788 all_columns,
2789 variables,
2790 )?;
2791 }
2792
2793 final_combinations.extend(item_combinations);
2794 }
2795 }
2796
2797 new_combinations = final_combinations;
2798 }
2799
2800 // Handle unionAll within forEach context
2801 if let Some(union_selects) = select.union_all() {
2802 let mut union_combinations = Vec::new();
2803
2804 for item in &iteration_items {
2805 let item_context = create_iteration_context(item, variables);
2806
2807 // For each iteration item, process all unionAll selects
2808 for existing_combo in existing_combinations {
2809 let mut base_combo = existing_combo.clone();
2810
2811 // Update the base combination with column values for this iteration item
2812 if let Some(columns) = select.column() {
2813 for col in columns {
2814 if let Some(col_name) = col.name() {
2815 if let Some(col_index) =
2816 all_columns.iter().position(|name| name == col_name)
2817 {
2818 let path = col.path().ok_or_else(|| {
2819 SofError::InvalidViewDefinition(
2820 "Column path is required".to_string(),
2821 )
2822 })?;
2823
2824 let result = if path == "$this" {
2825 item.clone()
2826 } else {
2827 evaluate_path_on_item(path, item, variables)?
2828 };
2829
2830 // Check if this column is marked as a collection
2831 let is_collection = col.collection().unwrap_or(false);
2832
2833 base_combo.values[col_index] = if is_collection {
2834 fhirpath_result_to_json_value_collection(result)
2835 } else {
2836 fhirpath_result_to_json_value(result)
2837 };
2838 }
2839 }
2840 }
2841 }
2842
2843 // Also evaluate columns from nested selects and add them to base_combo
2844 if let Some(nested_selects) = select.select() {
2845 for nested_select in nested_selects {
2846 if let Some(nested_columns) = nested_select.column() {
2847 for col in nested_columns {
2848 if let Some(col_name) = col.name() {
2849 if let Some(col_index) =
2850 all_columns.iter().position(|name| name == col_name)
2851 {
2852 let path = col.path().ok_or_else(|| {
2853 SofError::InvalidViewDefinition(
2854 "Column path is required".to_string(),
2855 )
2856 })?;
2857
2858 let result = if path == "$this" {
2859 item.clone()
2860 } else {
2861 evaluate_path_on_item(path, item, variables)?
2862 };
2863
2864 // Check if this column is marked as a collection
2865 let is_collection = col.collection().unwrap_or(false);
2866
2867 base_combo.values[col_index] = if is_collection {
2868 fhirpath_result_to_json_value_collection(result)
2869 } else {
2870 fhirpath_result_to_json_value(result)
2871 };
2872 }
2873 }
2874 }
2875 }
2876 }
2877 }
2878
2879 // Process each unionAll select independently for this iteration item
2880 for union_select in union_selects {
2881 let mut select_combinations = vec![base_combo.clone()];
2882 select_combinations = expand_select_combinations(
2883 &item_context,
2884 union_select,
2885 &select_combinations,
2886 all_columns,
2887 variables,
2888 )?;
2889 union_combinations.extend(select_combinations);
2890 }
2891 }
2892 }
2893
2894 // unionAll replaces new_combinations with the union results
2895 // If no union results, filter out this resource (no rows for this resource)
2896 new_combinations = union_combinations;
2897 }
2898
2899 Ok(new_combinations)
2900}
2901
2902fn expand_repeat_combinations<S>(
2903 context: &EvaluationContext,
2904 select: &S,
2905 existing_combinations: &[RowCombination],
2906 all_columns: &[String],
2907 repeat_paths: &[&str],
2908 variables: &HashMap<String, EvaluationResult>,
2909) -> Result<Vec<RowCombination>, SofError>
2910where
2911 S: ViewDefinitionSelectTrait,
2912 S::Select: ViewDefinitionSelectTrait,
2913{
2914 // The repeat directive performs recursive traversal:
2915 // 1. For each repeat path, find child elements from the current context
2916 // 2. For each child element:
2917 // a. Evaluate columns in the child's context
2918 // b. Recursively process the child with the same repeat paths
2919 // 3. Union all results together
2920 //
2921 // Note: Unlike forEach, repeat does NOT process the current level's columns
2922 // - it ONLY processes elements found via the repeat paths
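    //
    // e.g. with repeat = ["item"] on a Questionnaire, the traversal visits
    // item, item.item, item.item.item, ... until no further children exist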
2923
2924 let mut all_combinations = Vec::new();
2925
2926 // Process each existing combination
2927 for existing_combo in existing_combinations {
2928 // Process each repeat path to find children to traverse
2929 for repeat_path in repeat_paths {
2930 // Evaluate the repeat path to get child elements
2931 let repeat_result = evaluate_expression(repeat_path, context).map_err(|e| {
2932 SofError::FhirPathError(format!(
2933 "Error evaluating repeat expression '{}': {}",
2934 repeat_path, e
2935 ))
2936 })?;
2937
2938 let child_items = extract_iteration_items(repeat_result);
2939
2940 // For each child item found via this repeat path
2941 for child_item in &child_items {
2942 // Create a combination for this child with current level's columns
2943 let mut child_combo = existing_combo.clone();
2944
2945 // Evaluate columns in the context of this child item
2946 if let Some(columns) = select.column() {
2947 for col in columns {
2948 if let Some(col_name) = col.name() {
2949 if let Some(col_index) =
2950 all_columns.iter().position(|name| name == col_name)
2951 {
2952 let path = col.path().ok_or_else(|| {
2953 SofError::InvalidViewDefinition(
2954 "Column path is required".to_string(),
2955 )
2956 })?;
2957
2958 // Evaluate the path on the child item
2959 let result = if path == "$this" {
2960 child_item.clone()
2961 } else {
2962 evaluate_path_on_item(path, child_item, variables)?
2963 };
2964
2965 let is_collection = col.collection().unwrap_or(false);
2966 child_combo.values[col_index] = if is_collection {
2967 fhirpath_result_to_json_value_collection(result)
2968 } else {
2969 fhirpath_result_to_json_value(result)
2970 };
2971 }
2972 }
2973 }
2974 }
2975
2976 // Create context for this child item
2977 let child_context = create_iteration_context(child_item, variables);
2978
2979 // Start with the child combination we just created
2980 let mut child_combinations = vec![child_combo.clone()];
2981
2982 // Process nested selects (like forEach/forEachOrNull) in the child's context
2983 if let Some(nested_selects) = select.select() {
2984 for nested_select in nested_selects {
2985 child_combinations = expand_select_combinations(
2986 &child_context,
2987 nested_select,
2988 &child_combinations,
2989 all_columns,
2990 variables,
2991 )?;
2992 }
2993 }
2994
2995 // Add the processed combinations to our results
2996 // (these may have been filtered by forEach, which is correct)
2997 all_combinations.extend(child_combinations);
2998
2999 // Now recursively process this child with the same repeat paths
3000 // IMPORTANT: Use the original child_combo, not the forEach-filtered results
3001 let recursive_combinations = expand_repeat_combinations(
3002 &child_context,
3003 select,
3004 &[child_combo],
3005 all_columns,
3006 repeat_paths,
3007 variables,
3008 )?;
3009
3010 all_combinations.extend(recursive_combinations);
3011 }
3012 }
3013 }
3014
3015 Ok(all_combinations)
3016}
3017
3018// Generic helper functions
3019fn evaluate_path_on_item(
3020 path: &str,
3021 item: &EvaluationResult,
3022 variables: &HashMap<String, EvaluationResult>,
3023) -> Result<EvaluationResult, SofError> {
3024 // Create a temporary context with the iteration item as the root resource
3025 let mut temp_context = match item {
3026 EvaluationResult::Object { .. } => {
3027 // Convert the iteration item to a resource-like structure for FHIRPath evaluation
3028 // For simplicity, we'll create a basic context where the item is available for evaluation
3029 let mut context = EvaluationContext::new(vec![]);
3030 context.this = Some(item.clone());
3031 context
3032 }
3033 _ => EvaluationContext::new(vec![]),
3034 };
3035
3036 // Add variables to the temporary context
3037 for (name, value) in variables {
3038 temp_context.set_variable_result(name, value.clone());
3039 }
3040
3041 // Evaluate the FHIRPath expression in the context of the iteration item
3042 match evaluate_expression(path, &temp_context) {
3043 Ok(result) => Ok(result),
3044 Err(_e) => {
3045 // If FHIRPath evaluation fails, try simple property access as fallback
3046 match item {
3047 EvaluationResult::Object { map, .. } => {
3048 if let Some(value) = map.get(path) {
3049 Ok(value.clone())
3050 } else {
3051 Ok(EvaluationResult::Empty)
3052 }
3053 }
3054 _ => Ok(EvaluationResult::Empty),
3055 }
3056 }
3057 }
3058}
3059
3060fn create_iteration_context(
3061 item: &EvaluationResult,
3062 variables: &HashMap<String, EvaluationResult>,
3063) -> EvaluationContext {
3064 // Create a new context with the iteration item as the root
3065 let mut context = EvaluationContext::new(vec![]);
3066 context.this = Some(item.clone());
3067
3068 // Preserve variables from the parent context
3069 for (name, value) in variables {
3070 context.set_variable_result(name, value.clone());
3071 }
3072
3073 context
3074}
3075
/// Filter a bundle's resources by their `lastUpdated` metadata.
///
/// Only resources whose `meta.lastUpdated` is strictly after `since` are
/// kept; resources without a `lastUpdated` value are excluded.
3077fn filter_bundle_by_since(bundle: SofBundle, since: DateTime<Utc>) -> Result<SofBundle, SofError> {
3078 match bundle {
3079 #[cfg(feature = "R4")]
3080 SofBundle::R4(mut b) => {
3081 if let Some(entries) = b.entry.as_mut() {
3082 entries.retain(|entry| {
3083 entry
3084 .resource
3085 .as_ref()
3086 .and_then(|r| r.get_last_updated())
3087 .map(|last_updated| last_updated > since)
3088 .unwrap_or(false)
3089 });
3090 }
3091 Ok(SofBundle::R4(b))
3092 }
3093 #[cfg(feature = "R4B")]
3094 SofBundle::R4B(mut b) => {
3095 if let Some(entries) = b.entry.as_mut() {
3096 entries.retain(|entry| {
3097 entry
3098 .resource
3099 .as_ref()
3100 .and_then(|r| r.get_last_updated())
3101 .map(|last_updated| last_updated > since)
3102 .unwrap_or(false)
3103 });
3104 }
3105 Ok(SofBundle::R4B(b))
3106 }
3107 #[cfg(feature = "R5")]
3108 SofBundle::R5(mut b) => {
3109 if let Some(entries) = b.entry.as_mut() {
3110 entries.retain(|entry| {
3111 entry
3112 .resource
3113 .as_ref()
3114 .and_then(|r| r.get_last_updated())
3115 .map(|last_updated| last_updated > since)
3116 .unwrap_or(false)
3117 });
3118 }
3119 Ok(SofBundle::R5(b))
3120 }
3121 #[cfg(feature = "R6")]
3122 SofBundle::R6(mut b) => {
3123 if let Some(entries) = b.entry.as_mut() {
3124 entries.retain(|entry| {
3125 entry
3126 .resource
3127 .as_ref()
3128 .and_then(|r| r.get_last_updated())
3129 .map(|last_updated| last_updated > since)
3130 .unwrap_or(false)
3131 });
3132 }
3133 Ok(SofBundle::R6(b))
3134 }
3135 }
3136}
3137
3138/// Apply pagination to processed results
3139fn apply_pagination_to_result(
3140 mut result: ProcessedResult,
3141 limit: Option<usize>,
3142 page: Option<usize>,
3143) -> Result<ProcessedResult, SofError> {
3144 if let Some(limit) = limit {
3145 let page_num = page.unwrap_or(1);
3146 if page_num == 0 {
3147 return Err(SofError::InvalidViewDefinition(
3148 "Page number must be greater than 0".to_string(),
3149 ));
3150 }
3151
3152 let start_index = (page_num - 1) * limit;
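        // e.g. limit = 50, page = 3 selects rows 100..150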
3153 if start_index >= result.rows.len() {
3154 // Return empty result if page is beyond data
3155 result.rows.clear();
3156 } else {
3157 let end_index = std::cmp::min(start_index + limit, result.rows.len());
3158 result.rows = result.rows[start_index..end_index].to_vec();
3159 }
3160 }
3161
3162 Ok(result)
3163}
3164
3165fn format_output(
3166 result: ProcessedResult,
3167 content_type: ContentType,
3168 parquet_options: Option<&ParquetOptions>,
3169) -> Result<Vec<u8>, SofError> {
3170 match content_type {
3171 ContentType::Csv | ContentType::CsvWithHeader => {
3172 format_csv(result, content_type == ContentType::CsvWithHeader)
3173 }
3174 ContentType::Json => format_json(result),
3175 ContentType::NdJson => format_ndjson(result),
3176 ContentType::Parquet => format_parquet(result, parquet_options),
3177 }
3178}
3179
3180fn format_csv(result: ProcessedResult, include_header: bool) -> Result<Vec<u8>, SofError> {
3181 let mut wtr = csv::Writer::from_writer(vec![]);
3182
3183 if include_header {
3184 wtr.write_record(&result.columns)?;
3185 }
3186
3187 for row in result.rows {
3188 let record: Vec<String> = row
3189 .values
3190 .iter()
3191 .map(|v| match v {
3192 Some(val) => {
3193 // For string values, extract the raw string instead of JSON serializing
3194 if let serde_json::Value::String(s) = val {
3195 s.clone()
3196 } else {
3197 // For non-string values, use JSON serialization
3198 serde_json::to_string(val).unwrap_or_default()
3199 }
3200 }
3201 None => String::new(),
3202 })
3203 .collect();
3204 wtr.write_record(&record)?;
3205 }
3206
3207 wtr.into_inner()
3208 .map_err(|e| SofError::CsvWriterError(e.to_string()))
3209}
3210
3211fn format_json(result: ProcessedResult) -> Result<Vec<u8>, SofError> {
3212 let mut output = Vec::new();
3213
3214 for row in result.rows {
3215 let mut row_obj = serde_json::Map::new();
3216 for (i, column) in result.columns.iter().enumerate() {
3217 let value = row
3218 .values
3219 .get(i)
3220 .and_then(|v| v.as_ref())
3221 .cloned()
3222 .unwrap_or(serde_json::Value::Null);
3223 row_obj.insert(column.clone(), value);
3224 }
3225 output.push(serde_json::Value::Object(row_obj));
3226 }
3227
3228 Ok(serde_json::to_vec_pretty(&output)?)
3229}
3230
3231fn format_ndjson(result: ProcessedResult) -> Result<Vec<u8>, SofError> {
3232 let mut output = Vec::new();
3233
3234 for row in result.rows {
3235 let mut row_obj = serde_json::Map::new();
3236 for (i, column) in result.columns.iter().enumerate() {
3237 let value = row
3238 .values
3239 .get(i)
3240 .and_then(|v| v.as_ref())
3241 .cloned()
3242 .unwrap_or(serde_json::Value::Null);
3243 row_obj.insert(column.clone(), value);
3244 }
3245 let line = serde_json::to_string(&serde_json::Value::Object(row_obj))?;
3246 output.extend_from_slice(line.as_bytes());
3247 output.push(b'\n');
3248 }
3249
3250 Ok(output)
3251}
3252
3253fn format_parquet(
3254 result: ProcessedResult,
3255 options: Option<&ParquetOptions>,
3256) -> Result<Vec<u8>, SofError> {
3257 use arrow::record_batch::RecordBatch;
3258 use parquet::arrow::ArrowWriter;
3259 use parquet::basic::Compression;
3260 use parquet::file::properties::WriterProperties;
3261 use std::io::Cursor;
3262
3263 // Create Arrow schema from columns and sample data
3264 let schema = parquet_schema::create_arrow_schema(&result.columns, &result.rows)?;
3265 let schema_ref = std::sync::Arc::new(schema.clone());
3266
3267 // Get configuration from options or use defaults
3268 let parquet_opts = options.cloned().unwrap_or_default();
3269
3270 // Calculate optimal batch size based on row count and estimated row size
3271 let target_row_group_size_bytes = (parquet_opts.row_group_size_mb as usize) * 1024 * 1024;
3272 let target_page_size_bytes = (parquet_opts.page_size_kb as usize) * 1024;
3273 const TARGET_ROWS_PER_BATCH: usize = 100_000; // Default batch size
3274 const MAX_ROWS_PER_BATCH: usize = 500_000; // Maximum to prevent memory issues
3275
3276 // Estimate average row size from first 100 rows
3277 let sample_size = std::cmp::min(100, result.rows.len());
3278 let mut estimated_row_size = 100; // Default estimate in bytes
3279
3280 if sample_size > 0 {
3281 let sample_json_size: usize = result.rows[..sample_size]
3282 .iter()
3283 .map(|row| {
3284 row.values
3285 .iter()
3286 .filter_map(|v| v.as_ref())
3287 .map(|v| v.to_string().len())
3288 .sum::<usize>()
3289 })
3290 .sum();
3291 estimated_row_size = (sample_json_size / sample_size).max(50);
3292 }
3293
3294 // Calculate optimal batch size
3295 let optimal_batch_size = (target_row_group_size_bytes / estimated_row_size)
3296 .clamp(TARGET_ROWS_PER_BATCH, MAX_ROWS_PER_BATCH);
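    // e.g. a 64 MB row-group target with ~200-byte rows yields roughly
    // 335_000 rows per batch; smaller targets are clamped up to
    // TARGET_ROWS_PER_BATCH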
3297
3298 // Parse compression algorithm
3299 use parquet::basic::BrotliLevel;
3300 use parquet::basic::GzipLevel;
3301 use parquet::basic::ZstdLevel;
3302
3303 let compression = match parquet_opts.compression.as_str() {
3304 "none" => Compression::UNCOMPRESSED,
3305 "gzip" => Compression::GZIP(GzipLevel::default()),
3306 "lz4" => Compression::LZ4,
3307 "brotli" => Compression::BROTLI(BrotliLevel::default()),
3308 "zstd" => Compression::ZSTD(ZstdLevel::default()),
3309 _ => Compression::SNAPPY, // Default to snappy
3310 };
3311
3312 // Set up writer properties with optimized settings
3313 let props = WriterProperties::builder()
3314 .set_compression(compression)
3315 .set_max_row_group_size(target_row_group_size_bytes)
3316 .set_data_page_row_count_limit(20_000) // Optimal for predicate pushdown
3317 .set_data_page_size_limit(target_page_size_bytes)
3318 .set_write_batch_size(8192) // Control write granularity
3319 .build();
3320
3321 // Write to memory buffer
3322 let mut buffer = Vec::new();
3323 let mut cursor = Cursor::new(&mut buffer);
3324 let mut writer =
3325 ArrowWriter::try_new(&mut cursor, schema_ref.clone(), Some(props)).map_err(|e| {
3326 SofError::ParquetConversionError(format!("Failed to create Parquet writer: {}", e))
3327 })?;
3328
3329 // Process data in batches to handle large datasets efficiently
3330 let mut row_offset = 0;
3331 while row_offset < result.rows.len() {
3332 let batch_end = (row_offset + optimal_batch_size).min(result.rows.len());
3333 let batch_rows = &result.rows[row_offset..batch_end];
3334
3335 // Convert batch to Arrow arrays
3336 let batch_arrays =
3337 parquet_schema::process_to_arrow_arrays(&schema, &result.columns, batch_rows)?;
3338
3339 // Create RecordBatch for this chunk
3340 let batch = RecordBatch::try_new(schema_ref.clone(), batch_arrays).map_err(|e| {
3341 SofError::ParquetConversionError(format!(
3342 "Failed to create RecordBatch for rows {}-{}: {}",
3343 row_offset, batch_end, e
3344 ))
3345 })?;
3346
3347 // Write batch
3348 writer.write(&batch).map_err(|e| {
3349 SofError::ParquetConversionError(format!(
3350 "Failed to write RecordBatch for rows {}-{}: {}",
3351 row_offset, batch_end, e
3352 ))
3353 })?;
3354
3355 row_offset = batch_end;
3356 }
3357
3358 writer.close().map_err(|e| {
3359 SofError::ParquetConversionError(format!("Failed to close Parquet writer: {}", e))
3360 })?;
3361
3362 Ok(buffer)
3363}
3364
3365/// Format Parquet data with automatic file splitting when size exceeds limit
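///
/// Returns one complete Parquet file buffer per element of the result vector.
///
/// A minimal usage sketch (the `processed` value and output paths here are
/// hypothetical):
///
/// ```rust,ignore
/// // Split output into files of roughly 100 MB each
/// let files = format_parquet_multi_file(processed, None, 100 * 1024 * 1024)?;
/// for (i, bytes) in files.iter().enumerate() {
///     std::fs::write(format!("out-{i:05}.parquet"), bytes)?;
/// }
/// ```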
3366pub fn format_parquet_multi_file(
3367 result: ProcessedResult,
3368 options: Option<&ParquetOptions>,
3369 max_file_size_bytes: usize,
3370) -> Result<Vec<Vec<u8>>, SofError> {
3371 use arrow::record_batch::RecordBatch;
3372 use parquet::arrow::ArrowWriter;
3373 use parquet::basic::Compression;
3374 use parquet::file::properties::WriterProperties;
3375 use std::io::Cursor;
3376
3377 // Create Arrow schema from columns and sample data
3378 let schema = parquet_schema::create_arrow_schema(&result.columns, &result.rows)?;
3379 let schema_ref = std::sync::Arc::new(schema.clone());
3380
3381 // Get configuration from options or use defaults
3382 let parquet_opts = options.cloned().unwrap_or_default();
3383
3384 // Calculate optimal batch size
3385 let target_row_group_size_bytes = (parquet_opts.row_group_size_mb as usize) * 1024 * 1024;
3386 let target_page_size_bytes = (parquet_opts.page_size_kb as usize) * 1024;
3387 const TARGET_ROWS_PER_BATCH: usize = 100_000;
3388 const MAX_ROWS_PER_BATCH: usize = 500_000;
3389
3390 // Estimate average row size
3391 let sample_size = std::cmp::min(100, result.rows.len());
3392 let mut estimated_row_size = 100;
3393
3394 if sample_size > 0 {
3395 let sample_json_size: usize = result.rows[..sample_size]
3396 .iter()
3397 .map(|row| {
3398 row.values
3399 .iter()
3400 .filter_map(|v| v.as_ref())
3401 .map(|v| v.to_string().len())
3402 .sum::<usize>()
3403 })
3404 .sum();
3405 estimated_row_size = (sample_json_size / sample_size).max(50);
3406 }
3407
3408 let optimal_batch_size = (target_row_group_size_bytes / estimated_row_size)
3409 .clamp(TARGET_ROWS_PER_BATCH, MAX_ROWS_PER_BATCH);
3410
3411 // Parse compression algorithm
3412 use parquet::basic::BrotliLevel;
3413 use parquet::basic::GzipLevel;
3414 use parquet::basic::ZstdLevel;
3415
3416 let compression = match parquet_opts.compression.as_str() {
3417 "none" => Compression::UNCOMPRESSED,
3418 "gzip" => Compression::GZIP(GzipLevel::default()),
3419 "lz4" => Compression::LZ4,
3420 "brotli" => Compression::BROTLI(BrotliLevel::default()),
3421 "zstd" => Compression::ZSTD(ZstdLevel::default()),
3422 _ => Compression::SNAPPY,
3423 };
3424
3425 // Set up writer properties
3426 let props = WriterProperties::builder()
3427 .set_compression(compression)
3428 .set_max_row_group_size(target_row_group_size_bytes)
3429 .set_data_page_row_count_limit(20_000)
3430 .set_data_page_size_limit(target_page_size_bytes)
3431 .set_write_batch_size(8192)
3432 .build();
3433
3434 let mut file_buffers = Vec::new();
3435 let mut current_buffer = Vec::new();
3436 let mut current_cursor = Cursor::new(&mut current_buffer);
3437 let mut current_writer =
3438 ArrowWriter::try_new(&mut current_cursor, schema_ref.clone(), Some(props.clone()))
3439 .map_err(|e| {
3440 SofError::ParquetConversionError(format!("Failed to create Parquet writer: {}", e))
3441 })?;
3442
3443 let mut row_offset = 0;
3445
3446 while row_offset < result.rows.len() {
3447 let batch_end = (row_offset + optimal_batch_size).min(result.rows.len());
3448 let batch_rows = &result.rows[row_offset..batch_end];
3449
3450 // Convert batch to Arrow arrays
3451 let batch_arrays =
3452 parquet_schema::process_to_arrow_arrays(&schema, &result.columns, batch_rows)?;
3453
3454 // Create RecordBatch
3455 let batch = RecordBatch::try_new(schema_ref.clone(), batch_arrays).map_err(|e| {
3456 SofError::ParquetConversionError(format!(
3457 "Failed to create RecordBatch for rows {}-{}: {}",
3458 row_offset, batch_end, e
3459 ))
3460 })?;
3461
3462 // Write batch
3463 current_writer.write(&batch).map_err(|e| {
3464 SofError::ParquetConversionError(format!(
3465 "Failed to write RecordBatch for rows {}-{}: {}",
3466 row_offset, batch_end, e
3467 ))
3468 })?;
3469
3471 row_offset = batch_end;
3472
        // Check if we should start a new file; bytes_written() reports how
        // much the writer has produced so far without forcing a flush
3475 let current_size = current_writer.bytes_written();
3476
3477 if current_size >= max_file_size_bytes && row_offset < result.rows.len() {
3478 // Close current file
3479 current_writer.close().map_err(|e| {
3480 SofError::ParquetConversionError(format!("Failed to close Parquet writer: {}", e))
3481 })?;
3482
3483 // Save the buffer
3484 file_buffers.push(current_buffer);
3485
3486 // Start new file
3487 current_buffer = Vec::new();
3488 current_cursor = Cursor::new(&mut current_buffer);
3489 current_writer =
3490 ArrowWriter::try_new(&mut current_cursor, schema_ref.clone(), Some(props.clone()))
3491 .map_err(|e| {
3492 SofError::ParquetConversionError(format!(
3493 "Failed to create new Parquet writer: {}",
3494 e
3495 ))
3496 })?;
3498 }
3499 }
3500
3501 // Close the final writer
3502 current_writer.close().map_err(|e| {
3503 SofError::ParquetConversionError(format!("Failed to close final Parquet writer: {}", e))
3504 })?;
3505
3506 file_buffers.push(current_buffer);
3507
3508 Ok(file_buffers)
3509}