// helios_sof/lib.rs
1//! # SQL-on-FHIR Implementation
2//!
3//! This crate provides a complete implementation of the [SQL-on-FHIR
4//! specification](https://sql-on-fhir.org/ig/latest),
5//! enabling the transformation of FHIR resources into tabular data using declarative
6//! ViewDefinitions. It supports all major FHIR versions (R4, R4B, R5, R6) through
7//! a version-agnostic abstraction layer.
//!
10//! There are three consumers of this crate:
11//! - [sof_cli](../sof_cli/index.html) - A command-line interface for the SQL-on-FHIR implementation,
12//! allowing users to execute ViewDefinition transformations on FHIR Bundle resources
13//! and output the results in various formats.
14//! - [sof_server](../sof_server/index.html) - A stateless HTTP server implementation for the SQL-on-FHIR specification,
15//! enabling HTTP-based access to ViewDefinition transformation capabilities.
16//! - [hfs](../hfs/index.html) - The full featured Helios FHIR Server.
17//!
18//! ## Architecture
19//!
20//! The SOF crate is organized around these key components:
21//! - **Version-agnostic enums** ([`SofViewDefinition`], [`SofBundle`]): Multi-version containers
22//! - **Processing engine** ([`run_view_definition`]): Core transformation logic
23//! - **Output formats** ([`ContentType`]): Support for CSV, JSON, NDJSON, and Parquet
24//! - **Trait abstractions** ([`ViewDefinitionTrait`], [`BundleTrait`]): Version independence
25//!
26//! ## Key Features
27//!
28//! - **Multi-version FHIR support**: Works with R4, R4B, R5, and R6 resources
29//! - **FHIRPath evaluation**: Complex path expressions for data extraction
30//! - **forEach iteration**: Supports flattening of nested FHIR structures
31//! - **unionAll operations**: Combines multiple select statements
32//! - **Collection handling**: Proper array serialization for multi-valued fields
33//! - **Output formats**: CSV (with/without headers), JSON, NDJSON, Parquet support
34//!
35//! ## Usage Example
36//!
37//! ```rust
38//! # #[cfg(not(target_os = "windows"))]
39//! # {
40//! use helios_sof::{SofViewDefinition, SofBundle, ContentType, run_view_definition};
41//! use helios_fhir::FhirVersion;
42//!
43//! # #[cfg(feature = "R4")]
44//! # {
45//! // Parse a ViewDefinition and Bundle from JSON
46//! let view_definition_json = r#"{
47//! "resourceType": "ViewDefinition",
48//! "status": "active",
49//! "resource": "Patient",
50//! "select": [{
51//! "column": [{
52//! "name": "id",
53//! "path": "id"
54//! }, {
55//! "name": "name",
56//! "path": "name.family"
57//! }]
58//! }]
59//! }"#;
60//!
61//! let bundle_json = r#"{
62//! "resourceType": "Bundle",
63//! "type": "collection",
64//! "entry": [{
65//! "resource": {
66//! "resourceType": "Patient",
67//! "id": "example",
68//! "name": [{
69//! "family": "Doe",
70//! "given": ["John"]
71//! }]
72//! }
73//! }]
74//! }"#;
75//!
76//! let view_definition: helios_fhir::r4::ViewDefinition = serde_json::from_str(view_definition_json)?;
77//! let bundle: helios_fhir::r4::Bundle = serde_json::from_str(bundle_json)?;
78//!
79//! // Wrap in version-agnostic containers
80//! let sof_view = SofViewDefinition::R4(view_definition);
81//! let sof_bundle = SofBundle::R4(bundle);
82//!
83//! // Transform to CSV with headers
84//! let csv_output = run_view_definition(
85//! sof_view,
86//! sof_bundle,
87//! ContentType::CsvWithHeader
88//! )?;
89//!
90//! // Check the CSV output
91//! let csv_string = String::from_utf8(csv_output)?;
92//! assert!(csv_string.contains("id,name"));
93//! // CSV values are quoted
94//! assert!(csv_string.contains("example") && csv_string.contains("Doe"));
95//! # }
96//! # }
97//! # Ok::<(), Box<dyn std::error::Error>>(())
98//! ```
99//!
100//! ## Advanced Features
101//!
102//! ### forEach Iteration
103//!
104//! ViewDefinitions can iterate over collections using `forEach` and `forEachOrNull`:
105//!
106//! ```json
107//! {
108//! "select": [{
109//! "forEach": "name",
110//! "column": [{
111//! "name": "family_name",
112//! "path": "family"
113//! }]
114//! }]
115//! }
116//! ```
117//!
118//! ### Constants and Variables
119//!
120//! Define reusable values in ViewDefinitions:
121//!
122//! ```json
123//! {
124//! "constant": [{
125//! "name": "system",
126//! "valueString": "http://loinc.org"
127//! }],
128//! "select": [{
129//! "where": [{
130//! "path": "code.coding.system = %system"
131//! }]
132//! }]
133//! }
134//! ```
135//!
136//! ### Where Clauses
137//!
138//! Filter resources using FHIRPath expressions:
139//!
140//! ```json
141//! {
142//! "where": [{
143//! "path": "active = true"
144//! }, {
145//! "path": "birthDate.exists()"
146//! }]
147//! }
148//! ```
149//!
150//! ## Error Handling
151//!
152//! The crate provides comprehensive error handling through [`SofError`]:
153//!
154//! ```rust,no_run
155//! use helios_sof::{SofError, SofViewDefinition, SofBundle, ContentType, run_view_definition};
156//!
157//! # let view = SofViewDefinition::R4(helios_fhir::r4::ViewDefinition::default());
158//! # let bundle = SofBundle::R4(helios_fhir::r4::Bundle::default());
159//! # let content_type = ContentType::Json;
160//! match run_view_definition(view, bundle, content_type) {
161//! Ok(output) => {
162//! // Process successful transformation
163//! },
164//! Err(SofError::InvalidViewDefinition(msg)) => {
165//! eprintln!("ViewDefinition validation failed: {}", msg);
166//! },
167//! Err(SofError::FhirPathError(msg)) => {
168//! eprintln!("FHIRPath evaluation failed: {}", msg);
169//! },
170//! Err(e) => {
171//! eprintln!("Other error: {}", e);
172//! }
173//! }
174//! ```
175//! ## Feature Flags
176//!
177//! Enable support for specific FHIR versions:
178//! - `R4`: FHIR 4.0.1 support
179//! - `R4B`: FHIR 4.3.0 support
180//! - `R5`: FHIR 5.0.0 support
181//! - `R6`: FHIR 6.0.0 support
182
183pub mod data_source;
184pub mod parquet_schema;
185pub mod traits;
186
187use chrono::{DateTime, Utc};
188use helios_fhirpath::{EvaluationContext, EvaluationResult, evaluate_expression};
189use rayon::prelude::*;
190use serde::{Deserialize, Serialize};
191use std::collections::HashMap;
192use thiserror::Error;
193use traits::*;
194
195// Re-export commonly used types and traits for easier access
196pub use helios_fhir::FhirVersion;
197pub use traits::{BundleTrait, ResourceTrait, ViewDefinitionTrait};
198
/// Multi-version ViewDefinition container supporting version-agnostic operations.
///
/// This enum provides a unified interface for working with ViewDefinition resources
/// across different FHIR specification versions. It enables applications to handle
/// multiple FHIR versions simultaneously while maintaining type safety.
///
/// # Supported Versions
///
/// - **R4**: FHIR 4.0.1 ViewDefinition (normative)
/// - **R4B**: FHIR 4.3.0 ViewDefinition (ballot)
/// - **R5**: FHIR 5.0.0 ViewDefinition (ballot)
/// - **R6**: FHIR 6.0.0 ViewDefinition (draft)
///
/// # Examples
///
/// ```rust
/// use helios_sof::{SofViewDefinition, ContentType};
/// # #[cfg(feature = "R4")]
/// use helios_fhir::r4::ViewDefinition;
///
/// # #[cfg(feature = "R4")]
/// # {
/// // Parse from JSON
/// let json = r#"{
///     "resourceType": "ViewDefinition",
///     "resource": "Patient",
///     "select": [{
///         "column": [{
///             "name": "id",
///             "path": "id"
///         }]
///     }]
/// }"#;
///
/// let view_def: ViewDefinition = serde_json::from_str(json)?;
/// let sof_view = SofViewDefinition::R4(view_def);
///
/// // Check version
/// assert_eq!(sof_view.version(), helios_fhir::FhirVersion::R4);
/// # }
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
#[derive(Debug, Clone)]
pub enum SofViewDefinition {
    /// ViewDefinition from the FHIR R4 (4.0.1) specification.
    #[cfg(feature = "R4")]
    R4(helios_fhir::r4::ViewDefinition),
    /// ViewDefinition from the FHIR R4B (4.3.0) specification.
    #[cfg(feature = "R4B")]
    R4B(helios_fhir::r4b::ViewDefinition),
    /// ViewDefinition from the FHIR R5 (5.0.0) specification.
    #[cfg(feature = "R5")]
    R5(helios_fhir::r5::ViewDefinition),
    /// ViewDefinition from the FHIR R6 (6.0.0) specification.
    #[cfg(feature = "R6")]
    R6(helios_fhir::r6::ViewDefinition),
}
252
impl SofViewDefinition {
    /// Returns the FHIR specification version of this ViewDefinition.
    ///
    /// This method provides version detection for multi-version applications,
    /// enabling version-specific processing logic and compatibility checks.
    ///
    /// # Returns
    ///
    /// The `FhirVersion` enum variant corresponding to this ViewDefinition's specification.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use helios_sof::SofViewDefinition;
    /// use helios_fhir::FhirVersion;
    ///
    /// # #[cfg(feature = "R5")]
    /// # {
    /// # let view_def = helios_fhir::r5::ViewDefinition::default();
    /// let sof_view = SofViewDefinition::R5(view_def);
    /// assert_eq!(sof_view.version(), helios_fhir::FhirVersion::R5);
    /// # }
    /// ```
    pub fn version(&self) -> helios_fhir::FhirVersion {
        // Each match arm is gated on the same feature flag as the enum
        // variant it covers, so the match stays exhaustive for every
        // combination of enabled FHIR version features.
        match self {
            #[cfg(feature = "R4")]
            SofViewDefinition::R4(_) => helios_fhir::FhirVersion::R4,
            #[cfg(feature = "R4B")]
            SofViewDefinition::R4B(_) => helios_fhir::FhirVersion::R4B,
            #[cfg(feature = "R5")]
            SofViewDefinition::R5(_) => helios_fhir::FhirVersion::R5,
            #[cfg(feature = "R6")]
            SofViewDefinition::R6(_) => helios_fhir::FhirVersion::R6,
        }
    }
}
289
/// Multi-version Bundle container supporting version-agnostic operations.
///
/// This enum provides a unified interface for working with FHIR Bundle resources
/// across different FHIR specification versions. Bundles contain the actual FHIR
/// resources that will be processed by ViewDefinitions.
///
/// # Supported Versions
///
/// - **R4**: FHIR 4.0.1 Bundle (normative)
/// - **R4B**: FHIR 4.3.0 Bundle (ballot)
/// - **R5**: FHIR 5.0.0 Bundle (ballot)
/// - **R6**: FHIR 6.0.0 Bundle (draft)
///
/// # Examples
///
/// ```rust
/// # #[cfg(not(target_os = "windows"))]
/// # {
/// use helios_sof::SofBundle;
/// # #[cfg(feature = "R4")]
/// use helios_fhir::r4::Bundle;
///
/// # #[cfg(feature = "R4")]
/// # {
/// // Parse from JSON
/// let json = r#"{
///     "resourceType": "Bundle",
///     "type": "collection",
///     "entry": [{
///         "resource": {
///             "resourceType": "Patient",
///             "id": "example"
///         }
///     }]
/// }"#;
///
/// let bundle: Bundle = serde_json::from_str(json)?;
/// let sof_bundle = SofBundle::R4(bundle);
///
/// // Check version compatibility
/// assert_eq!(sof_bundle.version(), helios_fhir::FhirVersion::R4);
/// # }
/// # }
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
#[derive(Debug, Clone)]
pub enum SofBundle {
    /// Bundle from the FHIR R4 (4.0.1) specification.
    #[cfg(feature = "R4")]
    R4(helios_fhir::r4::Bundle),
    /// Bundle from the FHIR R4B (4.3.0) specification.
    #[cfg(feature = "R4B")]
    R4B(helios_fhir::r4b::Bundle),
    /// Bundle from the FHIR R5 (5.0.0) specification.
    #[cfg(feature = "R5")]
    R5(helios_fhir::r5::Bundle),
    /// Bundle from the FHIR R6 (6.0.0) specification.
    #[cfg(feature = "R6")]
    R6(helios_fhir::r6::Bundle),
}
346
impl SofBundle {
    /// Returns the FHIR specification version of this Bundle.
    ///
    /// This method provides version detection for multi-version applications,
    /// ensuring that ViewDefinitions and Bundles use compatible FHIR versions.
    ///
    /// # Returns
    ///
    /// The `FhirVersion` enum variant corresponding to this Bundle's specification.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use helios_sof::SofBundle;
    /// use helios_fhir::FhirVersion;
    ///
    /// # #[cfg(feature = "R4")]
    /// # {
    /// # let bundle = helios_fhir::r4::Bundle::default();
    /// let sof_bundle = SofBundle::R4(bundle);
    /// assert_eq!(sof_bundle.version(), helios_fhir::FhirVersion::R4);
    /// # }
    /// ```
    pub fn version(&self) -> helios_fhir::FhirVersion {
        // Arms mirror the feature-gated enum variants one-to-one.
        match self {
            #[cfg(feature = "R4")]
            SofBundle::R4(_) => helios_fhir::FhirVersion::R4,
            #[cfg(feature = "R4B")]
            SofBundle::R4B(_) => helios_fhir::FhirVersion::R4B,
            #[cfg(feature = "R5")]
            SofBundle::R5(_) => helios_fhir::FhirVersion::R5,
            #[cfg(feature = "R6")]
            SofBundle::R6(_) => helios_fhir::FhirVersion::R6,
        }
    }
}
383
/// Multi-version CapabilityStatement container supporting version-agnostic operations.
///
/// This enum provides a unified interface for working with CapabilityStatement resources
/// across different FHIR specification versions. It enables applications to handle
/// multiple FHIR versions simultaneously while maintaining type safety.
///
/// Unlike [`SofViewDefinition`] and [`SofBundle`], this enum derives
/// `Serialize`/`Deserialize` with `#[serde(untagged)]`: the JSON representation
/// is the plain CapabilityStatement resource with no version wrapper, and on
/// deserialization serde tries the variants in declaration order, accepting the
/// first one that matches.
///
/// # Supported Versions
///
/// - **R4**: FHIR 4.0.1 CapabilityStatement (normative)
/// - **R4B**: FHIR 4.3.0 CapabilityStatement (ballot)
/// - **R5**: FHIR 5.0.0 CapabilityStatement (ballot)
/// - **R6**: FHIR 6.0.0 CapabilityStatement (draft)
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum SofCapabilityStatement {
    /// CapabilityStatement from the FHIR R4 (4.0.1) specification.
    #[cfg(feature = "R4")]
    R4(helios_fhir::r4::CapabilityStatement),
    /// CapabilityStatement from the FHIR R4B (4.3.0) specification.
    #[cfg(feature = "R4B")]
    R4B(helios_fhir::r4b::CapabilityStatement),
    /// CapabilityStatement from the FHIR R5 (5.0.0) specification.
    #[cfg(feature = "R5")]
    R5(helios_fhir::r5::CapabilityStatement),
    /// CapabilityStatement from the FHIR R6 (6.0.0) specification.
    #[cfg(feature = "R6")]
    R6(helios_fhir::r6::CapabilityStatement),
}
408
impl SofCapabilityStatement {
    /// Returns the FHIR specification version of this CapabilityStatement.
    ///
    /// # Returns
    ///
    /// The `FhirVersion` enum variant corresponding to this CapabilityStatement's
    /// specification.
    pub fn version(&self) -> helios_fhir::FhirVersion {
        // Arms mirror the feature-gated enum variants one-to-one.
        match self {
            #[cfg(feature = "R4")]
            SofCapabilityStatement::R4(_) => helios_fhir::FhirVersion::R4,
            #[cfg(feature = "R4B")]
            SofCapabilityStatement::R4B(_) => helios_fhir::FhirVersion::R4B,
            #[cfg(feature = "R5")]
            SofCapabilityStatement::R5(_) => helios_fhir::FhirVersion::R5,
            #[cfg(feature = "R6")]
            SofCapabilityStatement::R6(_) => helios_fhir::FhirVersion::R6,
        }
    }
}
424
/// Type alias for the version-independent Parameters container.
///
/// This alias provides backward compatibility for existing callers while
/// delegating to the unified `VersionIndependentParameters` type from the
/// `helios_fhir` crate.
pub type SofParameters = helios_fhir::VersionIndependentParameters;
430
/// Comprehensive error type for SQL-on-FHIR operations.
///
/// This enum covers all possible error conditions that can occur during
/// ViewDefinition processing, from validation failures to output formatting issues.
/// Each variant provides specific context about the error to aid in debugging.
///
/// Variants annotated with `#[from]` ([`SofError::SerializationError`],
/// [`SofError::CsvError`], [`SofError::IoError`]) convert automatically from
/// their underlying error types, so `?` can be applied directly to
/// `serde_json`, `csv`, and `std::io` results.
///
/// # Error Categories
///
/// - **Validation**: ViewDefinition structure and logic validation
/// - **Evaluation**: FHIRPath expression evaluation failures
/// - **I/O**: File and serialization operations
/// - **Format**: Output format conversion issues
///
/// # Examples
///
/// ```rust,no_run
/// use helios_sof::{SofError, SofViewDefinition, SofBundle, ContentType, run_view_definition};
///
/// # let view = SofViewDefinition::R4(helios_fhir::r4::ViewDefinition::default());
/// # let bundle = SofBundle::R4(helios_fhir::r4::Bundle::default());
/// # let content_type = ContentType::Json;
/// match run_view_definition(view, bundle, content_type) {
///     Ok(output) => {
///         println!("Transformation successful");
///     },
///     Err(SofError::InvalidViewDefinition(msg)) => {
///         eprintln!("ViewDefinition validation failed: {}", msg);
///     },
///     Err(SofError::FhirPathError(msg)) => {
///         eprintln!("FHIRPath evaluation error: {}", msg);
///     },
///     Err(SofError::UnsupportedContentType(format)) => {
///         eprintln!("Unsupported output format: {}", format);
///     },
///     Err(e) => {
///         eprintln!("Other error: {}", e);
///     }
/// }
/// ```
#[derive(Debug, Error)]
pub enum SofError {
    /// ViewDefinition structure or logic validation failed.
    ///
    /// This error occurs when a ViewDefinition contains invalid or inconsistent
    /// configuration, such as missing required fields, invalid FHIRPath expressions,
    /// or incompatible select/unionAll structures.
    #[error("Invalid ViewDefinition: {0}")]
    InvalidViewDefinition(String),

    /// FHIRPath expression evaluation failed.
    ///
    /// This error occurs when a FHIRPath expression in a ViewDefinition cannot
    /// be evaluated, either due to syntax errors or runtime evaluation issues.
    #[error("FHIRPath evaluation error: {0}")]
    FhirPathError(String),

    /// JSON serialization/deserialization failed.
    ///
    /// This error occurs when parsing input JSON or serializing output data fails,
    /// typically due to malformed JSON or incompatible data structures.
    /// Converted automatically from `serde_json::Error` via `#[from]`.
    #[error("Serialization error: {0}")]
    SerializationError(#[from] serde_json::Error),

    /// CSV processing failed.
    ///
    /// This error occurs during CSV output generation, such as when writing
    /// headers or data rows to the CSV format.
    /// Converted automatically from `csv::Error` via `#[from]`.
    #[error("CSV error: {0}")]
    CsvError(#[from] csv::Error),

    /// File I/O operation failed.
    ///
    /// This error occurs when reading input files or writing output files fails,
    /// typically due to permission issues or missing files.
    /// Converted automatically from `std::io::Error` via `#[from]`.
    #[error("IO error: {0}")]
    IoError(#[from] std::io::Error),

    /// Unsupported output content type requested.
    ///
    /// This error occurs when an invalid or unimplemented content type is
    /// specified for output formatting.
    #[error("Unsupported content type: {0}")]
    UnsupportedContentType(String),

    /// CSV writer internal error.
    ///
    /// This error occurs when the CSV writer encounters an internal issue
    /// that prevents successful output generation.
    #[error("CSV writer error: {0}")]
    CsvWriterError(String),

    /// Invalid source parameter value.
    ///
    /// This error occurs when the source parameter contains an invalid URL or path.
    #[error("Invalid source: {0}")]
    InvalidSource(String),

    /// Source not found.
    ///
    /// This error occurs when the specified source file or URL cannot be found.
    #[error("Source not found: {0}")]
    SourceNotFound(String),

    /// Failed to fetch data from source.
    ///
    /// This error occurs when fetching data from a remote source fails.
    #[error("Failed to fetch source: {0}")]
    SourceFetchError(String),

    /// Failed to read source data.
    ///
    /// This error occurs when reading data from the source fails.
    #[error("Failed to read source: {0}")]
    SourceReadError(String),

    /// Invalid content in source.
    ///
    /// This error occurs when the source content is not valid FHIR data.
    #[error("Invalid source content: {0}")]
    InvalidSourceContent(String),

    /// Unsupported source protocol.
    ///
    /// This error occurs when the source URL uses an unsupported protocol.
    #[error("Unsupported source protocol: {0}")]
    UnsupportedSourceProtocol(String),

    /// Parquet conversion error.
    ///
    /// This error occurs when converting data to Parquet format fails.
    #[error("Parquet conversion error: {0}")]
    ParquetConversionError(String),
}
564
/// Supported output content types for ViewDefinition transformations.
///
/// This enum defines the available output formats for transformed FHIR data.
/// Each format has specific characteristics and use cases for different
/// integration scenarios.
///
/// # Format Descriptions
///
/// - **CSV**: Comma-separated values without headers
/// - **CSV with Headers**: Comma-separated values with column headers
/// - **JSON**: Pretty-printed JSON array of objects
/// - **NDJSON**: Newline-delimited JSON (one object per line)
/// - **Parquet**: Apache Parquet columnar format (see [`ParquetOptions`] for tuning)
///
/// # Examples
///
/// ```rust
/// use helios_sof::ContentType;
///
/// // Parse from string
/// let csv_type = ContentType::from_string("text/csv")?;
/// assert_eq!(csv_type, ContentType::CsvWithHeader); // Default includes headers
///
/// let json_type = ContentType::from_string("application/json")?;
/// assert_eq!(json_type, ContentType::Json);
///
/// // CSV without headers
/// let csv_no_headers = ContentType::from_string("text/csv;header=false")?;
/// assert_eq!(csv_no_headers, ContentType::Csv);
/// # Ok::<(), helios_sof::SofError>(())
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContentType {
    /// Comma-separated values format without headers
    Csv,
    /// Comma-separated values format with column headers
    CsvWithHeader,
    /// Pretty-printed JSON array format
    Json,
    /// Newline-delimited JSON format (NDJSON)
    NdJson,
    /// Apache Parquet columnar format
    Parquet,
}
609
610impl ContentType {
611 /// Parse a content type from its MIME type string representation.
612 ///
613 /// This method converts standard MIME type strings to the corresponding
614 /// ContentType enum variants. It supports the SQL-on-FHIR specification's
615 /// recommended content types.
616 ///
617 /// # Supported MIME Types
618 ///
619 /// - `"text/csv"` → [`ContentType::Csv`]
620 /// - `"text/csv"` → [`ContentType::CsvWithHeader`] (default: headers included)
621 /// - `"text/csv;header=true"` → [`ContentType::CsvWithHeader`]
622 /// - `"text/csv;header=false"` → [`ContentType::Csv`]
623 /// - `"application/json"` → [`ContentType::Json`]
624 /// - `"application/ndjson"` → [`ContentType::NdJson`]
625 /// - `"application/x-ndjson"` → [`ContentType::NdJson`]
626 /// - `"application/parquet"` → [`ContentType::Parquet`]
627 ///
628 /// # Arguments
629 ///
630 /// * `s` - The MIME type string to parse
631 ///
632 /// # Returns
633 ///
634 /// * `Ok(ContentType)` - Successfully parsed content type
635 /// * `Err(SofError::UnsupportedContentType)` - Unknown or unsupported MIME type
636 ///
637 /// # Examples
638 ///
639 /// ```rust
640 /// use helios_sof::ContentType;
641 ///
642 /// // Shortened format names
643 /// let csv = ContentType::from_string("csv")?;
644 /// assert_eq!(csv, ContentType::CsvWithHeader);
645 ///
646 /// let json = ContentType::from_string("json")?;
647 /// assert_eq!(json, ContentType::Json);
648 ///
649 /// let ndjson = ContentType::from_string("ndjson")?;
650 /// assert_eq!(ndjson, ContentType::NdJson);
651 ///
652 /// // Full MIME types still supported
653 /// let csv_mime = ContentType::from_string("text/csv")?;
654 /// assert_eq!(csv_mime, ContentType::CsvWithHeader);
655 ///
656 /// // CSV with headers explicitly
657 /// let csv_headers = ContentType::from_string("text/csv;header=true")?;
658 /// assert_eq!(csv_headers, ContentType::CsvWithHeader);
659 ///
660 /// // CSV without headers
661 /// let csv_no_headers = ContentType::from_string("text/csv;header=false")?;
662 /// assert_eq!(csv_no_headers, ContentType::Csv);
663 ///
664 /// // JSON format
665 /// let json_mime = ContentType::from_string("application/json")?;
666 /// assert_eq!(json_mime, ContentType::Json);
667 ///
668 /// // Error for unsupported type
669 /// assert!(ContentType::from_string("text/plain").is_err());
670 /// # Ok::<(), helios_sof::SofError>(())
671 /// ```
672 pub fn from_string(s: &str) -> Result<Self, SofError> {
673 match s {
674 // Shortened format names
675 "csv" => Ok(ContentType::CsvWithHeader),
676 "json" => Ok(ContentType::Json),
677 "ndjson" => Ok(ContentType::NdJson),
678 "parquet" => Ok(ContentType::Parquet),
679 // Full MIME types (for Accept header compatibility)
680 "text/csv;header=false" => Ok(ContentType::Csv),
681 "text/csv" | "text/csv;header=true" => Ok(ContentType::CsvWithHeader),
682 "application/json" => Ok(ContentType::Json),
683 "application/ndjson" | "application/x-ndjson" => Ok(ContentType::NdJson),
684 "application/parquet" => Ok(ContentType::Parquet),
685 _ => Err(SofError::UnsupportedContentType(s.to_string())),
686 }
687 }
688}
689
/// Returns the FHIR version string for the newest enabled version.
///
/// This function provides the version string that should be used in CapabilityStatements
/// and other FHIR resources that need to specify their version.
///
/// The result always corresponds to the version reported by
/// [`get_newest_enabled_fhir_version`].
pub fn get_fhir_version_string() -> &'static str {
    let newest_version = get_newest_enabled_fhir_version();

    // Map each FhirVersion to its canonical specification version string.
    // Arms are feature-gated to mirror the feature-gated enum variants.
    match newest_version {
        #[cfg(feature = "R4")]
        helios_fhir::FhirVersion::R4 => "4.0.1",
        #[cfg(feature = "R4B")]
        helios_fhir::FhirVersion::R4B => "4.3.0",
        #[cfg(feature = "R5")]
        helios_fhir::FhirVersion::R5 => "5.0.0",
        #[cfg(feature = "R6")]
        helios_fhir::FhirVersion::R6 => "6.0.0",
    }
}
708
/// Returns the newest FHIR version that is enabled at compile time.
///
/// This function uses compile-time feature detection to determine which FHIR
/// version should be used when multiple versions are enabled. The priority order
/// is: R6 > R5 > R4B > R4, where newer versions take precedence.
///
/// # Examples
///
/// ```rust
/// use helios_sof::{get_newest_enabled_fhir_version, FhirVersion};
///
/// # #[cfg(any(feature = "R4", feature = "R4B", feature = "R5", feature = "R6"))]
/// # {
/// let version = get_newest_enabled_fhir_version();
/// // If R5 and R4 are both enabled, this returns R5
/// # }
/// ```
///
/// # Panics
///
/// Panics at runtime (on first call) if the crate was built with no FHIR
/// version features enabled.
pub fn get_newest_enabled_fhir_version() -> helios_fhir::FhirVersion {
    // The cfg conditions are mutually exclusive and ordered newest-first:
    // exactly one `return` (or the trailing panic) is compiled in.
    #[cfg(feature = "R6")]
    return helios_fhir::FhirVersion::R6;

    #[cfg(all(feature = "R5", not(feature = "R6")))]
    return helios_fhir::FhirVersion::R5;

    #[cfg(all(feature = "R4B", not(feature = "R5"), not(feature = "R6")))]
    return helios_fhir::FhirVersion::R4B;

    #[cfg(all(
        feature = "R4",
        not(feature = "R4B"),
        not(feature = "R5"),
        not(feature = "R6")
    ))]
    return helios_fhir::FhirVersion::R4;

    #[cfg(not(any(feature = "R4", feature = "R4B", feature = "R5", feature = "R6")))]
    panic!("At least one FHIR version feature must be enabled");
}
751
/// A single row of processed tabular data from ViewDefinition transformation.
///
/// This struct represents one row in the output table, containing values for
/// each column defined in the ViewDefinition. Values are stored as optional
/// JSON values to handle nullable fields and diverse FHIR data types. The
/// serde derives allow rows to be serialized/deserialized directly.
///
/// # Structure
///
/// Each `ProcessedRow` contains a vector of optional JSON values, where:
/// - `Some(value)` represents a non-null column value
/// - `None` represents a null/missing column value
/// - The order matches the column order in [`ProcessedResult::columns`]
///
/// # Examples
///
/// ```rust
/// use helios_sof::ProcessedRow;
/// use serde_json::Value;
///
/// let row = ProcessedRow {
///     values: vec![
///         Some(Value::String("patient-123".to_string())),
///         Some(Value::String("Doe".to_string())),
///         None, // Missing birth date
///         Some(Value::Bool(true)),
///     ]
/// };
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessedRow {
    /// Column values for this row, ordered according to ProcessedResult::columns
    pub values: Vec<Option<serde_json::Value>>,
}
785
/// Complete result of ViewDefinition transformation containing columns and data rows.
///
/// This struct represents the tabular output from processing a ViewDefinition
/// against a Bundle of FHIR resources. It contains both the column definitions
/// and the actual data rows in a format ready for serialization to various
/// output formats.
///
/// # Structure
///
/// - [`columns`](Self::columns): Ordered list of column names from the ViewDefinition
/// - [`rows`](Self::rows): Data rows where each row contains values in column order
///
/// Every [`ProcessedRow`] in `rows` is expected to have one value per entry in
/// `columns`, in the same order.
///
/// # Examples
///
/// ```rust
/// use helios_sof::{ProcessedResult, ProcessedRow};
/// use serde_json::Value;
///
/// let result = ProcessedResult {
///     columns: vec![
///         "patient_id".to_string(),
///         "family_name".to_string(),
///         "given_name".to_string(),
///     ],
///     rows: vec![
///         ProcessedRow {
///             values: vec![
///                 Some(Value::String("patient-1".to_string())),
///                 Some(Value::String("Smith".to_string())),
///                 Some(Value::String("John".to_string())),
///             ]
///         },
///         ProcessedRow {
///             values: vec![
///                 Some(Value::String("patient-2".to_string())),
///                 Some(Value::String("Doe".to_string())),
///                 None, // Missing given name
///             ]
///         },
///     ]
/// };
///
/// assert_eq!(result.columns.len(), 3);
/// assert_eq!(result.rows.len(), 2);
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessedResult {
    /// Ordered list of column names as defined in the ViewDefinition
    pub columns: Vec<String>,
    /// Data rows containing values for each column
    pub rows: Vec<ProcessedRow>,
}
838
/// Execute a SQL-on-FHIR ViewDefinition transformation on a FHIR Bundle.
///
/// This is the main entry point for SQL-on-FHIR transformations. It processes
/// a ViewDefinition against a Bundle of FHIR resources and produces output in
/// the specified format. The function handles version compatibility, validation,
/// FHIRPath evaluation, and output formatting.
///
/// # Arguments
///
/// * `view_definition` - The ViewDefinition containing transformation logic
/// * `bundle` - The Bundle containing FHIR resources to process
/// * `content_type` - The desired output format
///
/// # Returns
///
/// * `Ok(Vec<u8>)` - Formatted output bytes ready for writing to file or stdout
/// * `Err(SofError)` - Detailed error information about what went wrong
///
/// # Validation
///
/// The function performs comprehensive validation:
/// - FHIR version compatibility between ViewDefinition and Bundle
/// - ViewDefinition structure and logic validation
/// - FHIRPath expression syntax and evaluation
/// - Output format compatibility
///
/// # Examples
///
/// ```rust
/// use helios_sof::{SofViewDefinition, SofBundle, ContentType, run_view_definition};
///
/// # #[cfg(feature = "R4")]
/// # {
/// // Create a simple ViewDefinition
/// let view_json = serde_json::json!({
///     "resourceType": "ViewDefinition",
///     "status": "active",
///     "resource": "Patient",
///     "select": [{
///         "column": [{
///             "name": "id",
///             "path": "id"
///         }]
///     }]
/// });
/// let view_def: helios_fhir::r4::ViewDefinition = serde_json::from_value(view_json)?;
///
/// // Create a simple Bundle
/// let bundle_json = serde_json::json!({
///     "resourceType": "Bundle",
///     "type": "collection",
///     "entry": []
/// });
/// let bundle: helios_fhir::r4::Bundle = serde_json::from_value(bundle_json)?;
///
/// let sof_view = SofViewDefinition::R4(view_def);
/// let sof_bundle = SofBundle::R4(bundle);
///
/// // Generate CSV with headers
/// let csv_output = run_view_definition(
///     sof_view,
///     sof_bundle,
///     ContentType::CsvWithHeader
/// )?;
///
/// // Write to file or stdout
/// std::fs::write("output.csv", csv_output)?;
/// # }
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
///
/// # Error Handling
///
/// Common error scenarios:
///
/// ```rust,no_run
/// use helios_sof::{SofError, SofViewDefinition, SofBundle, ContentType, run_view_definition};
///
/// # let view = SofViewDefinition::R4(helios_fhir::r4::ViewDefinition::default());
/// # let bundle = SofBundle::R4(helios_fhir::r4::Bundle::default());
/// # let content_type = ContentType::Json;
/// match run_view_definition(view, bundle, content_type) {
///     Ok(output) => {
///         println!("Success: {} bytes generated", output.len());
///     },
///     Err(SofError::InvalidViewDefinition(msg)) => {
///         eprintln!("ViewDefinition error: {}", msg);
///     },
///     Err(SofError::FhirPathError(msg)) => {
///         eprintln!("FHIRPath error: {}", msg);
///     },
///     Err(e) => {
///         eprintln!("Other error: {}", e);
///     }
/// }
/// ```
pub fn run_view_definition(
    view_definition: SofViewDefinition,
    bundle: SofBundle,
    content_type: ContentType,
) -> Result<Vec<u8>, SofError> {
    // Thin wrapper: delegate to the options-aware variant with default
    // options (no since-filter, no limit, no pagination).
    run_view_definition_with_options(view_definition, bundle, content_type, RunOptions::default())
}
942
/// Configuration options for Parquet file generation.
///
/// Carried via [`RunOptions::parquet_options`] and consulted when Parquet
/// output is requested; ignored for all other output formats.
#[derive(Debug, Clone)]
pub struct ParquetOptions {
    /// Target row group size in MB (64-1024)
    pub row_group_size_mb: u32,
    /// Target page size in KB (64-8192)
    pub page_size_kb: u32,
    /// Compression algorithm (none, snappy, gzip, lz4, brotli, zstd)
    pub compression: String,
    /// Maximum file size in MB (splits output when exceeded)
    pub max_file_size_mb: Option<u32>,
}
955
956impl Default for ParquetOptions {
957 fn default() -> Self {
958 Self {
959 row_group_size_mb: 256,
960 page_size_kb: 1024,
961 compression: "snappy".to_string(),
962 max_file_size_mb: None,
963 }
964 }
965}
966
/// Options for filtering and controlling ViewDefinition execution
///
/// The `Default` value applies no filtering at all: every matching resource
/// is processed and the complete result set is returned.
#[derive(Debug, Clone, Default)]
pub struct RunOptions {
    /// Filter resources modified after this time
    pub since: Option<DateTime<Utc>>,
    /// Limit the number of results
    pub limit: Option<usize>,
    /// Page number for pagination (1-based)
    pub page: Option<usize>,
    /// Parquet-specific configuration options
    pub parquet_options: Option<ParquetOptions>,
}
979
980/// Execute a ViewDefinition transformation with additional filtering options.
981///
982/// This function extends the basic `run_view_definition` with support for:
983/// - Filtering resources by modification time (`since`)
984/// - Limiting results (`limit`)
985/// - Pagination (`page`)
986///
987/// # Arguments
988///
989/// * `view_definition` - The ViewDefinition to execute
990/// * `bundle` - The Bundle containing resources to transform
991/// * `content_type` - Desired output format
992/// * `options` - Additional filtering and control options
993///
994/// # Returns
995///
996/// The transformed data in the requested format, with filtering applied.
997pub fn run_view_definition_with_options(
998 view_definition: SofViewDefinition,
999 bundle: SofBundle,
1000 content_type: ContentType,
1001 options: RunOptions,
1002) -> Result<Vec<u8>, SofError> {
1003 // Filter bundle resources by since parameter before processing
1004 let filtered_bundle = if let Some(since) = options.since {
1005 filter_bundle_by_since(bundle, since)?
1006 } else {
1007 bundle
1008 };
1009
1010 // Process the ViewDefinition to generate tabular data
1011 let processed_result = process_view_definition(view_definition, filtered_bundle)?;
1012
1013 // Apply pagination if needed
1014 let processed_result = if options.limit.is_some() || options.page.is_some() {
1015 apply_pagination_to_result(processed_result, options.limit, options.page)?
1016 } else {
1017 processed_result
1018 };
1019
1020 // Format the result according to the requested content type
1021 format_output(
1022 processed_result,
1023 content_type,
1024 options.parquet_options.as_ref(),
1025 )
1026}
1027
/// Dispatches a ViewDefinition/Bundle pair to the shared generic processor.
///
/// First verifies that both inputs carry the same FHIR version, then unwraps
/// the matching version-enum variants and runs
/// `process_view_definition_generic`. Returns the raw tabular
/// `ProcessedResult` (columns + rows) prior to any output formatting.
///
/// # Errors
///
/// Returns `SofError::InvalidViewDefinition` when the ViewDefinition and
/// Bundle use different FHIR versions.
pub fn process_view_definition(
    view_definition: SofViewDefinition,
    bundle: SofBundle,
) -> Result<ProcessedResult, SofError> {
    // Ensure both resources use the same FHIR version
    if view_definition.version() != bundle.version() {
        return Err(SofError::InvalidViewDefinition(
            "ViewDefinition and Bundle must use the same FHIR version".to_string(),
        ));
    }

    // Each arm only exists when its feature flag is enabled, so the match is
    // exhaustive for any single-feature build.
    match (view_definition, bundle) {
        #[cfg(feature = "R4")]
        (SofViewDefinition::R4(vd), SofBundle::R4(bundle)) => {
            process_view_definition_generic(vd, bundle)
        }
        #[cfg(feature = "R4B")]
        (SofViewDefinition::R4B(vd), SofBundle::R4B(bundle)) => {
            process_view_definition_generic(vd, bundle)
        }
        #[cfg(feature = "R5")]
        (SofViewDefinition::R5(vd), SofBundle::R5(bundle)) => {
            process_view_definition_generic(vd, bundle)
        }
        #[cfg(feature = "R6")]
        (SofViewDefinition::R6(vd), SofBundle::R6(bundle)) => {
            process_view_definition_generic(vd, bundle)
        }
        // This case should never happen due to the version check above,
        // but is needed for exhaustive pattern matching when multiple features are enabled
        #[cfg(any(
            all(feature = "R4", any(feature = "R4B", feature = "R5", feature = "R6")),
            all(feature = "R4B", any(feature = "R5", feature = "R6")),
            all(feature = "R5", feature = "R6")
        ))]
        _ => {
            unreachable!("Version mismatch should have been caught by the version check above")
        }
    }
}
1068
1069// Generic version-agnostic constant extraction
1070fn extract_view_definition_constants<VD: ViewDefinitionTrait>(
1071 view_definition: &VD,
1072) -> Result<HashMap<String, EvaluationResult>, SofError> {
1073 let mut variables = HashMap::new();
1074
1075 if let Some(constants) = view_definition.constants() {
1076 for constant in constants {
1077 let name = constant
1078 .name()
1079 .ok_or_else(|| {
1080 SofError::InvalidViewDefinition("Constant name is required".to_string())
1081 })?
1082 .to_string();
1083
1084 let eval_result = constant.to_evaluation_result()?;
1085 // Constants are referenced with % prefix in FHIRPath expressions
1086 variables.insert(format!("%{}", name), eval_result);
1087 }
1088 }
1089
1090 Ok(variables)
1091}
1092
// Generic version-agnostic ViewDefinition processing.
//
// Pipeline: validate the ViewDefinition's structure, extract its constants,
// keep only bundle entries of the target resource type, filter them through
// the `where` clauses, then expand the `select` tree into columns and rows.
fn process_view_definition_generic<VD, B>(
    view_definition: VD,
    bundle: B,
) -> Result<ProcessedResult, SofError>
where
    VD: ViewDefinitionTrait,
    B: BundleTrait,
    // Sync bounds allow the row-generation step to process resources in parallel.
    B::Resource: ResourceTrait + Sync,
    VD::Select: Sync,
{
    validate_view_definition(&view_definition)?;

    // Step 1: Extract constants/variables from ViewDefinition
    let variables = extract_view_definition_constants(&view_definition)?;

    // Step 2: Filter resources by type and profile
    let target_resource_type = view_definition
        .resource()
        .ok_or_else(|| SofError::InvalidViewDefinition("Resource type is required".to_string()))?;

    let filtered_resources = filter_resources(&bundle, target_resource_type)?;

    // Step 3: Apply where clauses to filter resources
    let filtered_resources = apply_where_clauses(
        filtered_resources,
        view_definition.where_clauses(),
        &variables,
    )?;

    // Step 4: Process all select clauses to generate rows with forEach support
    let select_clauses = view_definition.select().ok_or_else(|| {
        SofError::InvalidViewDefinition("At least one select clause is required".to_string())
    })?;

    // Generate rows for each resource using the forEach-aware approach
    let (all_columns, rows) =
        generate_rows_from_selects(&filtered_resources, select_clauses, &variables)?;

    Ok(ProcessedResult {
        columns: all_columns,
        rows,
    })
}
1137
1138// Generic version-agnostic validation
1139fn validate_view_definition<VD: ViewDefinitionTrait>(view_def: &VD) -> Result<(), SofError> {
1140 // Basic validation
1141 if view_def.resource().is_none_or(|s| s.is_empty()) {
1142 return Err(SofError::InvalidViewDefinition(
1143 "ViewDefinition must specify a resource type".to_string(),
1144 ));
1145 }
1146
1147 if view_def.select().is_none_or(|s| s.is_empty()) {
1148 return Err(SofError::InvalidViewDefinition(
1149 "ViewDefinition must have at least one select".to_string(),
1150 ));
1151 }
1152
1153 // Validate where clauses
1154 if let Some(where_clauses) = view_def.where_clauses() {
1155 validate_where_clauses(where_clauses)?;
1156 }
1157
1158 // Validate selects
1159 if let Some(selects) = view_def.select() {
1160 for select in selects {
1161 validate_select(select)?;
1162 }
1163 }
1164
1165 Ok(())
1166}
1167
1168// Generic where clause validation
1169fn validate_where_clauses<W: ViewDefinitionWhereTrait>(
1170 where_clauses: &[W],
1171) -> Result<(), SofError> {
1172 // Basic validation - just ensure paths are provided
1173 // Type checking will be done during actual evaluation
1174 for where_clause in where_clauses {
1175 if where_clause.path().is_none() {
1176 return Err(SofError::InvalidViewDefinition(
1177 "Where clause must have a path specified".to_string(),
1178 ));
1179 }
1180 }
1181 Ok(())
1182}
1183
1184// Generic helper - no longer needs to be version-specific
1185fn can_be_coerced_to_boolean(result: &EvaluationResult) -> bool {
1186 // Check if the result can be meaningfully used as a boolean in a where clause
1187 match result {
1188 // Boolean values are obviously OK
1189 EvaluationResult::Boolean(_, _) => true,
1190
1191 // Empty is OK (evaluates to false)
1192 EvaluationResult::Empty => true,
1193
1194 // Collections are OK - they evaluate based on whether they're empty or not
1195 EvaluationResult::Collection { .. } => true,
1196
1197 // Other types cannot be meaningfully coerced to boolean for where clauses
1198 // This includes: String, Integer, Decimal, Date, DateTime, Time, Quantity, Object
1199 _ => false,
1200 }
1201}
1202
// Generic select validation (public entry point).
//
// Starts the recursion with `in_foreach_context = false`; the flag tracks
// whether an enclosing select introduced forEach/forEachOrNull.
fn validate_select<S: ViewDefinitionSelectTrait>(select: &S) -> Result<(), SofError> {
    validate_select_with_context(select, false)
}
1207
1208fn validate_select_with_context<S: ViewDefinitionSelectTrait>(
1209 select: &S,
1210 in_foreach_context: bool,
1211) -> Result<(), SofError>
1212where
1213 S::Select: ViewDefinitionSelectTrait,
1214{
1215 // Determine if we're entering a forEach context at this level
1216 let entering_foreach = select.for_each().is_some() || select.for_each_or_null().is_some();
1217 let current_foreach_context = in_foreach_context || entering_foreach;
1218
1219 // Validate collection attribute with the current forEach context
1220 if let Some(columns) = select.column() {
1221 for column in columns {
1222 if let Some(collection_value) = column.collection() {
1223 if !collection_value && !current_foreach_context {
1224 return Err(SofError::InvalidViewDefinition(
1225 "Column 'collection' attribute must be true when specified".to_string(),
1226 ));
1227 }
1228 }
1229 }
1230 }
1231
1232 // Validate unionAll column consistency
1233 if let Some(union_selects) = select.union_all() {
1234 validate_union_all_columns(union_selects)?;
1235 }
1236
1237 // Recursively validate nested selects
1238 if let Some(nested_selects) = select.select() {
1239 for nested_select in nested_selects {
1240 validate_select_with_context(nested_select, current_foreach_context)?;
1241 }
1242 }
1243
1244 // Validate unionAll selects with forEach context
1245 if let Some(union_selects) = select.union_all() {
1246 for union_select in union_selects {
1247 validate_select_with_context(union_select, current_foreach_context)?;
1248 }
1249 }
1250
1251 Ok(())
1252}
1253
1254// Generic union validation
1255fn validate_union_all_columns<S: ViewDefinitionSelectTrait>(
1256 union_selects: &[S],
1257) -> Result<(), SofError> {
1258 if union_selects.len() < 2 {
1259 return Ok(());
1260 }
1261
1262 // Get column names and order from first select
1263 let first_select = &union_selects[0];
1264 let first_columns = get_column_names(first_select)?;
1265
1266 // Validate all other selects have the same column names in the same order
1267 for (index, union_select) in union_selects.iter().enumerate().skip(1) {
1268 let current_columns = get_column_names(union_select)?;
1269
1270 if current_columns != first_columns {
1271 if current_columns.len() != first_columns.len()
1272 || !current_columns
1273 .iter()
1274 .all(|name| first_columns.contains(name))
1275 {
1276 return Err(SofError::InvalidViewDefinition(format!(
1277 "UnionAll branch {} has different column names than first branch",
1278 index
1279 )));
1280 } else {
1281 return Err(SofError::InvalidViewDefinition(format!(
1282 "UnionAll branch {} has columns in different order than first branch",
1283 index
1284 )));
1285 }
1286 }
1287 }
1288
1289 Ok(())
1290}
1291
1292// Generic column name extraction
1293fn get_column_names<S: ViewDefinitionSelectTrait>(select: &S) -> Result<Vec<String>, SofError> {
1294 let mut column_names = Vec::new();
1295
1296 // Collect direct column names
1297 if let Some(columns) = select.column() {
1298 for column in columns {
1299 if let Some(name) = column.name() {
1300 column_names.push(name.to_string());
1301 }
1302 }
1303 }
1304
1305 // If this select has unionAll but no direct columns, get columns from first unionAll branch
1306 if column_names.is_empty() {
1307 if let Some(union_selects) = select.union_all() {
1308 if !union_selects.is_empty() {
1309 return get_column_names(&union_selects[0]);
1310 }
1311 }
1312 }
1313
1314 Ok(column_names)
1315}
1316
1317// Generic resource filtering
1318fn filter_resources<'a, B: BundleTrait>(
1319 bundle: &'a B,
1320 resource_type: &str,
1321) -> Result<Vec<&'a B::Resource>, SofError> {
1322 Ok(bundle
1323 .entries()
1324 .into_iter()
1325 .filter(|resource| resource.resource_name() == resource_type)
1326 .collect())
1327}
1328
1329// Generic where clause application
1330fn apply_where_clauses<'a, R, W>(
1331 resources: Vec<&'a R>,
1332 where_clauses: Option<&[W]>,
1333 variables: &HashMap<String, EvaluationResult>,
1334) -> Result<Vec<&'a R>, SofError>
1335where
1336 R: ResourceTrait,
1337 W: ViewDefinitionWhereTrait,
1338{
1339 if let Some(wheres) = where_clauses {
1340 let mut filtered = Vec::new();
1341
1342 for resource in resources {
1343 let mut include_resource = true;
1344
1345 // All where clauses must evaluate to true for the resource to be included
1346 for where_clause in wheres {
1347 let fhir_resource = resource.to_fhir_resource();
1348 let mut context = EvaluationContext::new(vec![fhir_resource]);
1349
1350 // Add variables to the context
1351 for (name, value) in variables {
1352 context.set_variable_result(name, value.clone());
1353 }
1354
1355 let path = where_clause.path().ok_or_else(|| {
1356 SofError::InvalidViewDefinition("Where clause path is required".to_string())
1357 })?;
1358
1359 match evaluate_expression(path, &context) {
1360 Ok(result) => {
1361 // Check if the result can be meaningfully used as a boolean
1362 if !can_be_coerced_to_boolean(&result) {
1363 return Err(SofError::InvalidViewDefinition(format!(
1364 "Where clause path '{}' returns type '{}' which cannot be used as a boolean condition. \
1365 Where clauses must return boolean values, collections, or empty results.",
1366 path,
1367 result.type_name()
1368 )));
1369 }
1370
1371 // Check if result is truthy (non-empty and not false)
1372 if !is_truthy(&result) {
1373 include_resource = false;
1374 break;
1375 }
1376 }
1377 Err(e) => {
1378 return Err(SofError::FhirPathError(format!(
1379 "Error evaluating where clause '{}': {}",
1380 path, e
1381 )));
1382 }
1383 }
1384 }
1385
1386 if include_resource {
1387 filtered.push(resource);
1388 }
1389 }
1390
1391 Ok(filtered)
1392 } else {
1393 Ok(resources)
1394 }
1395}
1396
1397// Removed generate_rows_per_resource_r4 - replaced with new forEach-aware implementation
1398
1399// Removed generate_rows_with_for_each_r4 - replaced with new forEach-aware implementation
1400
1401// Helper functions for FHIRPath result processing
1402fn is_truthy(result: &EvaluationResult) -> bool {
1403 match result {
1404 EvaluationResult::Empty => false,
1405 EvaluationResult::Boolean(b, _) => *b,
1406 EvaluationResult::Collection { items, .. } => !items.is_empty(),
1407 _ => true, // Non-empty, non-false values are truthy
1408 }
1409}
1410
1411fn fhirpath_result_to_json_value_collection(result: EvaluationResult) -> Option<serde_json::Value> {
1412 match result {
1413 EvaluationResult::Empty => Some(serde_json::Value::Array(vec![])),
1414 EvaluationResult::Collection { items, .. } => {
1415 // Always return array for collection columns, even if empty
1416 let values: Vec<serde_json::Value> = items
1417 .into_iter()
1418 .filter_map(fhirpath_result_to_json_value)
1419 .collect();
1420 Some(serde_json::Value::Array(values))
1421 }
1422 // For non-collection results in collection columns, wrap in array
1423 single_result => {
1424 if let Some(json_val) = fhirpath_result_to_json_value(single_result) {
1425 Some(serde_json::Value::Array(vec![json_val]))
1426 } else {
1427 Some(serde_json::Value::Array(vec![]))
1428 }
1429 }
1430 }
1431}
1432
1433fn fhirpath_result_to_json_value(result: EvaluationResult) -> Option<serde_json::Value> {
1434 match result {
1435 EvaluationResult::Empty => None,
1436 EvaluationResult::Boolean(b, _) => Some(serde_json::Value::Bool(b)),
1437 EvaluationResult::Integer(i, _) => {
1438 Some(serde_json::Value::Number(serde_json::Number::from(i)))
1439 }
1440 EvaluationResult::Decimal(d, _) => {
1441 // Check if this Decimal represents a whole number
1442 if d.fract().is_zero() {
1443 // Convert to integer if no fractional part
1444 if let Ok(i) = d.to_string().parse::<i64>() {
1445 Some(serde_json::Value::Number(serde_json::Number::from(i)))
1446 } else {
1447 // Handle very large numbers as strings
1448 Some(serde_json::Value::String(d.to_string()))
1449 }
1450 } else {
1451 // Convert Decimal to a float for fractional numbers
1452 if let Ok(f) = d.to_string().parse::<f64>() {
1453 if let Some(num) = serde_json::Number::from_f64(f) {
1454 Some(serde_json::Value::Number(num))
1455 } else {
1456 Some(serde_json::Value::String(d.to_string()))
1457 }
1458 } else {
1459 Some(serde_json::Value::String(d.to_string()))
1460 }
1461 }
1462 }
1463 EvaluationResult::String(s, _) => Some(serde_json::Value::String(s)),
1464 EvaluationResult::Date(s, _) => Some(serde_json::Value::String(s)),
1465 EvaluationResult::DateTime(s, _) => {
1466 // Remove "@" prefix from datetime strings if present
1467 let cleaned = s.strip_prefix("@").unwrap_or(&s);
1468 Some(serde_json::Value::String(cleaned.to_string()))
1469 }
1470 EvaluationResult::Time(s, _) => {
1471 // Remove "@T" prefix from time strings if present
1472 let cleaned = s.strip_prefix("@T").unwrap_or(&s);
1473 Some(serde_json::Value::String(cleaned.to_string()))
1474 }
1475 EvaluationResult::Collection { items, .. } => {
1476 if items.len() == 1 {
1477 // Single item collection - unwrap to the item itself
1478 fhirpath_result_to_json_value(items.into_iter().next().unwrap())
1479 } else if items.is_empty() {
1480 None
1481 } else {
1482 // Multiple items - convert to array
1483 let values: Vec<serde_json::Value> = items
1484 .into_iter()
1485 .filter_map(fhirpath_result_to_json_value)
1486 .collect();
1487 Some(serde_json::Value::Array(values))
1488 }
1489 }
1490 EvaluationResult::Object { map, .. } => {
1491 let mut json_map = serde_json::Map::new();
1492 for (k, v) in map {
1493 if let Some(json_val) = fhirpath_result_to_json_value(v) {
1494 json_map.insert(k, json_val);
1495 }
1496 }
1497 Some(serde_json::Value::Object(json_map))
1498 }
1499 // Handle other result types as strings
1500 _ => Some(serde_json::Value::String(format!("{:?}", result))),
1501 }
1502}
1503
1504fn extract_iteration_items(result: EvaluationResult) -> Vec<EvaluationResult> {
1505 match result {
1506 EvaluationResult::Collection { items, .. } => items,
1507 EvaluationResult::Empty => Vec::new(),
1508 single_item => vec![single_item],
1509 }
1510}
1511
1512// Generic row generation functions
1513
1514fn generate_rows_from_selects<R, S>(
1515 resources: &[&R],
1516 selects: &[S],
1517 variables: &HashMap<String, EvaluationResult>,
1518) -> Result<(Vec<String>, Vec<ProcessedRow>), SofError>
1519where
1520 R: ResourceTrait + Sync,
1521 S: ViewDefinitionSelectTrait + Sync,
1522 S::Select: ViewDefinitionSelectTrait,
1523{
1524 // Process resources in parallel
1525 let resource_results: Result<Vec<_>, _> = resources
1526 .par_iter()
1527 .map(|resource| {
1528 // Each thread gets its own local column vector
1529 let mut local_columns = Vec::new();
1530 let resource_rows =
1531 generate_rows_for_resource(*resource, selects, &mut local_columns, variables)?;
1532 Ok::<(Vec<String>, Vec<ProcessedRow>), SofError>((local_columns, resource_rows))
1533 })
1534 .collect();
1535
1536 // Handle errors from parallel processing
1537 let resource_results = resource_results?;
1538
1539 // Merge columns from all threads (maintaining order is important)
1540 let mut final_columns = Vec::new();
1541 let mut all_rows = Vec::new();
1542
1543 for (local_columns, resource_rows) in resource_results {
1544 // Merge columns, avoiding duplicates
1545 for col in local_columns {
1546 if !final_columns.contains(&col) {
1547 final_columns.push(col);
1548 }
1549 }
1550 all_rows.extend(resource_rows);
1551 }
1552
1553 Ok((final_columns, all_rows))
1554}
1555
1556fn generate_rows_for_resource<R, S>(
1557 resource: &R,
1558 selects: &[S],
1559 all_columns: &mut Vec<String>,
1560 variables: &HashMap<String, EvaluationResult>,
1561) -> Result<Vec<ProcessedRow>, SofError>
1562where
1563 R: ResourceTrait,
1564 S: ViewDefinitionSelectTrait,
1565 S::Select: ViewDefinitionSelectTrait,
1566{
1567 let fhir_resource = resource.to_fhir_resource();
1568 let mut context = EvaluationContext::new(vec![fhir_resource]);
1569
1570 // Add variables to the context
1571 for (name, value) in variables {
1572 context.set_variable_result(name, value.clone());
1573 }
1574
1575 // Generate all possible row combinations for this resource
1576 let row_combinations = generate_row_combinations(&context, selects, all_columns, variables)?;
1577
1578 Ok(row_combinations)
1579}
1580
/// One in-progress output row: a value slot per known column, where `None`
/// means no value has been produced for that column (renders as null/missing).
#[derive(Debug, Clone)]
struct RowCombination {
    values: Vec<Option<serde_json::Value>>,
}
1585
1586fn generate_row_combinations<S>(
1587 context: &EvaluationContext,
1588 selects: &[S],
1589 all_columns: &mut Vec<String>,
1590 variables: &HashMap<String, EvaluationResult>,
1591) -> Result<Vec<ProcessedRow>, SofError>
1592where
1593 S: ViewDefinitionSelectTrait,
1594 S::Select: ViewDefinitionSelectTrait,
1595{
1596 // First pass: collect all column names to ensure consistent ordering
1597 collect_all_columns(selects, all_columns)?;
1598
1599 // Second pass: generate all row combinations
1600 let mut row_combinations = vec![RowCombination {
1601 values: vec![None; all_columns.len()],
1602 }];
1603
1604 for select in selects {
1605 row_combinations =
1606 expand_select_combinations(context, select, &row_combinations, all_columns, variables)?;
1607 }
1608
1609 // Convert to ProcessedRow format
1610 Ok(row_combinations
1611 .into_iter()
1612 .map(|combo| ProcessedRow {
1613 values: combo.values,
1614 })
1615 .collect())
1616}
1617
1618fn collect_all_columns<S>(selects: &[S], all_columns: &mut Vec<String>) -> Result<(), SofError>
1619where
1620 S: ViewDefinitionSelectTrait,
1621{
1622 for select in selects {
1623 // Add columns from this select
1624 if let Some(columns) = select.column() {
1625 for col in columns {
1626 if let Some(name) = col.name() {
1627 if !all_columns.contains(&name.to_string()) {
1628 all_columns.push(name.to_string());
1629 }
1630 }
1631 }
1632 }
1633
1634 // Recursively collect from nested selects
1635 if let Some(nested_selects) = select.select() {
1636 collect_all_columns(nested_selects, all_columns)?;
1637 }
1638
1639 // Collect from unionAll
1640 if let Some(union_selects) = select.union_all() {
1641 collect_all_columns(union_selects, all_columns)?;
1642 }
1643 }
1644 Ok(())
1645}
1646
// Expands existing row combinations with the values produced by one select.
//
// Handles, in priority order: forEach / forEachOrNull (row-per-item
// iteration), repeat (recursive traversal), and otherwise plain column
// evaluation followed by nested selects and unionAll branches. Each input
// combination may produce zero, one, or many output combinations.
fn expand_select_combinations<S>(
    context: &EvaluationContext,
    select: &S,
    existing_combinations: &[RowCombination],
    all_columns: &[String],
    variables: &HashMap<String, EvaluationResult>,
) -> Result<Vec<RowCombination>, SofError>
where
    S: ViewDefinitionSelectTrait,
    S::Select: ViewDefinitionSelectTrait,
{
    // Handle forEach and forEachOrNull
    if let Some(for_each_path) = select.for_each() {
        return expand_for_each_combinations(
            context,
            select,
            existing_combinations,
            all_columns,
            for_each_path,
            false,
            variables,
        );
    }

    if let Some(for_each_or_null_path) = select.for_each_or_null() {
        return expand_for_each_combinations(
            context,
            select,
            existing_combinations,
            all_columns,
            for_each_or_null_path,
            true,
            variables,
        );
    }

    // Handle repeat directive for recursive traversal
    if let Some(repeat_paths) = select.repeat() {
        return expand_repeat_combinations(
            context,
            select,
            existing_combinations,
            all_columns,
            &repeat_paths,
            variables,
        );
    }

    // Handle regular columns (no forEach)
    let mut new_combinations = Vec::new();

    for existing_combo in existing_combinations {
        let mut new_combo = existing_combo.clone();

        // Add values from this select's columns
        if let Some(columns) = select.column() {
            for col in columns {
                if let Some(col_name) = col.name() {
                    // Only fill slots for columns known to this view
                    if let Some(col_index) = all_columns.iter().position(|name| name == col_name) {
                        let path = col.path().ok_or_else(|| {
                            SofError::InvalidViewDefinition("Column path is required".to_string())
                        })?;

                        match evaluate_expression(path, context) {
                            Ok(result) => {
                                // Check if this column is marked as a collection
                                let is_collection = col.collection().unwrap_or(false);

                                new_combo.values[col_index] = if is_collection {
                                    fhirpath_result_to_json_value_collection(result)
                                } else {
                                    fhirpath_result_to_json_value(result)
                                };
                            }
                            Err(e) => {
                                return Err(SofError::FhirPathError(format!(
                                    "Error evaluating column '{}' with path '{}': {}",
                                    col_name, path, e
                                )));
                            }
                        }
                    }
                }
            }
        }

        new_combinations.push(new_combo);
    }

    // Handle nested selects
    if let Some(nested_selects) = select.select() {
        for nested_select in nested_selects {
            new_combinations = expand_select_combinations(
                context,
                nested_select,
                &new_combinations,
                all_columns,
                variables,
            )?;
        }
    }

    // Handle unionAll
    if let Some(union_selects) = select.union_all() {
        let mut union_combinations = Vec::new();

        // Process each unionAll select independently, using the combinations that already have
        // values from this select's columns and nested selects
        for union_select in union_selects {
            let select_combinations = expand_select_combinations(
                context,
                union_select,
                &new_combinations,
                all_columns,
                variables,
            )?;
            union_combinations.extend(select_combinations);
        }

        // unionAll replaces new_combinations with the union results
        // If no union results, this resource should be filtered out (no rows for this resource)
        new_combinations = union_combinations;
    }

    Ok(new_combinations)
}
1773
1774fn expand_for_each_combinations<S>(
1775 context: &EvaluationContext,
1776 select: &S,
1777 existing_combinations: &[RowCombination],
1778 all_columns: &[String],
1779 for_each_path: &str,
1780 allow_null: bool,
1781 variables: &HashMap<String, EvaluationResult>,
1782) -> Result<Vec<RowCombination>, SofError>
1783where
1784 S: ViewDefinitionSelectTrait,
1785 S::Select: ViewDefinitionSelectTrait,
1786{
1787 // Evaluate the forEach expression to get iteration items
1788 let for_each_result = evaluate_expression(for_each_path, context).map_err(|e| {
1789 SofError::FhirPathError(format!(
1790 "Error evaluating forEach expression '{}': {}",
1791 for_each_path, e
1792 ))
1793 })?;
1794
1795 let iteration_items = extract_iteration_items(for_each_result);
1796
1797 if iteration_items.is_empty() {
1798 if allow_null {
1799 // forEachOrNull: generate null rows
1800 let mut new_combinations = Vec::new();
1801 for existing_combo in existing_combinations {
1802 let mut new_combo = existing_combo.clone();
1803
1804 // Set column values to null for this forEach scope
1805 if let Some(columns) = select.column() {
1806 for col in columns {
1807 if let Some(col_name) = col.name() {
1808 if let Some(col_index) =
1809 all_columns.iter().position(|name| name == col_name)
1810 {
1811 new_combo.values[col_index] = None;
1812 }
1813 }
1814 }
1815 }
1816
1817 new_combinations.push(new_combo);
1818 }
1819 return Ok(new_combinations);
1820 } else {
1821 // forEach with empty collection: no rows
1822 return Ok(Vec::new());
1823 }
1824 }
1825
1826 let mut new_combinations = Vec::new();
1827
1828 // For each iteration item, create new combinations
1829 for item in &iteration_items {
1830 // Create a new context with the iteration item
1831 let _item_context = create_iteration_context(item, variables);
1832
1833 for existing_combo in existing_combinations {
1834 let mut new_combo = existing_combo.clone();
1835
1836 // Evaluate columns in the context of the iteration item
1837 if let Some(columns) = select.column() {
1838 for col in columns {
1839 if let Some(col_name) = col.name() {
1840 if let Some(col_index) =
1841 all_columns.iter().position(|name| name == col_name)
1842 {
1843 let path = col.path().ok_or_else(|| {
1844 SofError::InvalidViewDefinition(
1845 "Column path is required".to_string(),
1846 )
1847 })?;
1848
1849 // Use the iteration item directly for path evaluation
1850 let result = if path == "$this" {
1851 // Special case: $this refers to the current iteration item
1852 item.clone()
1853 } else {
1854 // Evaluate the path on the iteration item
1855 evaluate_path_on_item(path, item, variables)?
1856 };
1857
1858 // Check if this column is marked as a collection
1859 let is_collection = col.collection().unwrap_or(false);
1860
1861 new_combo.values[col_index] = if is_collection {
1862 fhirpath_result_to_json_value_collection(result)
1863 } else {
1864 fhirpath_result_to_json_value(result)
1865 };
1866 }
1867 }
1868 }
1869 }
1870
1871 new_combinations.push(new_combo);
1872 }
1873 }
1874
1875 // Handle nested selects with the forEach context
1876 if let Some(nested_selects) = select.select() {
1877 let mut final_combinations = Vec::new();
1878
1879 for item in &iteration_items {
1880 let item_context = create_iteration_context(item, variables);
1881
1882 // For each iteration item, we need to start with the combinations that have
1883 // the correct column values for this forEach scope
1884 for existing_combo in existing_combinations {
1885 // Find the combination that corresponds to this iteration item
1886 // by looking at the values we set for columns in this forEach scope
1887 let mut base_combo = existing_combo.clone();
1888
1889 // Update the base combination with column values for this iteration item
1890 if let Some(columns) = select.column() {
1891 for col in columns {
1892 if let Some(col_name) = col.name() {
1893 if let Some(col_index) =
1894 all_columns.iter().position(|name| name == col_name)
1895 {
1896 let path = col.path().ok_or_else(|| {
1897 SofError::InvalidViewDefinition(
1898 "Column path is required".to_string(),
1899 )
1900 })?;
1901
1902 let result = if path == "$this" {
1903 item.clone()
1904 } else {
1905 evaluate_path_on_item(path, item, variables)?
1906 };
1907
1908 // Check if this column is marked as a collection
1909 let is_collection = col.collection().unwrap_or(false);
1910
1911 base_combo.values[col_index] = if is_collection {
1912 fhirpath_result_to_json_value_collection(result)
1913 } else {
1914 fhirpath_result_to_json_value(result)
1915 };
1916 }
1917 }
1918 }
1919 }
1920
1921 // Start with this base combination for nested processing
1922 let mut item_combinations = vec![base_combo];
1923
1924 // Process nested selects
1925 for nested_select in nested_selects {
1926 item_combinations = expand_select_combinations(
1927 &item_context,
1928 nested_select,
1929 &item_combinations,
1930 all_columns,
1931 variables,
1932 )?;
1933 }
1934
1935 final_combinations.extend(item_combinations);
1936 }
1937 }
1938
1939 new_combinations = final_combinations;
1940 }
1941
1942 // Handle unionAll within forEach context
1943 if let Some(union_selects) = select.union_all() {
1944 let mut union_combinations = Vec::new();
1945
1946 for item in &iteration_items {
1947 let item_context = create_iteration_context(item, variables);
1948
1949 // For each iteration item, process all unionAll selects
1950 for existing_combo in existing_combinations {
1951 let mut base_combo = existing_combo.clone();
1952
1953 // Update the base combination with column values for this iteration item
1954 if let Some(columns) = select.column() {
1955 for col in columns {
1956 if let Some(col_name) = col.name() {
1957 if let Some(col_index) =
1958 all_columns.iter().position(|name| name == col_name)
1959 {
1960 let path = col.path().ok_or_else(|| {
1961 SofError::InvalidViewDefinition(
1962 "Column path is required".to_string(),
1963 )
1964 })?;
1965
1966 let result = if path == "$this" {
1967 item.clone()
1968 } else {
1969 evaluate_path_on_item(path, item, variables)?
1970 };
1971
1972 // Check if this column is marked as a collection
1973 let is_collection = col.collection().unwrap_or(false);
1974
1975 base_combo.values[col_index] = if is_collection {
1976 fhirpath_result_to_json_value_collection(result)
1977 } else {
1978 fhirpath_result_to_json_value(result)
1979 };
1980 }
1981 }
1982 }
1983 }
1984
1985 // Also evaluate columns from nested selects and add them to base_combo
1986 if let Some(nested_selects) = select.select() {
1987 for nested_select in nested_selects {
1988 if let Some(nested_columns) = nested_select.column() {
1989 for col in nested_columns {
1990 if let Some(col_name) = col.name() {
1991 if let Some(col_index) =
1992 all_columns.iter().position(|name| name == col_name)
1993 {
1994 let path = col.path().ok_or_else(|| {
1995 SofError::InvalidViewDefinition(
1996 "Column path is required".to_string(),
1997 )
1998 })?;
1999
2000 let result = if path == "$this" {
2001 item.clone()
2002 } else {
2003 evaluate_path_on_item(path, item, variables)?
2004 };
2005
2006 // Check if this column is marked as a collection
2007 let is_collection = col.collection().unwrap_or(false);
2008
2009 base_combo.values[col_index] = if is_collection {
2010 fhirpath_result_to_json_value_collection(result)
2011 } else {
2012 fhirpath_result_to_json_value(result)
2013 };
2014 }
2015 }
2016 }
2017 }
2018 }
2019 }
2020
2021 // Process each unionAll select independently for this iteration item
2022 for union_select in union_selects {
2023 let mut select_combinations = vec![base_combo.clone()];
2024 select_combinations = expand_select_combinations(
2025 &item_context,
2026 union_select,
2027 &select_combinations,
2028 all_columns,
2029 variables,
2030 )?;
2031 union_combinations.extend(select_combinations);
2032 }
2033 }
2034 }
2035
2036 // unionAll replaces new_combinations with the union results
2037 // If no union results, filter out this resource (no rows for this resource)
2038 new_combinations = union_combinations;
2039 }
2040
2041 Ok(new_combinations)
2042}
2043
/// Recursively expand row combinations for a `repeat`-style traversal.
///
/// For every existing combination, each path in `repeat_paths` is evaluated
/// against `context` to find child elements. Each child produces a new
/// combination (with this select's columns re-evaluated in the child's
/// context), its nested selects are processed, and the child is then
/// traversed again with the same repeat paths. All results are unioned.
///
/// # Errors
/// Returns `SofError::FhirPathError` when a repeat expression fails to
/// evaluate, or `SofError::InvalidViewDefinition` when a column lacks a path.
fn expand_repeat_combinations<S>(
    context: &EvaluationContext,
    select: &S,
    existing_combinations: &[RowCombination],
    all_columns: &[String],
    repeat_paths: &[&str],
    variables: &HashMap<String, EvaluationResult>,
) -> Result<Vec<RowCombination>, SofError>
where
    S: ViewDefinitionSelectTrait,
    S::Select: ViewDefinitionSelectTrait,
{
    // The repeat directive performs recursive traversal:
    // 1. For each repeat path, find child elements from the current context
    // 2. For each child element:
    //    a. Evaluate columns in the child's context
    //    b. Recursively process the child with the same repeat paths
    // 3. Union all results together
    //
    // Note: Unlike forEach, repeat does NOT process the current level's columns
    // - it ONLY processes elements found via the repeat paths

    let mut all_combinations = Vec::new();

    // Process each existing combination
    for existing_combo in existing_combinations {
        // Process each repeat path to find children to traverse
        for repeat_path in repeat_paths {
            // Evaluate the repeat path to get child elements
            let repeat_result = evaluate_expression(repeat_path, context).map_err(|e| {
                SofError::FhirPathError(format!(
                    "Error evaluating repeat expression '{}': {}",
                    repeat_path, e
                ))
            })?;

            let child_items = extract_iteration_items(repeat_result);

            // For each child item found via this repeat path
            for child_item in &child_items {
                // Create a combination for this child with current level's columns
                let mut child_combo = existing_combo.clone();

                // Evaluate columns in the context of this child item
                if let Some(columns) = select.column() {
                    for col in columns {
                        if let Some(col_name) = col.name() {
                            if let Some(col_index) =
                                all_columns.iter().position(|name| name == col_name)
                            {
                                let path = col.path().ok_or_else(|| {
                                    SofError::InvalidViewDefinition(
                                        "Column path is required".to_string(),
                                    )
                                })?;

                                // Evaluate the path on the child item
                                // ($this refers to the child element itself)
                                let result = if path == "$this" {
                                    child_item.clone()
                                } else {
                                    evaluate_path_on_item(path, child_item, variables)?
                                };

                                // Collection columns keep array form; scalars flatten.
                                let is_collection = col.collection().unwrap_or(false);
                                child_combo.values[col_index] = if is_collection {
                                    fhirpath_result_to_json_value_collection(result)
                                } else {
                                    fhirpath_result_to_json_value(result)
                                };
                            }
                        }
                    }
                }

                // Create context for this child item
                let child_context = create_iteration_context(child_item, variables);

                // Start with the child combination we just created
                let mut child_combinations = vec![child_combo.clone()];

                // Process nested selects (like forEach/forEachOrNull) in the child's context
                if let Some(nested_selects) = select.select() {
                    for nested_select in nested_selects {
                        child_combinations = expand_select_combinations(
                            &child_context,
                            nested_select,
                            &child_combinations,
                            all_columns,
                            variables,
                        )?;
                    }
                }

                // Add the processed combinations to our results
                // (these may have been filtered by forEach, which is correct)
                all_combinations.extend(child_combinations);

                // Now recursively process this child with the same repeat paths
                // IMPORTANT: Use the original child_combo, not the forEach-filtered results
                let recursive_combinations = expand_repeat_combinations(
                    &child_context,
                    select,
                    &[child_combo],
                    all_columns,
                    repeat_paths,
                    variables,
                )?;

                all_combinations.extend(recursive_combinations);
            }
        }
    }

    Ok(all_combinations)
}
2159
2160// Generic helper functions
2161fn evaluate_path_on_item(
2162 path: &str,
2163 item: &EvaluationResult,
2164 variables: &HashMap<String, EvaluationResult>,
2165) -> Result<EvaluationResult, SofError> {
2166 // Create a temporary context with the iteration item as the root resource
2167 let mut temp_context = match item {
2168 EvaluationResult::Object { .. } => {
2169 // Convert the iteration item to a resource-like structure for FHIRPath evaluation
2170 // For simplicity, we'll create a basic context where the item is available for evaluation
2171 let mut context = EvaluationContext::new(vec![]);
2172 context.this = Some(item.clone());
2173 context
2174 }
2175 _ => EvaluationContext::new(vec![]),
2176 };
2177
2178 // Add variables to the temporary context
2179 for (name, value) in variables {
2180 temp_context.set_variable_result(name, value.clone());
2181 }
2182
2183 // Evaluate the FHIRPath expression in the context of the iteration item
2184 match evaluate_expression(path, &temp_context) {
2185 Ok(result) => Ok(result),
2186 Err(_e) => {
2187 // If FHIRPath evaluation fails, try simple property access as fallback
2188 match item {
2189 EvaluationResult::Object { map, .. } => {
2190 if let Some(value) = map.get(path) {
2191 Ok(value.clone())
2192 } else {
2193 Ok(EvaluationResult::Empty)
2194 }
2195 }
2196 _ => Ok(EvaluationResult::Empty),
2197 }
2198 }
2199 }
2200}
2201
2202fn create_iteration_context(
2203 item: &EvaluationResult,
2204 variables: &HashMap<String, EvaluationResult>,
2205) -> EvaluationContext {
2206 // Create a new context with the iteration item as the root
2207 let mut context = EvaluationContext::new(vec![]);
2208 context.this = Some(item.clone());
2209
2210 // Preserve variables from the parent context
2211 for (name, value) in variables {
2212 context.set_variable_result(name, value.clone());
2213 }
2214
2215 context
2216}
2217
2218/// Filter a bundle's resources by their lastUpdated metadata
2219fn filter_bundle_by_since(bundle: SofBundle, since: DateTime<Utc>) -> Result<SofBundle, SofError> {
2220 match bundle {
2221 #[cfg(feature = "R4")]
2222 SofBundle::R4(mut b) => {
2223 if let Some(entries) = b.entry.as_mut() {
2224 entries.retain(|entry| {
2225 entry
2226 .resource
2227 .as_ref()
2228 .and_then(|r| r.get_last_updated())
2229 .map(|last_updated| last_updated > since)
2230 .unwrap_or(false)
2231 });
2232 }
2233 Ok(SofBundle::R4(b))
2234 }
2235 #[cfg(feature = "R4B")]
2236 SofBundle::R4B(mut b) => {
2237 if let Some(entries) = b.entry.as_mut() {
2238 entries.retain(|entry| {
2239 entry
2240 .resource
2241 .as_ref()
2242 .and_then(|r| r.get_last_updated())
2243 .map(|last_updated| last_updated > since)
2244 .unwrap_or(false)
2245 });
2246 }
2247 Ok(SofBundle::R4B(b))
2248 }
2249 #[cfg(feature = "R5")]
2250 SofBundle::R5(mut b) => {
2251 if let Some(entries) = b.entry.as_mut() {
2252 entries.retain(|entry| {
2253 entry
2254 .resource
2255 .as_ref()
2256 .and_then(|r| r.get_last_updated())
2257 .map(|last_updated| last_updated > since)
2258 .unwrap_or(false)
2259 });
2260 }
2261 Ok(SofBundle::R5(b))
2262 }
2263 #[cfg(feature = "R6")]
2264 SofBundle::R6(mut b) => {
2265 if let Some(entries) = b.entry.as_mut() {
2266 entries.retain(|entry| {
2267 entry
2268 .resource
2269 .as_ref()
2270 .and_then(|r| r.get_last_updated())
2271 .map(|last_updated| last_updated > since)
2272 .unwrap_or(false)
2273 });
2274 }
2275 Ok(SofBundle::R6(b))
2276 }
2277 }
2278}
2279
2280/// Apply pagination to processed results
2281fn apply_pagination_to_result(
2282 mut result: ProcessedResult,
2283 limit: Option<usize>,
2284 page: Option<usize>,
2285) -> Result<ProcessedResult, SofError> {
2286 if let Some(limit) = limit {
2287 let page_num = page.unwrap_or(1);
2288 if page_num == 0 {
2289 return Err(SofError::InvalidViewDefinition(
2290 "Page number must be greater than 0".to_string(),
2291 ));
2292 }
2293
2294 let start_index = (page_num - 1) * limit;
2295 if start_index >= result.rows.len() {
2296 // Return empty result if page is beyond data
2297 result.rows.clear();
2298 } else {
2299 let end_index = std::cmp::min(start_index + limit, result.rows.len());
2300 result.rows = result.rows[start_index..end_index].to_vec();
2301 }
2302 }
2303
2304 Ok(result)
2305}
2306
2307fn format_output(
2308 result: ProcessedResult,
2309 content_type: ContentType,
2310 parquet_options: Option<&ParquetOptions>,
2311) -> Result<Vec<u8>, SofError> {
2312 match content_type {
2313 ContentType::Csv | ContentType::CsvWithHeader => {
2314 format_csv(result, content_type == ContentType::CsvWithHeader)
2315 }
2316 ContentType::Json => format_json(result),
2317 ContentType::NdJson => format_ndjson(result),
2318 ContentType::Parquet => format_parquet(result, parquet_options),
2319 }
2320}
2321
2322fn format_csv(result: ProcessedResult, include_header: bool) -> Result<Vec<u8>, SofError> {
2323 let mut wtr = csv::Writer::from_writer(vec![]);
2324
2325 if include_header {
2326 wtr.write_record(&result.columns)?;
2327 }
2328
2329 for row in result.rows {
2330 let record: Vec<String> = row
2331 .values
2332 .iter()
2333 .map(|v| match v {
2334 Some(val) => {
2335 // For string values, extract the raw string instead of JSON serializing
2336 if let serde_json::Value::String(s) = val {
2337 s.clone()
2338 } else {
2339 // For non-string values, use JSON serialization
2340 serde_json::to_string(val).unwrap_or_default()
2341 }
2342 }
2343 None => String::new(),
2344 })
2345 .collect();
2346 wtr.write_record(&record)?;
2347 }
2348
2349 wtr.into_inner()
2350 .map_err(|e| SofError::CsvWriterError(e.to_string()))
2351}
2352
2353fn format_json(result: ProcessedResult) -> Result<Vec<u8>, SofError> {
2354 let mut output = Vec::new();
2355
2356 for row in result.rows {
2357 let mut row_obj = serde_json::Map::new();
2358 for (i, column) in result.columns.iter().enumerate() {
2359 let value = row
2360 .values
2361 .get(i)
2362 .and_then(|v| v.as_ref())
2363 .cloned()
2364 .unwrap_or(serde_json::Value::Null);
2365 row_obj.insert(column.clone(), value);
2366 }
2367 output.push(serde_json::Value::Object(row_obj));
2368 }
2369
2370 Ok(serde_json::to_vec_pretty(&output)?)
2371}
2372
2373fn format_ndjson(result: ProcessedResult) -> Result<Vec<u8>, SofError> {
2374 let mut output = Vec::new();
2375
2376 for row in result.rows {
2377 let mut row_obj = serde_json::Map::new();
2378 for (i, column) in result.columns.iter().enumerate() {
2379 let value = row
2380 .values
2381 .get(i)
2382 .and_then(|v| v.as_ref())
2383 .cloned()
2384 .unwrap_or(serde_json::Value::Null);
2385 row_obj.insert(column.clone(), value);
2386 }
2387 let line = serde_json::to_string(&serde_json::Value::Object(row_obj))?;
2388 output.extend_from_slice(line.as_bytes());
2389 output.push(b'\n');
2390 }
2391
2392 Ok(output)
2393}
2394
/// Serialize the result into a single in-memory Parquet file.
///
/// Batch size is derived from the configured row-group size and an average
/// row size estimated from up to the first 100 rows. Compression, row-group
/// size, and page size come from `options`; defaults apply when `options` is
/// `None` or the compression string is unrecognized (falls back to Snappy).
///
/// # Errors
/// Returns `SofError::ParquetConversionError` when schema/array conversion
/// or any Parquet write step fails.
fn format_parquet(
    result: ProcessedResult,
    options: Option<&ParquetOptions>,
) -> Result<Vec<u8>, SofError> {
    use arrow::record_batch::RecordBatch;
    use parquet::arrow::ArrowWriter;
    use parquet::basic::Compression;
    use parquet::file::properties::WriterProperties;
    use std::io::Cursor;

    // Create Arrow schema from columns and sample data
    let schema = parquet_schema::create_arrow_schema(&result.columns, &result.rows)?;
    let schema_ref = std::sync::Arc::new(schema.clone());

    // Get configuration from options or use defaults
    let parquet_opts = options.cloned().unwrap_or_default();

    // Calculate optimal batch size based on row count and estimated row size
    let target_row_group_size_bytes = (parquet_opts.row_group_size_mb as usize) * 1024 * 1024;
    let target_page_size_bytes = (parquet_opts.page_size_kb as usize) * 1024;
    const TARGET_ROWS_PER_BATCH: usize = 100_000; // Default batch size
    const MAX_ROWS_PER_BATCH: usize = 500_000; // Maximum to prevent memory issues

    // Estimate average row size from first 100 rows
    let sample_size = std::cmp::min(100, result.rows.len());
    let mut estimated_row_size = 100; // Default estimate in bytes

    if sample_size > 0 {
        // Sum the JSON text length of every non-null cell in the sample as a
        // rough proxy for serialized row size (floored at 50 bytes/row).
        let sample_json_size: usize = result.rows[..sample_size]
            .iter()
            .map(|row| {
                row.values
                    .iter()
                    .filter_map(|v| v.as_ref())
                    .map(|v| v.to_string().len())
                    .sum::<usize>()
            })
            .sum();
        estimated_row_size = (sample_json_size / sample_size).max(50);
    }

    // Calculate optimal batch size
    let optimal_batch_size = (target_row_group_size_bytes / estimated_row_size)
        .clamp(TARGET_ROWS_PER_BATCH, MAX_ROWS_PER_BATCH);

    // Parse compression algorithm
    use parquet::basic::BrotliLevel;
    use parquet::basic::GzipLevel;
    use parquet::basic::ZstdLevel;

    let compression = match parquet_opts.compression.as_str() {
        "none" => Compression::UNCOMPRESSED,
        "gzip" => Compression::GZIP(GzipLevel::default()),
        "lz4" => Compression::LZ4,
        "brotli" => Compression::BROTLI(BrotliLevel::default()),
        "zstd" => Compression::ZSTD(ZstdLevel::default()),
        _ => Compression::SNAPPY, // Default to snappy
    };

    // Set up writer properties with optimized settings
    let props = WriterProperties::builder()
        .set_compression(compression)
        .set_max_row_group_size(target_row_group_size_bytes)
        .set_data_page_row_count_limit(20_000) // Optimal for predicate pushdown
        .set_data_page_size_limit(target_page_size_bytes)
        .set_write_batch_size(8192) // Control write granularity
        .build();

    // Write to memory buffer
    let mut buffer = Vec::new();
    let mut cursor = Cursor::new(&mut buffer);
    let mut writer =
        ArrowWriter::try_new(&mut cursor, schema_ref.clone(), Some(props)).map_err(|e| {
            SofError::ParquetConversionError(format!("Failed to create Parquet writer: {}", e))
        })?;

    // Process data in batches to handle large datasets efficiently
    let mut row_offset = 0;
    while row_offset < result.rows.len() {
        let batch_end = (row_offset + optimal_batch_size).min(result.rows.len());
        let batch_rows = &result.rows[row_offset..batch_end];

        // Convert batch to Arrow arrays
        let batch_arrays =
            parquet_schema::process_to_arrow_arrays(&schema, &result.columns, batch_rows)?;

        // Create RecordBatch for this chunk
        let batch = RecordBatch::try_new(schema_ref.clone(), batch_arrays).map_err(|e| {
            SofError::ParquetConversionError(format!(
                "Failed to create RecordBatch for rows {}-{}: {}",
                row_offset, batch_end, e
            ))
        })?;

        // Write batch
        writer.write(&batch).map_err(|e| {
            SofError::ParquetConversionError(format!(
                "Failed to write RecordBatch for rows {}-{}: {}",
                row_offset, batch_end, e
            ))
        })?;

        row_offset = batch_end;
    }

    // Closing the writer flushes any buffered row groups and writes the footer.
    writer.close().map_err(|e| {
        SofError::ParquetConversionError(format!("Failed to close Parquet writer: {}", e))
    })?;

    Ok(buffer)
}
2506
/// Format Parquet data with automatic file splitting when size exceeds limit.
///
/// Produces one or more complete Parquet files (each self-contained with the
/// full schema); a new file is started once the current writer reports at
/// least `max_file_size_bytes` written and more rows remain. At least one
/// buffer is always returned, even for an empty result.
///
/// NOTE(review): the split check uses `ArrowWriter::bytes_written()`, which
/// reflects bytes already flushed — buffered, not-yet-flushed row groups may
/// not be counted, so individual files can overshoot the limit. Confirm
/// against the parquet crate docs if strict file-size limits are required.
///
/// # Errors
/// Returns `SofError::ParquetConversionError` when schema/array conversion
/// or any Parquet write/close step fails.
pub fn format_parquet_multi_file(
    result: ProcessedResult,
    options: Option<&ParquetOptions>,
    max_file_size_bytes: usize,
) -> Result<Vec<Vec<u8>>, SofError> {
    use arrow::record_batch::RecordBatch;
    use parquet::arrow::ArrowWriter;
    use parquet::basic::Compression;
    use parquet::file::properties::WriterProperties;
    use std::io::Cursor;

    // Create Arrow schema from columns and sample data
    let schema = parquet_schema::create_arrow_schema(&result.columns, &result.rows)?;
    let schema_ref = std::sync::Arc::new(schema.clone());

    // Get configuration from options or use defaults
    let parquet_opts = options.cloned().unwrap_or_default();

    // Calculate optimal batch size
    let target_row_group_size_bytes = (parquet_opts.row_group_size_mb as usize) * 1024 * 1024;
    let target_page_size_bytes = (parquet_opts.page_size_kb as usize) * 1024;
    const TARGET_ROWS_PER_BATCH: usize = 100_000;
    const MAX_ROWS_PER_BATCH: usize = 500_000;

    // Estimate average row size (same sampling strategy as `format_parquet`:
    // JSON text length of non-null cells in up to the first 100 rows).
    let sample_size = std::cmp::min(100, result.rows.len());
    let mut estimated_row_size = 100;

    if sample_size > 0 {
        let sample_json_size: usize = result.rows[..sample_size]
            .iter()
            .map(|row| {
                row.values
                    .iter()
                    .filter_map(|v| v.as_ref())
                    .map(|v| v.to_string().len())
                    .sum::<usize>()
            })
            .sum();
        estimated_row_size = (sample_json_size / sample_size).max(50);
    }

    let optimal_batch_size = (target_row_group_size_bytes / estimated_row_size)
        .clamp(TARGET_ROWS_PER_BATCH, MAX_ROWS_PER_BATCH);

    // Parse compression algorithm
    use parquet::basic::BrotliLevel;
    use parquet::basic::GzipLevel;
    use parquet::basic::ZstdLevel;

    let compression = match parquet_opts.compression.as_str() {
        "none" => Compression::UNCOMPRESSED,
        "gzip" => Compression::GZIP(GzipLevel::default()),
        "lz4" => Compression::LZ4,
        "brotli" => Compression::BROTLI(BrotliLevel::default()),
        "zstd" => Compression::ZSTD(ZstdLevel::default()),
        _ => Compression::SNAPPY,
    };

    // Set up writer properties
    let props = WriterProperties::builder()
        .set_compression(compression)
        .set_max_row_group_size(target_row_group_size_bytes)
        .set_data_page_row_count_limit(20_000)
        .set_data_page_size_limit(target_page_size_bytes)
        .set_write_batch_size(8192)
        .build();

    let mut file_buffers = Vec::new();
    let mut current_buffer = Vec::new();
    let mut current_cursor = Cursor::new(&mut current_buffer);
    let mut current_writer =
        ArrowWriter::try_new(&mut current_cursor, schema_ref.clone(), Some(props.clone()))
            .map_err(|e| {
                SofError::ParquetConversionError(format!("Failed to create Parquet writer: {}", e))
            })?;

    let mut row_offset = 0;
    // Tracked but currently unused; kept for potential row-count-based splitting.
    let mut _current_file_rows = 0;

    while row_offset < result.rows.len() {
        let batch_end = (row_offset + optimal_batch_size).min(result.rows.len());
        let batch_rows = &result.rows[row_offset..batch_end];

        // Convert batch to Arrow arrays
        let batch_arrays =
            parquet_schema::process_to_arrow_arrays(&schema, &result.columns, batch_rows)?;

        // Create RecordBatch
        let batch = RecordBatch::try_new(schema_ref.clone(), batch_arrays).map_err(|e| {
            SofError::ParquetConversionError(format!(
                "Failed to create RecordBatch for rows {}-{}: {}",
                row_offset, batch_end, e
            ))
        })?;

        // Write batch
        current_writer.write(&batch).map_err(|e| {
            SofError::ParquetConversionError(format!(
                "Failed to write RecordBatch for rows {}-{}: {}",
                row_offset, batch_end, e
            ))
        })?;

        _current_file_rows += batch_end - row_offset;
        row_offset = batch_end;

        // Check if we should start a new file
        // Get actual size of current buffer by flushing the writer
        let current_size = current_writer.bytes_written();

        if current_size >= max_file_size_bytes && row_offset < result.rows.len() {
            // Close current file (flushes remaining row groups + footer)
            current_writer.close().map_err(|e| {
                SofError::ParquetConversionError(format!("Failed to close Parquet writer: {}", e))
            })?;

            // Save the buffer
            file_buffers.push(current_buffer);

            // Start new file
            current_buffer = Vec::new();
            current_cursor = Cursor::new(&mut current_buffer);
            current_writer =
                ArrowWriter::try_new(&mut current_cursor, schema_ref.clone(), Some(props.clone()))
                    .map_err(|e| {
                        SofError::ParquetConversionError(format!(
                            "Failed to create new Parquet writer: {}",
                            e
                        ))
                    })?;
            _current_file_rows = 0;
        }
    }

    // Close the final writer
    current_writer.close().map_err(|e| {
        SofError::ParquetConversionError(format!("Failed to close final Parquet writer: {}", e))
    })?;

    file_buffers.push(current_buffer);

    Ok(file_buffers)
}