helios_sof/
lib.rs

1//! # SQL-on-FHIR Implementation
2//!
3//! This crate provides a complete implementation of the [SQL-on-FHIR
4//! specification](https://sql-on-fhir.org/ig/latest),
5//! enabling the transformation of FHIR resources into tabular data using declarative
6//! ViewDefinitions. It supports all major FHIR versions (R4, R4B, R5, R6) through
7//! a version-agnostic abstraction layer.
8
9//!
10//! There are three consumers of this crate:
11//! - [sof_cli](../sof_cli/index.html) - A command-line interface for the SQL-on-FHIR implementation,
12//!   allowing users to execute ViewDefinition transformations on FHIR Bundle resources
13//!   and output the results in various formats.
14//! - [sof_server](../sof_server/index.html) - A stateless HTTP server implementation for the SQL-on-FHIR specification,
15//!   enabling HTTP-based access to ViewDefinition transformation capabilities.
16//! - [hfs](../hfs/index.html) - The full featured Helios FHIR Server.
17//!
18//! ## Architecture
19//!
20//! The SOF crate is organized around these key components:
21//! - **Version-agnostic enums** ([`SofViewDefinition`], [`SofBundle`]): Multi-version containers
22//! - **Processing engine** ([`run_view_definition`]): Core transformation logic
23//! - **Output formats** ([`ContentType`]): Support for CSV, JSON, NDJSON, and Parquet
24//! - **Trait abstractions** ([`ViewDefinitionTrait`], [`BundleTrait`]): Version independence
25//!
26//! ## Key Features
27//!
28//! - **Multi-version FHIR support**: Works with R4, R4B, R5, and R6 resources
29//! - **FHIRPath evaluation**: Complex path expressions for data extraction
30//! - **forEach iteration**: Supports flattening of nested FHIR structures
31//! - **unionAll operations**: Combines multiple select statements
32//! - **Collection handling**: Proper array serialization for multi-valued fields
33//! - **Output formats**: CSV (with/without headers), JSON, NDJSON, Parquet support
34//!
35//! ## Usage Example
36//!
37//! ```rust
38//! # #[cfg(not(target_os = "windows"))]
39//! # {
40//! use helios_sof::{SofViewDefinition, SofBundle, ContentType, run_view_definition};
41//! use helios_fhir::FhirVersion;
42//!
43//! # #[cfg(feature = "R4")]
44//! # {
45//! // Parse a ViewDefinition and Bundle from JSON
46//! let view_definition_json = r#"{
47//!     "resourceType": "ViewDefinition",
48//!     "status": "active",
49//!     "resource": "Patient",
50//!     "select": [{
51//!         "column": [{
52//!             "name": "id",
53//!             "path": "id"
54//!         }, {
55//!             "name": "name",
56//!             "path": "name.family"
57//!         }]
58//!     }]
59//! }"#;
60//!
61//! let bundle_json = r#"{
62//!     "resourceType": "Bundle",
63//!     "type": "collection",
64//!     "entry": [{
65//!         "resource": {
66//!             "resourceType": "Patient",
67//!             "id": "example",
68//!             "name": [{
69//!                 "family": "Doe",
70//!                 "given": ["John"]
71//!             }]
72//!         }
73//!     }]
74//! }"#;
75//!
76//! let view_definition: helios_fhir::r4::ViewDefinition = serde_json::from_str(view_definition_json)?;
77//! let bundle: helios_fhir::r4::Bundle = serde_json::from_str(bundle_json)?;
78//!
79//! // Wrap in version-agnostic containers
80//! let sof_view = SofViewDefinition::R4(view_definition);
81//! let sof_bundle = SofBundle::R4(bundle);
82//!
83//! // Transform to CSV with headers
84//! let csv_output = run_view_definition(
85//!     sof_view,
86//!     sof_bundle,
87//!     ContentType::CsvWithHeader
88//! )?;
89//!
90//! // Check the CSV output
91//! let csv_string = String::from_utf8(csv_output)?;
92//! assert!(csv_string.contains("id,name"));
93//! // CSV values are quoted
94//! assert!(csv_string.contains("example") && csv_string.contains("Doe"));
95//! # }
96//! # }
97//! # Ok::<(), Box<dyn std::error::Error>>(())
98//! ```
99//!
100//! ## Advanced Features
101//!
102//! ### forEach Iteration
103//!
104//! ViewDefinitions can iterate over collections using `forEach` and `forEachOrNull`:
105//!
106//! ```json
107//! {
108//!   "select": [{
109//!     "forEach": "name",
110//!     "column": [{
111//!       "name": "family_name",
112//!       "path": "family"
113//!     }]
114//!   }]
115//! }
116//! ```
117//!
118//! ### Constants and Variables
119//!
120//! Define reusable values in ViewDefinitions:
121//!
122//! ```json
123//! {
124//!   "constant": [{
125//!     "name": "system",
126//!     "valueString": "http://loinc.org"
127//!   }],
128//!   "select": [{
129//!     "where": [{
130//!       "path": "code.coding.system = %system"
131//!     }]
132//!   }]
133//! }
134//! ```
135//!
136//! ### Where Clauses
137//!
138//! Filter resources using FHIRPath expressions:
139//!
140//! ```json
141//! {
142//!   "where": [{
143//!     "path": "active = true"
144//!   }, {
145//!     "path": "birthDate.exists()"
146//!   }]
147//! }
148//! ```
149//!
150//! ## Error Handling
151//!
152//! The crate provides comprehensive error handling through [`SofError`]:
153//!
154//! ```rust,no_run
155//! use helios_sof::{SofError, SofViewDefinition, SofBundle, ContentType, run_view_definition};
156//!
157//! # let view = SofViewDefinition::R4(helios_fhir::r4::ViewDefinition::default());
158//! # let bundle = SofBundle::R4(helios_fhir::r4::Bundle::default());
159//! # let content_type = ContentType::Json;
160//! match run_view_definition(view, bundle, content_type) {
161//!     Ok(output) => {
162//!         // Process successful transformation
163//!     },
164//!     Err(SofError::InvalidViewDefinition(msg)) => {
165//!         eprintln!("ViewDefinition validation failed: {}", msg);
166//!     },
167//!     Err(SofError::FhirPathError(msg)) => {
168//!         eprintln!("FHIRPath evaluation failed: {}", msg);
169//!     },
170//!     Err(e) => {
171//!         eprintln!("Other error: {}", e);
172//!     }
173//! }
174//! ```
175//! ## Feature Flags
176//!
177//! Enable support for specific FHIR versions:
178//! - `R4`: FHIR 4.0.1 support
179//! - `R4B`: FHIR 4.3.0 support
180//! - `R5`: FHIR 5.0.0 support
181//! - `R6`: FHIR 6.0.0 support
182
183pub mod data_source;
184pub mod parquet_schema;
185pub mod traits;
186
187use chrono::{DateTime, Utc};
188use helios_fhirpath::{EvaluationContext, EvaluationResult, evaluate_expression};
189use rayon::prelude::*;
190use serde::{Deserialize, Serialize};
191use std::collections::HashMap;
192use thiserror::Error;
193use traits::*;
194
195// Re-export commonly used types and traits for easier access
196pub use helios_fhir::FhirVersion;
197pub use traits::{BundleTrait, ResourceTrait, ViewDefinitionTrait};
198
199/// Multi-version ViewDefinition container supporting version-agnostic operations.
200///
201/// This enum provides a unified interface for working with ViewDefinition resources
202/// across different FHIR specification versions. It enables applications to handle
203/// multiple FHIR versions simultaneously while maintaining type safety.
204///
205/// # Supported Versions
206///
207/// - **R4**: FHIR 4.0.1 ViewDefinition (normative)
208/// - **R4B**: FHIR 4.3.0 ViewDefinition (ballot)
209/// - **R5**: FHIR 5.0.0 ViewDefinition (ballot)
210/// - **R6**: FHIR 6.0.0 ViewDefinition (draft)
211///
212/// # Examples
213///
214/// ```rust
215/// use helios_sof::{SofViewDefinition, ContentType};
216/// # #[cfg(feature = "R4")]
217/// use helios_fhir::r4::ViewDefinition;
218///
219/// # #[cfg(feature = "R4")]
220/// # {
221/// // Parse from JSON
222/// let json = r#"{
223///     "resourceType": "ViewDefinition",
224///     "resource": "Patient",
225///     "select": [{
226///         "column": [{
227///             "name": "id",
228///             "path": "id"
229///         }]
230///     }]
231/// }"#;
232///
233/// let view_def: ViewDefinition = serde_json::from_str(json)?;
234/// let sof_view = SofViewDefinition::R4(view_def);
235///
236/// // Check version
237/// assert_eq!(sof_view.version(), helios_fhir::FhirVersion::R4);
238/// # }
239/// # Ok::<(), Box<dyn std::error::Error>>(())
240/// ```
#[derive(Debug, Clone)]
pub enum SofViewDefinition {
    /// ViewDefinition from the `helios_fhir::r4` model; compiled in only with the `R4` feature.
    #[cfg(feature = "R4")]
    R4(helios_fhir::r4::ViewDefinition),
    /// ViewDefinition from the `helios_fhir::r4b` model; compiled in only with the `R4B` feature.
    #[cfg(feature = "R4B")]
    R4B(helios_fhir::r4b::ViewDefinition),
    /// ViewDefinition from the `helios_fhir::r5` model; compiled in only with the `R5` feature.
    #[cfg(feature = "R5")]
    R5(helios_fhir::r5::ViewDefinition),
    /// ViewDefinition from the `helios_fhir::r6` model; compiled in only with the `R6` feature.
    #[cfg(feature = "R6")]
    R6(helios_fhir::r6::ViewDefinition),
}
252
253impl SofViewDefinition {
254    /// Returns the FHIR specification version of this ViewDefinition.
255    ///
256    /// This method provides version detection for multi-version applications,
257    /// enabling version-specific processing logic and compatibility checks.
258    ///
259    /// # Returns
260    ///
261    /// The `FhirVersion` enum variant corresponding to this ViewDefinition's specification.
262    ///
263    /// # Examples
264    ///
265    /// ```rust
266    /// use helios_sof::SofViewDefinition;
267    /// use helios_fhir::FhirVersion;
268    ///
269    /// # #[cfg(feature = "R5")]
270    /// # {
271    /// # let view_def = helios_fhir::r5::ViewDefinition::default();
272    /// let sof_view = SofViewDefinition::R5(view_def);
273    /// assert_eq!(sof_view.version(), helios_fhir::FhirVersion::R5);
274    /// # }
275    /// ```
276    pub fn version(&self) -> helios_fhir::FhirVersion {
277        match self {
278            #[cfg(feature = "R4")]
279            SofViewDefinition::R4(_) => helios_fhir::FhirVersion::R4,
280            #[cfg(feature = "R4B")]
281            SofViewDefinition::R4B(_) => helios_fhir::FhirVersion::R4B,
282            #[cfg(feature = "R5")]
283            SofViewDefinition::R5(_) => helios_fhir::FhirVersion::R5,
284            #[cfg(feature = "R6")]
285            SofViewDefinition::R6(_) => helios_fhir::FhirVersion::R6,
286        }
287    }
288}
289
290/// Multi-version Bundle container supporting version-agnostic operations.
291///
292/// This enum provides a unified interface for working with FHIR Bundle resources
293/// across different FHIR specification versions. Bundles contain the actual FHIR
294/// resources that will be processed by ViewDefinitions.
295///
296/// # Supported Versions
297///
298/// - **R4**: FHIR 4.0.1 Bundle (normative)
299/// - **R4B**: FHIR 4.3.0 Bundle (ballot)
300/// - **R5**: FHIR 5.0.0 Bundle (ballot)
301/// - **R6**: FHIR 6.0.0 Bundle (draft)
302///
303/// # Examples
304///
305/// ```rust
306/// # #[cfg(not(target_os = "windows"))]
307/// # {
308/// use helios_sof::SofBundle;
309/// # #[cfg(feature = "R4")]
310/// use helios_fhir::r4::Bundle;
311///
312/// # #[cfg(feature = "R4")]
313/// # {
314/// // Parse from JSON
315/// let json = r#"{
316///     "resourceType": "Bundle",
317///     "type": "collection",
318///     "entry": [{
319///         "resource": {
320///             "resourceType": "Patient",
321///             "id": "example"
322///         }
323///     }]
324/// }"#;
325///
326/// let bundle: Bundle = serde_json::from_str(json)?;
327/// let sof_bundle = SofBundle::R4(bundle);
328///
329/// // Check version compatibility
330/// assert_eq!(sof_bundle.version(), helios_fhir::FhirVersion::R4);
331/// # }
332/// # }
333/// # Ok::<(), Box<dyn std::error::Error>>(())
334/// ```
#[derive(Debug, Clone)]
pub enum SofBundle {
    /// Bundle from the `helios_fhir::r4` model; compiled in only with the `R4` feature.
    #[cfg(feature = "R4")]
    R4(helios_fhir::r4::Bundle),
    /// Bundle from the `helios_fhir::r4b` model; compiled in only with the `R4B` feature.
    #[cfg(feature = "R4B")]
    R4B(helios_fhir::r4b::Bundle),
    /// Bundle from the `helios_fhir::r5` model; compiled in only with the `R5` feature.
    #[cfg(feature = "R5")]
    R5(helios_fhir::r5::Bundle),
    /// Bundle from the `helios_fhir::r6` model; compiled in only with the `R6` feature.
    #[cfg(feature = "R6")]
    R6(helios_fhir::r6::Bundle),
}
346
347impl SofBundle {
348    /// Returns the FHIR specification version of this Bundle.
349    ///
350    /// This method provides version detection for multi-version applications,
351    /// ensuring that ViewDefinitions and Bundles use compatible FHIR versions.
352    ///
353    /// # Returns
354    ///
355    /// The `FhirVersion` enum variant corresponding to this Bundle's specification.
356    ///
357    /// # Examples
358    ///
359    /// ```rust
360    /// use helios_sof::SofBundle;
361    /// use helios_fhir::FhirVersion;
362    ///
363    /// # #[cfg(feature = "R4")]
364    /// # {
365    /// # let bundle = helios_fhir::r4::Bundle::default();
366    /// let sof_bundle = SofBundle::R4(bundle);
367    /// assert_eq!(sof_bundle.version(), helios_fhir::FhirVersion::R4);
368    /// # }
369    /// ```
370    pub fn version(&self) -> helios_fhir::FhirVersion {
371        match self {
372            #[cfg(feature = "R4")]
373            SofBundle::R4(_) => helios_fhir::FhirVersion::R4,
374            #[cfg(feature = "R4B")]
375            SofBundle::R4B(_) => helios_fhir::FhirVersion::R4B,
376            #[cfg(feature = "R5")]
377            SofBundle::R5(_) => helios_fhir::FhirVersion::R5,
378            #[cfg(feature = "R6")]
379            SofBundle::R6(_) => helios_fhir::FhirVersion::R6,
380        }
381    }
382}
383
384/// Multi-version CapabilityStatement container supporting version-agnostic operations.
385///
386/// This enum provides a unified interface for working with CapabilityStatement resources
387/// across different FHIR specification versions. It enables applications to handle
388/// multiple FHIR versions simultaneously while maintaining type safety.
389///
390/// # Supported Versions
391///
392/// - **R4**: FHIR 4.0.1 CapabilityStatement (normative)
393/// - **R4B**: FHIR 4.3.0 CapabilityStatement (ballot)
394/// - **R5**: FHIR 5.0.0 CapabilityStatement (ballot)
395/// - **R6**: FHIR 6.0.0 CapabilityStatement (draft)
#[derive(Debug, Clone, Serialize, Deserialize)]
// `untagged`: the serialized form carries no variant marker, only the wrapped
// CapabilityStatement itself.
// NOTE(review): when deserializing an untagged enum, serde tries the variants
// in declaration order and keeps the first one that parses. With several
// version features enabled, input may therefore land in an earlier variant
// than intended — confirm this is acceptable for callers.
#[serde(untagged)]
pub enum SofCapabilityStatement {
    /// CapabilityStatement from the `helios_fhir::r4` model; requires the `R4` feature.
    #[cfg(feature = "R4")]
    R4(helios_fhir::r4::CapabilityStatement),
    /// CapabilityStatement from the `helios_fhir::r4b` model; requires the `R4B` feature.
    #[cfg(feature = "R4B")]
    R4B(helios_fhir::r4b::CapabilityStatement),
    /// CapabilityStatement from the `helios_fhir::r5` model; requires the `R5` feature.
    #[cfg(feature = "R5")]
    R5(helios_fhir::r5::CapabilityStatement),
    /// CapabilityStatement from the `helios_fhir::r6` model; requires the `R6` feature.
    #[cfg(feature = "R6")]
    R6(helios_fhir::r6::CapabilityStatement),
}
408
409impl SofCapabilityStatement {
410    /// Returns the FHIR specification version of this CapabilityStatement.
411    pub fn version(&self) -> helios_fhir::FhirVersion {
412        match self {
413            #[cfg(feature = "R4")]
414            SofCapabilityStatement::R4(_) => helios_fhir::FhirVersion::R4,
415            #[cfg(feature = "R4B")]
416            SofCapabilityStatement::R4B(_) => helios_fhir::FhirVersion::R4B,
417            #[cfg(feature = "R5")]
418            SofCapabilityStatement::R5(_) => helios_fhir::FhirVersion::R5,
419            #[cfg(feature = "R6")]
420            SofCapabilityStatement::R6(_) => helios_fhir::FhirVersion::R6,
421        }
422    }
423}
424
/// Type alias for the version-independent Parameters container.
///
/// `SofParameters` is just another name for
/// [`helios_fhir::VersionIndependentParameters`]; the two types are fully
/// interchangeable. The alias provides backward compatibility while using the
/// unified container from the `helios_fhir` crate.
pub type SofParameters = helios_fhir::VersionIndependentParameters;
430
431/// Comprehensive error type for SQL-on-FHIR operations.
432///
433/// This enum covers all possible error conditions that can occur during
434/// ViewDefinition processing, from validation failures to output formatting issues.
435/// Each variant provides specific context about the error to aid in debugging.
436///
437/// # Error Categories
438///
439/// - **Validation**: ViewDefinition structure and logic validation
440/// - **Evaluation**: FHIRPath expression evaluation failures
441/// - **I/O**: File and serialization operations
442/// - **Format**: Output format conversion issues
443///
444/// # Examples
445///
446/// ```rust,no_run
447/// use helios_sof::{SofError, SofViewDefinition, SofBundle, ContentType, run_view_definition};
448///
449/// # let view = SofViewDefinition::R4(helios_fhir::r4::ViewDefinition::default());
450/// # let bundle = SofBundle::R4(helios_fhir::r4::Bundle::default());
451/// # let content_type = ContentType::Json;
452/// match run_view_definition(view, bundle, content_type) {
453///     Ok(output) => {
454///         println!("Transformation successful");
455///     },
456///     Err(SofError::InvalidViewDefinition(msg)) => {
457///         eprintln!("ViewDefinition validation failed: {}", msg);
458///     },
459///     Err(SofError::FhirPathError(msg)) => {
460///         eprintln!("FHIRPath evaluation error: {}", msg);
461///     },
462///     Err(SofError::UnsupportedContentType(format)) => {
463///         eprintln!("Unsupported output format: {}", format);
464///     },
465///     Err(e) => {
466///         eprintln!("Other error: {}", e);
467///     }
468/// }
469/// ```
// `Error` is the `thiserror::Error` derive imported at the top of the file:
// each `#[error("...")]` attribute supplies the variant's `Display` text, and
// `#[from]` additionally generates a `From` impl so the wrapped error can be
// converted with the `?` operator.
#[derive(Debug, Error)]
pub enum SofError {
    /// ViewDefinition structure or logic validation failed.
    ///
    /// This error occurs when a ViewDefinition contains invalid or inconsistent
    /// configuration, such as missing required fields, invalid FHIRPath expressions,
    /// or incompatible select/unionAll structures.
    ///
    /// The payload is a human-readable description of the problem.
    #[error("Invalid ViewDefinition: {0}")]
    InvalidViewDefinition(String),

    /// FHIRPath expression evaluation failed.
    ///
    /// This error occurs when a FHIRPath expression in a ViewDefinition cannot
    /// be evaluated, either due to syntax errors or runtime evaluation issues.
    ///
    /// The payload is a human-readable description of the problem.
    #[error("FHIRPath evaluation error: {0}")]
    FhirPathError(String),

    /// JSON serialization/deserialization failed.
    ///
    /// This error occurs when parsing input JSON or serializing output data fails,
    /// typically due to malformed JSON or incompatible data structures.
    ///
    /// Converts automatically from [`serde_json::Error`] via `?`.
    #[error("Serialization error: {0}")]
    SerializationError(#[from] serde_json::Error),

    /// CSV processing failed.
    ///
    /// This error occurs during CSV output generation, such as when writing
    /// headers or data rows to the CSV format.
    ///
    /// Converts automatically from [`csv::Error`] via `?`.
    #[error("CSV error: {0}")]
    CsvError(#[from] csv::Error),

    /// File I/O operation failed.
    ///
    /// This error occurs when reading input files or writing output files fails,
    /// typically due to permission issues or missing files.
    ///
    /// Converts automatically from [`std::io::Error`] via `?`.
    #[error("IO error: {0}")]
    IoError(#[from] std::io::Error),

    /// Unsupported output content type requested.
    ///
    /// This error occurs when an invalid or unimplemented content type is
    /// specified for output formatting.
    ///
    /// The payload is the content type string that was rejected.
    #[error("Unsupported content type: {0}")]
    UnsupportedContentType(String),

    /// CSV writer internal error.
    ///
    /// This error occurs when the CSV writer encounters an internal issue
    /// that prevents successful output generation.
    ///
    /// Unlike [`SofError::CsvError`], this variant carries only a message
    /// string rather than a wrapped [`csv::Error`].
    #[error("CSV writer error: {0}")]
    CsvWriterError(String),

    /// Invalid source parameter value.
    ///
    /// This error occurs when the source parameter contains an invalid URL or path.
    #[error("Invalid source: {0}")]
    InvalidSource(String),

    /// Source not found.
    ///
    /// This error occurs when the specified source file or URL cannot be found.
    #[error("Source not found: {0}")]
    SourceNotFound(String),

    /// Failed to fetch data from source.
    ///
    /// This error occurs when fetching data from a remote source fails.
    #[error("Failed to fetch source: {0}")]
    SourceFetchError(String),

    /// Failed to read source data.
    ///
    /// This error occurs when reading data from the source fails.
    #[error("Failed to read source: {0}")]
    SourceReadError(String),

    /// Invalid content in source.
    ///
    /// This error occurs when the source content is not valid FHIR data.
    #[error("Invalid source content: {0}")]
    InvalidSourceContent(String),

    /// Unsupported source protocol.
    ///
    /// This error occurs when the source URL uses an unsupported protocol.
    #[error("Unsupported source protocol: {0}")]
    UnsupportedSourceProtocol(String),

    /// Parquet conversion error.
    ///
    /// This error occurs when converting data to Parquet format fails.
    #[error("Parquet conversion error: {0}")]
    ParquetConversionError(String),
}
564
/// Supported output content types for ViewDefinition transformations.
///
/// This enum defines the available output formats for transformed FHIR data.
/// Each format has specific characteristics and use cases for different
/// integration scenarios.
///
/// # Format Descriptions
///
/// - **CSV**: Comma-separated values without headers
/// - **CSV with Headers**: Comma-separated values with column headers
/// - **JSON**: Pretty-printed JSON array of objects
/// - **NDJSON**: Newline-delimited JSON (one object per line)
/// - **Parquet**: Apache Parquet columnar format
///
/// # Examples
///
/// ```rust
/// use helios_sof::ContentType;
///
/// // Parse from string
/// let csv_type = ContentType::from_string("text/csv")?;
/// assert_eq!(csv_type, ContentType::CsvWithHeader);  // Default includes headers
///
/// let json_type = ContentType::from_string("application/json")?;
/// assert_eq!(json_type, ContentType::Json);
///
/// // CSV without headers
/// let csv_no_headers = ContentType::from_string("text/csv;header=false")?;
/// assert_eq!(csv_no_headers, ContentType::Csv);
/// # Ok::<(), helios_sof::SofError>(())
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContentType {
    /// Comma-separated values format without headers
    Csv,
    /// Comma-separated values format with column headers
    CsvWithHeader,
    /// Pretty-printed JSON array format
    Json,
    /// Newline-delimited JSON format (NDJSON)
    NdJson,
    // NOTE(review): the crate-level docs list Parquet among the supported
    // output formats, while this item used to be described as planned / not
    // yet implemented — confirm against the output-formatting code.
    /// Apache Parquet columnar format
    Parquet,
}
609
610impl ContentType {
611    /// Parse a content type from its MIME type string representation.
612    ///
613    /// This method converts standard MIME type strings to the corresponding
614    /// ContentType enum variants. It supports the SQL-on-FHIR specification's
615    /// recommended content types.
616    ///
617    /// # Supported MIME Types
618    ///
619    /// - `"text/csv"` → [`ContentType::Csv`]
620    /// - `"text/csv"` → [`ContentType::CsvWithHeader`] (default: headers included)
621    /// - `"text/csv;header=true"` → [`ContentType::CsvWithHeader`]
622    /// - `"text/csv;header=false"` → [`ContentType::Csv`]
623    /// - `"application/json"` → [`ContentType::Json`]
624    /// - `"application/ndjson"` → [`ContentType::NdJson`]
625    /// - `"application/x-ndjson"` → [`ContentType::NdJson`]
626    /// - `"application/parquet"` → [`ContentType::Parquet`]
627    ///
628    /// # Arguments
629    ///
630    /// * `s` - The MIME type string to parse
631    ///
632    /// # Returns
633    ///
634    /// * `Ok(ContentType)` - Successfully parsed content type
635    /// * `Err(SofError::UnsupportedContentType)` - Unknown or unsupported MIME type
636    ///
637    /// # Examples
638    ///
639    /// ```rust
640    /// use helios_sof::ContentType;
641    ///
642    /// // Shortened format names
643    /// let csv = ContentType::from_string("csv")?;
644    /// assert_eq!(csv, ContentType::CsvWithHeader);
645    ///
646    /// let json = ContentType::from_string("json")?;
647    /// assert_eq!(json, ContentType::Json);
648    ///
649    /// let ndjson = ContentType::from_string("ndjson")?;
650    /// assert_eq!(ndjson, ContentType::NdJson);
651    ///
652    /// // Full MIME types still supported
653    /// let csv_mime = ContentType::from_string("text/csv")?;
654    /// assert_eq!(csv_mime, ContentType::CsvWithHeader);
655    ///
656    /// // CSV with headers explicitly
657    /// let csv_headers = ContentType::from_string("text/csv;header=true")?;
658    /// assert_eq!(csv_headers, ContentType::CsvWithHeader);
659    ///
660    /// // CSV without headers
661    /// let csv_no_headers = ContentType::from_string("text/csv;header=false")?;
662    /// assert_eq!(csv_no_headers, ContentType::Csv);
663    ///
664    /// // JSON format
665    /// let json_mime = ContentType::from_string("application/json")?;
666    /// assert_eq!(json_mime, ContentType::Json);
667    ///
668    /// // Error for unsupported type
669    /// assert!(ContentType::from_string("text/plain").is_err());
670    /// # Ok::<(), helios_sof::SofError>(())
671    /// ```
672    pub fn from_string(s: &str) -> Result<Self, SofError> {
673        match s {
674            // Shortened format names
675            "csv" => Ok(ContentType::CsvWithHeader),
676            "json" => Ok(ContentType::Json),
677            "ndjson" => Ok(ContentType::NdJson),
678            "parquet" => Ok(ContentType::Parquet),
679            // Full MIME types (for Accept header compatibility)
680            "text/csv;header=false" => Ok(ContentType::Csv),
681            "text/csv" | "text/csv;header=true" => Ok(ContentType::CsvWithHeader),
682            "application/json" => Ok(ContentType::Json),
683            "application/ndjson" | "application/x-ndjson" => Ok(ContentType::NdJson),
684            "application/parquet" => Ok(ContentType::Parquet),
685            _ => Err(SofError::UnsupportedContentType(s.to_string())),
686        }
687    }
688}
689
690/// Returns the FHIR version string for the newest enabled version.
691///
692/// This function provides the version string that should be used in CapabilityStatements
693/// and other FHIR resources that need to specify their version.
694pub fn get_fhir_version_string() -> &'static str {
695    let newest_version = get_newest_enabled_fhir_version();
696
697    match newest_version {
698        #[cfg(feature = "R4")]
699        helios_fhir::FhirVersion::R4 => "4.0.1",
700        #[cfg(feature = "R4B")]
701        helios_fhir::FhirVersion::R4B => "4.3.0",
702        #[cfg(feature = "R5")]
703        helios_fhir::FhirVersion::R5 => "5.0.0",
704        #[cfg(feature = "R6")]
705        helios_fhir::FhirVersion::R6 => "6.0.0",
706    }
707}
708
709/// Returns the newest FHIR version that is enabled at compile time.
710///
711/// This function uses compile-time feature detection to determine which FHIR
712/// version should be used when multiple versions are enabled. The priority order
713/// is: R6 > R5 > R4B > R4, where newer versions take precedence.
714///
715/// # Examples
716///
717/// ```rust
718/// use helios_sof::{get_newest_enabled_fhir_version, FhirVersion};
719///
720/// # #[cfg(any(feature = "R4", feature = "R4B", feature = "R5", feature = "R6"))]
721/// # {
722/// let version = get_newest_enabled_fhir_version();
723/// // If R5 and R4 are both enabled, this returns R5
724/// # }
725/// ```
726///
727/// # Panics
728///
729/// This function will panic at compile time if no FHIR version features are enabled.
730pub fn get_newest_enabled_fhir_version() -> helios_fhir::FhirVersion {
731    #[cfg(feature = "R6")]
732    return helios_fhir::FhirVersion::R6;
733
734    #[cfg(all(feature = "R5", not(feature = "R6")))]
735    return helios_fhir::FhirVersion::R5;
736
737    #[cfg(all(feature = "R4B", not(feature = "R5"), not(feature = "R6")))]
738    return helios_fhir::FhirVersion::R4B;
739
740    #[cfg(all(
741        feature = "R4",
742        not(feature = "R4B"),
743        not(feature = "R5"),
744        not(feature = "R6")
745    ))]
746    return helios_fhir::FhirVersion::R4;
747
748    #[cfg(not(any(feature = "R4", feature = "R4B", feature = "R5", feature = "R6")))]
749    panic!("At least one FHIR version feature must be enabled");
750}
751
752/// A single row of processed tabular data from ViewDefinition transformation.
753///
754/// This struct represents one row in the output table, containing values for
755/// each column defined in the ViewDefinition. Values are stored as optional
756/// JSON values to handle nullable fields and diverse FHIR data types.
757///
758/// # Structure
759///
760/// Each `ProcessedRow` contains a vector of optional JSON values, where:
761/// - `Some(value)` represents a non-null column value
762/// - `None` represents a null/missing column value
763/// - The order matches the column order in [`ProcessedResult::columns`]
764///
765/// # Examples
766///
767/// ```rust
768/// use helios_sof::ProcessedRow;
769/// use serde_json::Value;
770///
771/// let row = ProcessedRow {
772///     values: vec![
773///         Some(Value::String("patient-123".to_string())),
774///         Some(Value::String("Doe".to_string())),
775///         None, // Missing birth date
776///         Some(Value::Bool(true)),
777///     ]
778/// };
779/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessedRow {
    /// Column values for this row, ordered according to ProcessedResult::columns.
    ///
    /// `Some(value)` holds the column's JSON value; `None` stands for a
    /// null/missing value.
    pub values: Vec<Option<serde_json::Value>>,
}
785
786/// Complete result of ViewDefinition transformation containing columns and data rows.
787///
788/// This struct represents the tabular output from processing a ViewDefinition
789/// against a Bundle of FHIR resources. It contains both the column definitions
790/// and the actual data rows in a format ready for serialization to various
791/// output formats.
792///
793/// # Structure
794///
795/// - [`columns`](Self::columns): Ordered list of column names from the ViewDefinition
796/// - [`rows`](Self::rows): Data rows where each row contains values in column order
797///
798/// # Examples
799///
800/// ```rust
801/// use helios_sof::{ProcessedResult, ProcessedRow};
802/// use serde_json::Value;
803///
804/// let result = ProcessedResult {
805///     columns: vec![
806///         "patient_id".to_string(),
807///         "family_name".to_string(),
808///         "given_name".to_string(),
809///     ],
810///     rows: vec![
811///         ProcessedRow {
812///             values: vec![
813///                 Some(Value::String("patient-1".to_string())),
814///                 Some(Value::String("Smith".to_string())),
815///                 Some(Value::String("John".to_string())),
816///             ]
817///         },
818///         ProcessedRow {
819///             values: vec![
820///                 Some(Value::String("patient-2".to_string())),
821///                 Some(Value::String("Doe".to_string())),
822///                 None, // Missing given name
823///             ]
824///         },
825///     ]
826/// };
827///
828/// assert_eq!(result.columns.len(), 3);
829/// assert_eq!(result.rows.len(), 2);
830/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessedResult {
    /// Ordered list of column names as defined in the ViewDefinition
    pub columns: Vec<String>,
    /// Data rows containing values for each column.
    ///
    /// Each row's `values` are meant to be read index-for-index against
    /// `columns`; the type itself does not enforce matching lengths.
    pub rows: Vec<ProcessedRow>,
}
838
839/// Execute a SQL-on-FHIR ViewDefinition transformation on a FHIR Bundle.
840///
841/// This is the main entry point for SQL-on-FHIR transformations. It processes
842/// a ViewDefinition against a Bundle of FHIR resources and produces output in
843/// the specified format. The function handles version compatibility, validation,
844/// FHIRPath evaluation, and output formatting.
845///
846/// # Arguments
847///
848/// * `view_definition` - The ViewDefinition containing transformation logic
849/// * `bundle` - The Bundle containing FHIR resources to process
850/// * `content_type` - The desired output format
851///
852/// # Returns
853///
854/// * `Ok(Vec<u8>)` - Formatted output bytes ready for writing to file or stdout
855/// * `Err(SofError)` - Detailed error information about what went wrong
856///
857/// # Validation
858///
859/// The function performs comprehensive validation:
860/// - FHIR version compatibility between ViewDefinition and Bundle
861/// - ViewDefinition structure and logic validation
862/// - FHIRPath expression syntax and evaluation
863/// - Output format compatibility
864///
865/// # Examples
866///
867/// ```rust
868/// use helios_sof::{SofViewDefinition, SofBundle, ContentType, run_view_definition};
869///
870/// # #[cfg(feature = "R4")]
871/// # {
872/// // Create a simple ViewDefinition
873/// let view_json = serde_json::json!({
874///     "resourceType": "ViewDefinition",
875///     "status": "active",
876///     "resource": "Patient",
877///     "select": [{
878///         "column": [{
879///             "name": "id",
880///             "path": "id"
881///         }]
882///     }]
883/// });
884/// let view_def: helios_fhir::r4::ViewDefinition = serde_json::from_value(view_json)?;
885///
886/// // Create a simple Bundle
887/// let bundle_json = serde_json::json!({
888///     "resourceType": "Bundle",
889///     "type": "collection",
890///     "entry": []
891/// });
892/// let bundle: helios_fhir::r4::Bundle = serde_json::from_value(bundle_json)?;
893///
894/// let sof_view = SofViewDefinition::R4(view_def);
895/// let sof_bundle = SofBundle::R4(bundle);
896///
897/// // Generate CSV with headers
898/// let csv_output = run_view_definition(
899///     sof_view,
900///     sof_bundle,
901///     ContentType::CsvWithHeader
902/// )?;
903///
904/// // Write to file or stdout
905/// std::fs::write("output.csv", csv_output)?;
906/// # }
907/// # Ok::<(), Box<dyn std::error::Error>>(())
908/// ```
909///
910/// # Error Handling
911///
912/// Common error scenarios:
913///
914/// ```rust,no_run
915/// use helios_sof::{SofError, SofViewDefinition, SofBundle, ContentType, run_view_definition};
916///
917/// # let view = SofViewDefinition::R4(helios_fhir::r4::ViewDefinition::default());
918/// # let bundle = SofBundle::R4(helios_fhir::r4::Bundle::default());
919/// # let content_type = ContentType::Json;
920/// match run_view_definition(view, bundle, content_type) {
921///     Ok(output) => {
922///         println!("Success: {} bytes generated", output.len());
923///     },
924///     Err(SofError::InvalidViewDefinition(msg)) => {
925///         eprintln!("ViewDefinition error: {}", msg);
926///     },
927///     Err(SofError::FhirPathError(msg)) => {
928///         eprintln!("FHIRPath error: {}", msg);
929///     },
930///     Err(e) => {
931///         eprintln!("Other error: {}", e);
932///     }
933/// }
934/// ```
935pub fn run_view_definition(
936    view_definition: SofViewDefinition,
937    bundle: SofBundle,
938    content_type: ContentType,
939) -> Result<Vec<u8>, SofError> {
940    run_view_definition_with_options(view_definition, bundle, content_type, RunOptions::default())
941}
942
/// Tuning knobs for Parquet output.
///
/// The ranges quoted on the fields are the documented targets; this type does
/// not validate them itself.
#[derive(Debug, Clone)]
pub struct ParquetOptions {
    /// Target size of a row group, in megabytes (documented range: 64-1024).
    pub row_group_size_mb: u32,
    /// Target size of a data page, in kilobytes (documented range: 64-8192).
    pub page_size_kb: u32,
    /// Name of the compression codec: one of `none`, `snappy`, `gzip`, `lz4`,
    /// `brotli` or `zstd`.
    pub compression: String,
    /// Optional cap on a single output file, in megabytes; output is split
    /// once the cap is exceeded. `None` means no cap.
    pub max_file_size_mb: Option<u32>,
}

impl Default for ParquetOptions {
    /// Defaults: 256 MB row groups, 1024 KB pages, `snappy` compression and no
    /// file-size cap.
    fn default() -> Self {
        let compression = String::from("snappy");
        ParquetOptions {
            row_group_size_mb: 256,
            page_size_kb: 1024,
            compression,
            max_file_size_mb: None,
        }
    }
}
966
/// Options for filtering and controlling ViewDefinition execution.
///
/// Every field is optional; the derived `Default` leaves them all `None`,
/// which is what [`run_view_definition`] passes to
/// [`run_view_definition_with_options`].
#[derive(Debug, Clone, Default)]
pub struct RunOptions {
    /// Filter resources modified after this time. When set, the bundle is
    /// filtered before the ViewDefinition is processed.
    pub since: Option<DateTime<Utc>>,
    /// Limit the number of results. Pagination is applied to the processed
    /// rows when either `limit` or `page` is set.
    pub limit: Option<usize>,
    /// Page number for pagination (1-based)
    pub page: Option<usize>,
    /// Parquet-specific configuration options, forwarded to the output
    /// formatter.
    pub parquet_options: Option<ParquetOptions>,
}
979
980/// Execute a ViewDefinition transformation with additional filtering options.
981///
982/// This function extends the basic `run_view_definition` with support for:
983/// - Filtering resources by modification time (`since`)
984/// - Limiting results (`limit`)
985/// - Pagination (`page`)
986///
987/// # Arguments
988///
989/// * `view_definition` - The ViewDefinition to execute
990/// * `bundle` - The Bundle containing resources to transform
991/// * `content_type` - Desired output format
992/// * `options` - Additional filtering and control options
993///
994/// # Returns
995///
996/// The transformed data in the requested format, with filtering applied.
997pub fn run_view_definition_with_options(
998    view_definition: SofViewDefinition,
999    bundle: SofBundle,
1000    content_type: ContentType,
1001    options: RunOptions,
1002) -> Result<Vec<u8>, SofError> {
1003    // Filter bundle resources by since parameter before processing
1004    let filtered_bundle = if let Some(since) = options.since {
1005        filter_bundle_by_since(bundle, since)?
1006    } else {
1007        bundle
1008    };
1009
1010    // Process the ViewDefinition to generate tabular data
1011    let processed_result = process_view_definition(view_definition, filtered_bundle)?;
1012
1013    // Apply pagination if needed
1014    let processed_result = if options.limit.is_some() || options.page.is_some() {
1015        apply_pagination_to_result(processed_result, options.limit, options.page)?
1016    } else {
1017        processed_result
1018    };
1019
1020    // Format the result according to the requested content type
1021    format_output(
1022        processed_result,
1023        content_type,
1024        options.parquet_options.as_ref(),
1025    )
1026}
1027
/// Process a ViewDefinition against a Bundle and return the tabular result
/// without serializing it to an output format.
///
/// The ViewDefinition and the Bundle must wrap the same FHIR version; the
/// matching pair is unwrapped and handed to the version-agnostic
/// `process_view_definition_generic`.
///
/// # Arguments
///
/// * `view_definition` - The version-tagged ViewDefinition to execute
/// * `bundle` - The version-tagged Bundle providing the resources
///
/// # Errors
///
/// Returns `SofError::InvalidViewDefinition` when the two arguments report
/// different FHIR versions, and otherwise propagates any error from the
/// generic processing step.
pub fn process_view_definition(
    view_definition: SofViewDefinition,
    bundle: SofBundle,
) -> Result<ProcessedResult, SofError> {
    // Ensure both resources use the same FHIR version
    if view_definition.version() != bundle.version() {
        return Err(SofError::InvalidViewDefinition(
            "ViewDefinition and Bundle must use the same FHIR version".to_string(),
        ));
    }

    // Each arm is compiled only when its FHIR-version feature is enabled.
    match (view_definition, bundle) {
        #[cfg(feature = "R4")]
        (SofViewDefinition::R4(vd), SofBundle::R4(bundle)) => {
            process_view_definition_generic(vd, bundle)
        }
        #[cfg(feature = "R4B")]
        (SofViewDefinition::R4B(vd), SofBundle::R4B(bundle)) => {
            process_view_definition_generic(vd, bundle)
        }
        #[cfg(feature = "R5")]
        (SofViewDefinition::R5(vd), SofBundle::R5(bundle)) => {
            process_view_definition_generic(vd, bundle)
        }
        #[cfg(feature = "R6")]
        (SofViewDefinition::R6(vd), SofBundle::R6(bundle)) => {
            process_view_definition_generic(vd, bundle)
        }
        // This case should never happen due to the version check above,
        // but is needed for exhaustive pattern matching when multiple features are enabled
        #[cfg(any(
            all(feature = "R4", any(feature = "R4B", feature = "R5", feature = "R6")),
            all(feature = "R4B", any(feature = "R5", feature = "R6")),
            all(feature = "R5", feature = "R6")
        ))]
        _ => {
            unreachable!("Version mismatch should have been caught by the version check above")
        }
    }
}
1068
1069// Generic version-agnostic constant extraction
1070fn extract_view_definition_constants<VD: ViewDefinitionTrait>(
1071    view_definition: &VD,
1072) -> Result<HashMap<String, EvaluationResult>, SofError> {
1073    let mut variables = HashMap::new();
1074
1075    if let Some(constants) = view_definition.constants() {
1076        for constant in constants {
1077            let name = constant
1078                .name()
1079                .ok_or_else(|| {
1080                    SofError::InvalidViewDefinition("Constant name is required".to_string())
1081                })?
1082                .to_string();
1083
1084            let eval_result = constant.to_evaluation_result()?;
1085            // Constants are referenced with % prefix in FHIRPath expressions
1086            variables.insert(format!("%{}", name), eval_result);
1087        }
1088    }
1089
1090    Ok(variables)
1091}
1092
1093// Generic version-agnostic ViewDefinition processing
1094fn process_view_definition_generic<VD, B>(
1095    view_definition: VD,
1096    bundle: B,
1097) -> Result<ProcessedResult, SofError>
1098where
1099    VD: ViewDefinitionTrait,
1100    B: BundleTrait,
1101    B::Resource: ResourceTrait + Sync,
1102    VD::Select: Sync,
1103{
1104    validate_view_definition(&view_definition)?;
1105
1106    // Step 1: Extract constants/variables from ViewDefinition
1107    let variables = extract_view_definition_constants(&view_definition)?;
1108
1109    // Step 2: Filter resources by type and profile
1110    let target_resource_type = view_definition
1111        .resource()
1112        .ok_or_else(|| SofError::InvalidViewDefinition("Resource type is required".to_string()))?;
1113
1114    let filtered_resources = filter_resources(&bundle, target_resource_type)?;
1115
1116    // Step 3: Apply where clauses to filter resources
1117    let filtered_resources = apply_where_clauses(
1118        filtered_resources,
1119        view_definition.where_clauses(),
1120        &variables,
1121    )?;
1122
1123    // Step 4: Process all select clauses to generate rows with forEach support
1124    let select_clauses = view_definition.select().ok_or_else(|| {
1125        SofError::InvalidViewDefinition("At least one select clause is required".to_string())
1126    })?;
1127
1128    // Generate rows for each resource using the forEach-aware approach
1129    let (all_columns, rows) =
1130        generate_rows_from_selects(&filtered_resources, select_clauses, &variables)?;
1131
1132    Ok(ProcessedResult {
1133        columns: all_columns,
1134        rows,
1135    })
1136}
1137
1138// Generic version-agnostic validation
1139fn validate_view_definition<VD: ViewDefinitionTrait>(view_def: &VD) -> Result<(), SofError> {
1140    // Basic validation
1141    if view_def.resource().is_none_or(|s| s.is_empty()) {
1142        return Err(SofError::InvalidViewDefinition(
1143            "ViewDefinition must specify a resource type".to_string(),
1144        ));
1145    }
1146
1147    if view_def.select().is_none_or(|s| s.is_empty()) {
1148        return Err(SofError::InvalidViewDefinition(
1149            "ViewDefinition must have at least one select".to_string(),
1150        ));
1151    }
1152
1153    // Validate where clauses
1154    if let Some(where_clauses) = view_def.where_clauses() {
1155        validate_where_clauses(where_clauses)?;
1156    }
1157
1158    // Validate selects
1159    if let Some(selects) = view_def.select() {
1160        for select in selects {
1161            validate_select(select)?;
1162        }
1163    }
1164
1165    Ok(())
1166}
1167
1168// Generic where clause validation
1169fn validate_where_clauses<W: ViewDefinitionWhereTrait>(
1170    where_clauses: &[W],
1171) -> Result<(), SofError> {
1172    // Basic validation - just ensure paths are provided
1173    // Type checking will be done during actual evaluation
1174    for where_clause in where_clauses {
1175        if where_clause.path().is_none() {
1176            return Err(SofError::InvalidViewDefinition(
1177                "Where clause must have a path specified".to_string(),
1178            ));
1179        }
1180    }
1181    Ok(())
1182}
1183
1184// Generic helper - no longer needs to be version-specific
1185fn can_be_coerced_to_boolean(result: &EvaluationResult) -> bool {
1186    // Check if the result can be meaningfully used as a boolean in a where clause
1187    match result {
1188        // Boolean values are obviously OK
1189        EvaluationResult::Boolean(_, _) => true,
1190
1191        // Empty is OK (evaluates to false)
1192        EvaluationResult::Empty => true,
1193
1194        // Collections are OK - they evaluate based on whether they're empty or not
1195        EvaluationResult::Collection { .. } => true,
1196
1197        // Other types cannot be meaningfully coerced to boolean for where clauses
1198        // This includes: String, Integer, Decimal, Date, DateTime, Time, Quantity, Object
1199        _ => false,
1200    }
1201}
1202
// Generic select validation.
//
// Entry point for validating a top-level select: delegates to
// `validate_select_with_context` with `in_foreach_context = false`, i.e. the
// select is treated as not being nested inside a forEach/forEachOrNull.
// Any validation error is returned unchanged.
fn validate_select<S: ViewDefinitionSelectTrait>(select: &S) -> Result<(), SofError> {
    validate_select_with_context(select, false)
}
1207
1208fn validate_select_with_context<S: ViewDefinitionSelectTrait>(
1209    select: &S,
1210    in_foreach_context: bool,
1211) -> Result<(), SofError>
1212where
1213    S::Select: ViewDefinitionSelectTrait,
1214{
1215    // Determine if we're entering a forEach context at this level
1216    let entering_foreach = select.for_each().is_some() || select.for_each_or_null().is_some();
1217    let current_foreach_context = in_foreach_context || entering_foreach;
1218
1219    // Validate collection attribute with the current forEach context
1220    if let Some(columns) = select.column() {
1221        for column in columns {
1222            if let Some(collection_value) = column.collection() {
1223                if !collection_value && !current_foreach_context {
1224                    return Err(SofError::InvalidViewDefinition(
1225                        "Column 'collection' attribute must be true when specified".to_string(),
1226                    ));
1227                }
1228            }
1229        }
1230    }
1231
1232    // Validate unionAll column consistency
1233    if let Some(union_selects) = select.union_all() {
1234        validate_union_all_columns(union_selects)?;
1235    }
1236
1237    // Recursively validate nested selects
1238    if let Some(nested_selects) = select.select() {
1239        for nested_select in nested_selects {
1240            validate_select_with_context(nested_select, current_foreach_context)?;
1241        }
1242    }
1243
1244    // Validate unionAll selects with forEach context
1245    if let Some(union_selects) = select.union_all() {
1246        for union_select in union_selects {
1247            validate_select_with_context(union_select, current_foreach_context)?;
1248        }
1249    }
1250
1251    Ok(())
1252}
1253
1254// Generic union validation
1255fn validate_union_all_columns<S: ViewDefinitionSelectTrait>(
1256    union_selects: &[S],
1257) -> Result<(), SofError> {
1258    if union_selects.len() < 2 {
1259        return Ok(());
1260    }
1261
1262    // Get column names and order from first select
1263    let first_select = &union_selects[0];
1264    let first_columns = get_column_names(first_select)?;
1265
1266    // Validate all other selects have the same column names in the same order
1267    for (index, union_select) in union_selects.iter().enumerate().skip(1) {
1268        let current_columns = get_column_names(union_select)?;
1269
1270        if current_columns != first_columns {
1271            if current_columns.len() != first_columns.len()
1272                || !current_columns
1273                    .iter()
1274                    .all(|name| first_columns.contains(name))
1275            {
1276                return Err(SofError::InvalidViewDefinition(format!(
1277                    "UnionAll branch {} has different column names than first branch",
1278                    index
1279                )));
1280            } else {
1281                return Err(SofError::InvalidViewDefinition(format!(
1282                    "UnionAll branch {} has columns in different order than first branch",
1283                    index
1284                )));
1285            }
1286        }
1287    }
1288
1289    Ok(())
1290}
1291
1292// Generic column name extraction
1293fn get_column_names<S: ViewDefinitionSelectTrait>(select: &S) -> Result<Vec<String>, SofError> {
1294    let mut column_names = Vec::new();
1295
1296    // Collect direct column names
1297    if let Some(columns) = select.column() {
1298        for column in columns {
1299            if let Some(name) = column.name() {
1300                column_names.push(name.to_string());
1301            }
1302        }
1303    }
1304
1305    // If this select has unionAll but no direct columns, get columns from first unionAll branch
1306    if column_names.is_empty() {
1307        if let Some(union_selects) = select.union_all() {
1308            if !union_selects.is_empty() {
1309                return get_column_names(&union_selects[0]);
1310            }
1311        }
1312    }
1313
1314    Ok(column_names)
1315}
1316
1317// Generic resource filtering
1318fn filter_resources<'a, B: BundleTrait>(
1319    bundle: &'a B,
1320    resource_type: &str,
1321) -> Result<Vec<&'a B::Resource>, SofError> {
1322    Ok(bundle
1323        .entries()
1324        .into_iter()
1325        .filter(|resource| resource.resource_name() == resource_type)
1326        .collect())
1327}
1328
1329// Generic where clause application
1330fn apply_where_clauses<'a, R, W>(
1331    resources: Vec<&'a R>,
1332    where_clauses: Option<&[W]>,
1333    variables: &HashMap<String, EvaluationResult>,
1334) -> Result<Vec<&'a R>, SofError>
1335where
1336    R: ResourceTrait,
1337    W: ViewDefinitionWhereTrait,
1338{
1339    if let Some(wheres) = where_clauses {
1340        let mut filtered = Vec::new();
1341
1342        for resource in resources {
1343            let mut include_resource = true;
1344
1345            // All where clauses must evaluate to true for the resource to be included
1346            for where_clause in wheres {
1347                let fhir_resource = resource.to_fhir_resource();
1348                let mut context = EvaluationContext::new(vec![fhir_resource]);
1349
1350                // Add variables to the context
1351                for (name, value) in variables {
1352                    context.set_variable_result(name, value.clone());
1353                }
1354
1355                let path = where_clause.path().ok_or_else(|| {
1356                    SofError::InvalidViewDefinition("Where clause path is required".to_string())
1357                })?;
1358
1359                match evaluate_expression(path, &context) {
1360                    Ok(result) => {
1361                        // Check if the result can be meaningfully used as a boolean
1362                        if !can_be_coerced_to_boolean(&result) {
1363                            return Err(SofError::InvalidViewDefinition(format!(
1364                                "Where clause path '{}' returns type '{}' which cannot be used as a boolean condition. \
1365                                 Where clauses must return boolean values, collections, or empty results.",
1366                                path,
1367                                result.type_name()
1368                            )));
1369                        }
1370
1371                        // Check if result is truthy (non-empty and not false)
1372                        if !is_truthy(&result) {
1373                            include_resource = false;
1374                            break;
1375                        }
1376                    }
1377                    Err(e) => {
1378                        return Err(SofError::FhirPathError(format!(
1379                            "Error evaluating where clause '{}': {}",
1380                            path, e
1381                        )));
1382                    }
1383                }
1384            }
1385
1386            if include_resource {
1387                filtered.push(resource);
1388            }
1389        }
1390
1391        Ok(filtered)
1392    } else {
1393        Ok(resources)
1394    }
1395}
1396
1397// Removed generate_rows_per_resource_r4 - replaced with new forEach-aware implementation
1398
1399// Removed generate_rows_with_for_each_r4 - replaced with new forEach-aware implementation
1400
1401// Helper functions for FHIRPath result processing
1402fn is_truthy(result: &EvaluationResult) -> bool {
1403    match result {
1404        EvaluationResult::Empty => false,
1405        EvaluationResult::Boolean(b, _) => *b,
1406        EvaluationResult::Collection { items, .. } => !items.is_empty(),
1407        _ => true, // Non-empty, non-false values are truthy
1408    }
1409}
1410
1411fn fhirpath_result_to_json_value_collection(result: EvaluationResult) -> Option<serde_json::Value> {
1412    match result {
1413        EvaluationResult::Empty => Some(serde_json::Value::Array(vec![])),
1414        EvaluationResult::Collection { items, .. } => {
1415            // Always return array for collection columns, even if empty
1416            let values: Vec<serde_json::Value> = items
1417                .into_iter()
1418                .filter_map(fhirpath_result_to_json_value)
1419                .collect();
1420            Some(serde_json::Value::Array(values))
1421        }
1422        // For non-collection results in collection columns, wrap in array
1423        single_result => {
1424            if let Some(json_val) = fhirpath_result_to_json_value(single_result) {
1425                Some(serde_json::Value::Array(vec![json_val]))
1426            } else {
1427                Some(serde_json::Value::Array(vec![]))
1428            }
1429        }
1430    }
1431}
1432
1433fn fhirpath_result_to_json_value(result: EvaluationResult) -> Option<serde_json::Value> {
1434    match result {
1435        EvaluationResult::Empty => None,
1436        EvaluationResult::Boolean(b, _) => Some(serde_json::Value::Bool(b)),
1437        EvaluationResult::Integer(i, _) => {
1438            Some(serde_json::Value::Number(serde_json::Number::from(i)))
1439        }
1440        EvaluationResult::Decimal(d, _) => {
1441            // Check if this Decimal represents a whole number
1442            if d.fract().is_zero() {
1443                // Convert to integer if no fractional part
1444                if let Ok(i) = d.to_string().parse::<i64>() {
1445                    Some(serde_json::Value::Number(serde_json::Number::from(i)))
1446                } else {
1447                    // Handle very large numbers as strings
1448                    Some(serde_json::Value::String(d.to_string()))
1449                }
1450            } else {
1451                // Convert Decimal to a float for fractional numbers
1452                if let Ok(f) = d.to_string().parse::<f64>() {
1453                    if let Some(num) = serde_json::Number::from_f64(f) {
1454                        Some(serde_json::Value::Number(num))
1455                    } else {
1456                        Some(serde_json::Value::String(d.to_string()))
1457                    }
1458                } else {
1459                    Some(serde_json::Value::String(d.to_string()))
1460                }
1461            }
1462        }
1463        EvaluationResult::String(s, _) => Some(serde_json::Value::String(s)),
1464        EvaluationResult::Date(s, _) => Some(serde_json::Value::String(s)),
1465        EvaluationResult::DateTime(s, _) => {
1466            // Remove "@" prefix from datetime strings if present
1467            let cleaned = s.strip_prefix("@").unwrap_or(&s);
1468            Some(serde_json::Value::String(cleaned.to_string()))
1469        }
1470        EvaluationResult::Time(s, _) => {
1471            // Remove "@T" prefix from time strings if present
1472            let cleaned = s.strip_prefix("@T").unwrap_or(&s);
1473            Some(serde_json::Value::String(cleaned.to_string()))
1474        }
1475        EvaluationResult::Collection { items, .. } => {
1476            if items.len() == 1 {
1477                // Single item collection - unwrap to the item itself
1478                fhirpath_result_to_json_value(items.into_iter().next().unwrap())
1479            } else if items.is_empty() {
1480                None
1481            } else {
1482                // Multiple items - convert to array
1483                let values: Vec<serde_json::Value> = items
1484                    .into_iter()
1485                    .filter_map(fhirpath_result_to_json_value)
1486                    .collect();
1487                Some(serde_json::Value::Array(values))
1488            }
1489        }
1490        EvaluationResult::Object { map, .. } => {
1491            let mut json_map = serde_json::Map::new();
1492            for (k, v) in map {
1493                if let Some(json_val) = fhirpath_result_to_json_value(v) {
1494                    json_map.insert(k, json_val);
1495                }
1496            }
1497            Some(serde_json::Value::Object(json_map))
1498        }
1499        // Handle other result types as strings
1500        _ => Some(serde_json::Value::String(format!("{:?}", result))),
1501    }
1502}
1503
1504fn extract_iteration_items(result: EvaluationResult) -> Vec<EvaluationResult> {
1505    match result {
1506        EvaluationResult::Collection { items, .. } => items,
1507        EvaluationResult::Empty => Vec::new(),
1508        single_item => vec![single_item],
1509    }
1510}
1511
1512// Generic row generation functions
1513
1514fn generate_rows_from_selects<R, S>(
1515    resources: &[&R],
1516    selects: &[S],
1517    variables: &HashMap<String, EvaluationResult>,
1518) -> Result<(Vec<String>, Vec<ProcessedRow>), SofError>
1519where
1520    R: ResourceTrait + Sync,
1521    S: ViewDefinitionSelectTrait + Sync,
1522    S::Select: ViewDefinitionSelectTrait,
1523{
1524    // Process resources in parallel
1525    let resource_results: Result<Vec<_>, _> = resources
1526        .par_iter()
1527        .map(|resource| {
1528            // Each thread gets its own local column vector
1529            let mut local_columns = Vec::new();
1530            let resource_rows =
1531                generate_rows_for_resource(*resource, selects, &mut local_columns, variables)?;
1532            Ok::<(Vec<String>, Vec<ProcessedRow>), SofError>((local_columns, resource_rows))
1533        })
1534        .collect();
1535
1536    // Handle errors from parallel processing
1537    let resource_results = resource_results?;
1538
1539    // Merge columns from all threads (maintaining order is important)
1540    let mut final_columns = Vec::new();
1541    let mut all_rows = Vec::new();
1542
1543    for (local_columns, resource_rows) in resource_results {
1544        // Merge columns, avoiding duplicates
1545        for col in local_columns {
1546            if !final_columns.contains(&col) {
1547                final_columns.push(col);
1548            }
1549        }
1550        all_rows.extend(resource_rows);
1551    }
1552
1553    Ok((final_columns, all_rows))
1554}
1555
1556fn generate_rows_for_resource<R, S>(
1557    resource: &R,
1558    selects: &[S],
1559    all_columns: &mut Vec<String>,
1560    variables: &HashMap<String, EvaluationResult>,
1561) -> Result<Vec<ProcessedRow>, SofError>
1562where
1563    R: ResourceTrait,
1564    S: ViewDefinitionSelectTrait,
1565    S::Select: ViewDefinitionSelectTrait,
1566{
1567    let fhir_resource = resource.to_fhir_resource();
1568    let mut context = EvaluationContext::new(vec![fhir_resource]);
1569
1570    // Add variables to the context
1571    for (name, value) in variables {
1572        context.set_variable_result(name, value.clone());
1573    }
1574
1575    // Generate all possible row combinations for this resource
1576    let row_combinations = generate_row_combinations(&context, selects, all_columns, variables)?;
1577
1578    Ok(row_combinations)
1579}
1580
/// Intermediate row used while select clauses are being expanded.
///
/// `values` has one slot per output column, indexed by the column's position
/// in the column list; a slot is `None` until a select writes a value to it.
/// Finished combinations are converted into `ProcessedRow`s.
#[derive(Debug, Clone)]
struct RowCombination {
    values: Vec<Option<serde_json::Value>>,
}
1585
1586fn generate_row_combinations<S>(
1587    context: &EvaluationContext,
1588    selects: &[S],
1589    all_columns: &mut Vec<String>,
1590    variables: &HashMap<String, EvaluationResult>,
1591) -> Result<Vec<ProcessedRow>, SofError>
1592where
1593    S: ViewDefinitionSelectTrait,
1594    S::Select: ViewDefinitionSelectTrait,
1595{
1596    // First pass: collect all column names to ensure consistent ordering
1597    collect_all_columns(selects, all_columns)?;
1598
1599    // Second pass: generate all row combinations
1600    let mut row_combinations = vec![RowCombination {
1601        values: vec![None; all_columns.len()],
1602    }];
1603
1604    for select in selects {
1605        row_combinations =
1606            expand_select_combinations(context, select, &row_combinations, all_columns, variables)?;
1607    }
1608
1609    // Convert to ProcessedRow format
1610    Ok(row_combinations
1611        .into_iter()
1612        .map(|combo| ProcessedRow {
1613            values: combo.values,
1614        })
1615        .collect())
1616}
1617
1618fn collect_all_columns<S>(selects: &[S], all_columns: &mut Vec<String>) -> Result<(), SofError>
1619where
1620    S: ViewDefinitionSelectTrait,
1621{
1622    for select in selects {
1623        // Add columns from this select
1624        if let Some(columns) = select.column() {
1625            for col in columns {
1626                if let Some(name) = col.name() {
1627                    if !all_columns.contains(&name.to_string()) {
1628                        all_columns.push(name.to_string());
1629                    }
1630                }
1631            }
1632        }
1633
1634        // Recursively collect from nested selects
1635        if let Some(nested_selects) = select.select() {
1636            collect_all_columns(nested_selects, all_columns)?;
1637        }
1638
1639        // Collect from unionAll
1640        if let Some(union_selects) = select.union_all() {
1641            collect_all_columns(union_selects, all_columns)?;
1642        }
1643    }
1644    Ok(())
1645}
1646
1647fn expand_select_combinations<S>(
1648    context: &EvaluationContext,
1649    select: &S,
1650    existing_combinations: &[RowCombination],
1651    all_columns: &[String],
1652    variables: &HashMap<String, EvaluationResult>,
1653) -> Result<Vec<RowCombination>, SofError>
1654where
1655    S: ViewDefinitionSelectTrait,
1656    S::Select: ViewDefinitionSelectTrait,
1657{
1658    // Handle forEach and forEachOrNull
1659    if let Some(for_each_path) = select.for_each() {
1660        return expand_for_each_combinations(
1661            context,
1662            select,
1663            existing_combinations,
1664            all_columns,
1665            for_each_path,
1666            false,
1667            variables,
1668        );
1669    }
1670
1671    if let Some(for_each_or_null_path) = select.for_each_or_null() {
1672        return expand_for_each_combinations(
1673            context,
1674            select,
1675            existing_combinations,
1676            all_columns,
1677            for_each_or_null_path,
1678            true,
1679            variables,
1680        );
1681    }
1682
1683    // Handle repeat directive for recursive traversal
1684    if let Some(repeat_paths) = select.repeat() {
1685        return expand_repeat_combinations(
1686            context,
1687            select,
1688            existing_combinations,
1689            all_columns,
1690            &repeat_paths,
1691            variables,
1692        );
1693    }
1694
1695    // Handle regular columns (no forEach)
1696    let mut new_combinations = Vec::new();
1697
1698    for existing_combo in existing_combinations {
1699        let mut new_combo = existing_combo.clone();
1700
1701        // Add values from this select's columns
1702        if let Some(columns) = select.column() {
1703            for col in columns {
1704                if let Some(col_name) = col.name() {
1705                    if let Some(col_index) = all_columns.iter().position(|name| name == col_name) {
1706                        let path = col.path().ok_or_else(|| {
1707                            SofError::InvalidViewDefinition("Column path is required".to_string())
1708                        })?;
1709
1710                        match evaluate_expression(path, context) {
1711                            Ok(result) => {
1712                                // Check if this column is marked as a collection
1713                                let is_collection = col.collection().unwrap_or(false);
1714
1715                                new_combo.values[col_index] = if is_collection {
1716                                    fhirpath_result_to_json_value_collection(result)
1717                                } else {
1718                                    fhirpath_result_to_json_value(result)
1719                                };
1720                            }
1721                            Err(e) => {
1722                                return Err(SofError::FhirPathError(format!(
1723                                    "Error evaluating column '{}' with path '{}': {}",
1724                                    col_name, path, e
1725                                )));
1726                            }
1727                        }
1728                    }
1729                }
1730            }
1731        }
1732
1733        new_combinations.push(new_combo);
1734    }
1735
1736    // Handle nested selects
1737    if let Some(nested_selects) = select.select() {
1738        for nested_select in nested_selects {
1739            new_combinations = expand_select_combinations(
1740                context,
1741                nested_select,
1742                &new_combinations,
1743                all_columns,
1744                variables,
1745            )?;
1746        }
1747    }
1748
1749    // Handle unionAll
1750    if let Some(union_selects) = select.union_all() {
1751        let mut union_combinations = Vec::new();
1752
1753        // Process each unionAll select independently, using the combinations that already have
1754        // values from this select's columns and nested selects
1755        for union_select in union_selects {
1756            let select_combinations = expand_select_combinations(
1757                context,
1758                union_select,
1759                &new_combinations,
1760                all_columns,
1761                variables,
1762            )?;
1763            union_combinations.extend(select_combinations);
1764        }
1765
1766        // unionAll replaces new_combinations with the union results
1767        // If no union results, this resource should be filtered out (no rows for this resource)
1768        new_combinations = union_combinations;
1769    }
1770
1771    Ok(new_combinations)
1772}
1773
1774fn expand_for_each_combinations<S>(
1775    context: &EvaluationContext,
1776    select: &S,
1777    existing_combinations: &[RowCombination],
1778    all_columns: &[String],
1779    for_each_path: &str,
1780    allow_null: bool,
1781    variables: &HashMap<String, EvaluationResult>,
1782) -> Result<Vec<RowCombination>, SofError>
1783where
1784    S: ViewDefinitionSelectTrait,
1785    S::Select: ViewDefinitionSelectTrait,
1786{
1787    // Evaluate the forEach expression to get iteration items
1788    let for_each_result = evaluate_expression(for_each_path, context).map_err(|e| {
1789        SofError::FhirPathError(format!(
1790            "Error evaluating forEach expression '{}': {}",
1791            for_each_path, e
1792        ))
1793    })?;
1794
1795    let iteration_items = extract_iteration_items(for_each_result);
1796
1797    if iteration_items.is_empty() {
1798        if allow_null {
1799            // forEachOrNull: generate null rows
1800            let mut new_combinations = Vec::new();
1801            for existing_combo in existing_combinations {
1802                let mut new_combo = existing_combo.clone();
1803
1804                // Set column values to null for this forEach scope
1805                if let Some(columns) = select.column() {
1806                    for col in columns {
1807                        if let Some(col_name) = col.name() {
1808                            if let Some(col_index) =
1809                                all_columns.iter().position(|name| name == col_name)
1810                            {
1811                                new_combo.values[col_index] = None;
1812                            }
1813                        }
1814                    }
1815                }
1816
1817                new_combinations.push(new_combo);
1818            }
1819            return Ok(new_combinations);
1820        } else {
1821            // forEach with empty collection: no rows
1822            return Ok(Vec::new());
1823        }
1824    }
1825
1826    let mut new_combinations = Vec::new();
1827
1828    // For each iteration item, create new combinations
1829    for item in &iteration_items {
1830        // Create a new context with the iteration item
1831        let _item_context = create_iteration_context(item, variables);
1832
1833        for existing_combo in existing_combinations {
1834            let mut new_combo = existing_combo.clone();
1835
1836            // Evaluate columns in the context of the iteration item
1837            if let Some(columns) = select.column() {
1838                for col in columns {
1839                    if let Some(col_name) = col.name() {
1840                        if let Some(col_index) =
1841                            all_columns.iter().position(|name| name == col_name)
1842                        {
1843                            let path = col.path().ok_or_else(|| {
1844                                SofError::InvalidViewDefinition(
1845                                    "Column path is required".to_string(),
1846                                )
1847                            })?;
1848
1849                            // Use the iteration item directly for path evaluation
1850                            let result = if path == "$this" {
1851                                // Special case: $this refers to the current iteration item
1852                                item.clone()
1853                            } else {
1854                                // Evaluate the path on the iteration item
1855                                evaluate_path_on_item(path, item, variables)?
1856                            };
1857
1858                            // Check if this column is marked as a collection
1859                            let is_collection = col.collection().unwrap_or(false);
1860
1861                            new_combo.values[col_index] = if is_collection {
1862                                fhirpath_result_to_json_value_collection(result)
1863                            } else {
1864                                fhirpath_result_to_json_value(result)
1865                            };
1866                        }
1867                    }
1868                }
1869            }
1870
1871            new_combinations.push(new_combo);
1872        }
1873    }
1874
1875    // Handle nested selects with the forEach context
1876    if let Some(nested_selects) = select.select() {
1877        let mut final_combinations = Vec::new();
1878
1879        for item in &iteration_items {
1880            let item_context = create_iteration_context(item, variables);
1881
1882            // For each iteration item, we need to start with the combinations that have
1883            // the correct column values for this forEach scope
1884            for existing_combo in existing_combinations {
1885                // Find the combination that corresponds to this iteration item
1886                // by looking at the values we set for columns in this forEach scope
1887                let mut base_combo = existing_combo.clone();
1888
1889                // Update the base combination with column values for this iteration item
1890                if let Some(columns) = select.column() {
1891                    for col in columns {
1892                        if let Some(col_name) = col.name() {
1893                            if let Some(col_index) =
1894                                all_columns.iter().position(|name| name == col_name)
1895                            {
1896                                let path = col.path().ok_or_else(|| {
1897                                    SofError::InvalidViewDefinition(
1898                                        "Column path is required".to_string(),
1899                                    )
1900                                })?;
1901
1902                                let result = if path == "$this" {
1903                                    item.clone()
1904                                } else {
1905                                    evaluate_path_on_item(path, item, variables)?
1906                                };
1907
1908                                // Check if this column is marked as a collection
1909                                let is_collection = col.collection().unwrap_or(false);
1910
1911                                base_combo.values[col_index] = if is_collection {
1912                                    fhirpath_result_to_json_value_collection(result)
1913                                } else {
1914                                    fhirpath_result_to_json_value(result)
1915                                };
1916                            }
1917                        }
1918                    }
1919                }
1920
1921                // Start with this base combination for nested processing
1922                let mut item_combinations = vec![base_combo];
1923
1924                // Process nested selects
1925                for nested_select in nested_selects {
1926                    item_combinations = expand_select_combinations(
1927                        &item_context,
1928                        nested_select,
1929                        &item_combinations,
1930                        all_columns,
1931                        variables,
1932                    )?;
1933                }
1934
1935                final_combinations.extend(item_combinations);
1936            }
1937        }
1938
1939        new_combinations = final_combinations;
1940    }
1941
1942    // Handle unionAll within forEach context
1943    if let Some(union_selects) = select.union_all() {
1944        let mut union_combinations = Vec::new();
1945
1946        for item in &iteration_items {
1947            let item_context = create_iteration_context(item, variables);
1948
1949            // For each iteration item, process all unionAll selects
1950            for existing_combo in existing_combinations {
1951                let mut base_combo = existing_combo.clone();
1952
1953                // Update the base combination with column values for this iteration item
1954                if let Some(columns) = select.column() {
1955                    for col in columns {
1956                        if let Some(col_name) = col.name() {
1957                            if let Some(col_index) =
1958                                all_columns.iter().position(|name| name == col_name)
1959                            {
1960                                let path = col.path().ok_or_else(|| {
1961                                    SofError::InvalidViewDefinition(
1962                                        "Column path is required".to_string(),
1963                                    )
1964                                })?;
1965
1966                                let result = if path == "$this" {
1967                                    item.clone()
1968                                } else {
1969                                    evaluate_path_on_item(path, item, variables)?
1970                                };
1971
1972                                // Check if this column is marked as a collection
1973                                let is_collection = col.collection().unwrap_or(false);
1974
1975                                base_combo.values[col_index] = if is_collection {
1976                                    fhirpath_result_to_json_value_collection(result)
1977                                } else {
1978                                    fhirpath_result_to_json_value(result)
1979                                };
1980                            }
1981                        }
1982                    }
1983                }
1984
1985                // Also evaluate columns from nested selects and add them to base_combo
1986                if let Some(nested_selects) = select.select() {
1987                    for nested_select in nested_selects {
1988                        if let Some(nested_columns) = nested_select.column() {
1989                            for col in nested_columns {
1990                                if let Some(col_name) = col.name() {
1991                                    if let Some(col_index) =
1992                                        all_columns.iter().position(|name| name == col_name)
1993                                    {
1994                                        let path = col.path().ok_or_else(|| {
1995                                            SofError::InvalidViewDefinition(
1996                                                "Column path is required".to_string(),
1997                                            )
1998                                        })?;
1999
2000                                        let result = if path == "$this" {
2001                                            item.clone()
2002                                        } else {
2003                                            evaluate_path_on_item(path, item, variables)?
2004                                        };
2005
2006                                        // Check if this column is marked as a collection
2007                                        let is_collection = col.collection().unwrap_or(false);
2008
2009                                        base_combo.values[col_index] = if is_collection {
2010                                            fhirpath_result_to_json_value_collection(result)
2011                                        } else {
2012                                            fhirpath_result_to_json_value(result)
2013                                        };
2014                                    }
2015                                }
2016                            }
2017                        }
2018                    }
2019                }
2020
2021                // Process each unionAll select independently for this iteration item
2022                for union_select in union_selects {
2023                    let mut select_combinations = vec![base_combo.clone()];
2024                    select_combinations = expand_select_combinations(
2025                        &item_context,
2026                        union_select,
2027                        &select_combinations,
2028                        all_columns,
2029                        variables,
2030                    )?;
2031                    union_combinations.extend(select_combinations);
2032                }
2033            }
2034        }
2035
2036        // unionAll replaces new_combinations with the union results
2037        // If no union results, filter out this resource (no rows for this resource)
2038        new_combinations = union_combinations;
2039    }
2040
2041    Ok(new_combinations)
2042}
2043
2044fn expand_repeat_combinations<S>(
2045    context: &EvaluationContext,
2046    select: &S,
2047    existing_combinations: &[RowCombination],
2048    all_columns: &[String],
2049    repeat_paths: &[&str],
2050    variables: &HashMap<String, EvaluationResult>,
2051) -> Result<Vec<RowCombination>, SofError>
2052where
2053    S: ViewDefinitionSelectTrait,
2054    S::Select: ViewDefinitionSelectTrait,
2055{
2056    // The repeat directive performs recursive traversal:
2057    // 1. For each repeat path, find child elements from the current context
2058    // 2. For each child element:
2059    //    a. Evaluate columns in the child's context
2060    //    b. Recursively process the child with the same repeat paths
2061    // 3. Union all results together
2062    //
2063    // Note: Unlike forEach, repeat does NOT process the current level's columns
2064    // - it ONLY processes elements found via the repeat paths
2065
2066    let mut all_combinations = Vec::new();
2067
2068    // Process each existing combination
2069    for existing_combo in existing_combinations {
2070        // Process each repeat path to find children to traverse
2071        for repeat_path in repeat_paths {
2072            // Evaluate the repeat path to get child elements
2073            let repeat_result = evaluate_expression(repeat_path, context).map_err(|e| {
2074                SofError::FhirPathError(format!(
2075                    "Error evaluating repeat expression '{}': {}",
2076                    repeat_path, e
2077                ))
2078            })?;
2079
2080            let child_items = extract_iteration_items(repeat_result);
2081
2082            // For each child item found via this repeat path
2083            for child_item in &child_items {
2084                // Create a combination for this child with current level's columns
2085                let mut child_combo = existing_combo.clone();
2086
2087                // Evaluate columns in the context of this child item
2088                if let Some(columns) = select.column() {
2089                    for col in columns {
2090                        if let Some(col_name) = col.name() {
2091                            if let Some(col_index) =
2092                                all_columns.iter().position(|name| name == col_name)
2093                            {
2094                                let path = col.path().ok_or_else(|| {
2095                                    SofError::InvalidViewDefinition(
2096                                        "Column path is required".to_string(),
2097                                    )
2098                                })?;
2099
2100                                // Evaluate the path on the child item
2101                                let result = if path == "$this" {
2102                                    child_item.clone()
2103                                } else {
2104                                    evaluate_path_on_item(path, child_item, variables)?
2105                                };
2106
2107                                let is_collection = col.collection().unwrap_or(false);
2108                                child_combo.values[col_index] = if is_collection {
2109                                    fhirpath_result_to_json_value_collection(result)
2110                                } else {
2111                                    fhirpath_result_to_json_value(result)
2112                                };
2113                            }
2114                        }
2115                    }
2116                }
2117
2118                // Create context for this child item
2119                let child_context = create_iteration_context(child_item, variables);
2120
2121                // Start with the child combination we just created
2122                let mut child_combinations = vec![child_combo.clone()];
2123
2124                // Process nested selects (like forEach/forEachOrNull) in the child's context
2125                if let Some(nested_selects) = select.select() {
2126                    for nested_select in nested_selects {
2127                        child_combinations = expand_select_combinations(
2128                            &child_context,
2129                            nested_select,
2130                            &child_combinations,
2131                            all_columns,
2132                            variables,
2133                        )?;
2134                    }
2135                }
2136
2137                // Add the processed combinations to our results
2138                // (these may have been filtered by forEach, which is correct)
2139                all_combinations.extend(child_combinations);
2140
2141                // Now recursively process this child with the same repeat paths
2142                // IMPORTANT: Use the original child_combo, not the forEach-filtered results
2143                let recursive_combinations = expand_repeat_combinations(
2144                    &child_context,
2145                    select,
2146                    &[child_combo],
2147                    all_columns,
2148                    repeat_paths,
2149                    variables,
2150                )?;
2151
2152                all_combinations.extend(recursive_combinations);
2153            }
2154        }
2155    }
2156
2157    Ok(all_combinations)
2158}
2159
2160// Generic helper functions
2161fn evaluate_path_on_item(
2162    path: &str,
2163    item: &EvaluationResult,
2164    variables: &HashMap<String, EvaluationResult>,
2165) -> Result<EvaluationResult, SofError> {
2166    // Create a temporary context with the iteration item as the root resource
2167    let mut temp_context = match item {
2168        EvaluationResult::Object { .. } => {
2169            // Convert the iteration item to a resource-like structure for FHIRPath evaluation
2170            // For simplicity, we'll create a basic context where the item is available for evaluation
2171            let mut context = EvaluationContext::new(vec![]);
2172            context.this = Some(item.clone());
2173            context
2174        }
2175        _ => EvaluationContext::new(vec![]),
2176    };
2177
2178    // Add variables to the temporary context
2179    for (name, value) in variables {
2180        temp_context.set_variable_result(name, value.clone());
2181    }
2182
2183    // Evaluate the FHIRPath expression in the context of the iteration item
2184    match evaluate_expression(path, &temp_context) {
2185        Ok(result) => Ok(result),
2186        Err(_e) => {
2187            // If FHIRPath evaluation fails, try simple property access as fallback
2188            match item {
2189                EvaluationResult::Object { map, .. } => {
2190                    if let Some(value) = map.get(path) {
2191                        Ok(value.clone())
2192                    } else {
2193                        Ok(EvaluationResult::Empty)
2194                    }
2195                }
2196                _ => Ok(EvaluationResult::Empty),
2197            }
2198        }
2199    }
2200}
2201
2202fn create_iteration_context(
2203    item: &EvaluationResult,
2204    variables: &HashMap<String, EvaluationResult>,
2205) -> EvaluationContext {
2206    // Create a new context with the iteration item as the root
2207    let mut context = EvaluationContext::new(vec![]);
2208    context.this = Some(item.clone());
2209
2210    // Preserve variables from the parent context
2211    for (name, value) in variables {
2212        context.set_variable_result(name, value.clone());
2213    }
2214
2215    context
2216}
2217
2218/// Filter a bundle's resources by their lastUpdated metadata
2219fn filter_bundle_by_since(bundle: SofBundle, since: DateTime<Utc>) -> Result<SofBundle, SofError> {
2220    match bundle {
2221        #[cfg(feature = "R4")]
2222        SofBundle::R4(mut b) => {
2223            if let Some(entries) = b.entry.as_mut() {
2224                entries.retain(|entry| {
2225                    entry
2226                        .resource
2227                        .as_ref()
2228                        .and_then(|r| r.get_last_updated())
2229                        .map(|last_updated| last_updated > since)
2230                        .unwrap_or(false)
2231                });
2232            }
2233            Ok(SofBundle::R4(b))
2234        }
2235        #[cfg(feature = "R4B")]
2236        SofBundle::R4B(mut b) => {
2237            if let Some(entries) = b.entry.as_mut() {
2238                entries.retain(|entry| {
2239                    entry
2240                        .resource
2241                        .as_ref()
2242                        .and_then(|r| r.get_last_updated())
2243                        .map(|last_updated| last_updated > since)
2244                        .unwrap_or(false)
2245                });
2246            }
2247            Ok(SofBundle::R4B(b))
2248        }
2249        #[cfg(feature = "R5")]
2250        SofBundle::R5(mut b) => {
2251            if let Some(entries) = b.entry.as_mut() {
2252                entries.retain(|entry| {
2253                    entry
2254                        .resource
2255                        .as_ref()
2256                        .and_then(|r| r.get_last_updated())
2257                        .map(|last_updated| last_updated > since)
2258                        .unwrap_or(false)
2259                });
2260            }
2261            Ok(SofBundle::R5(b))
2262        }
2263        #[cfg(feature = "R6")]
2264        SofBundle::R6(mut b) => {
2265            if let Some(entries) = b.entry.as_mut() {
2266                entries.retain(|entry| {
2267                    entry
2268                        .resource
2269                        .as_ref()
2270                        .and_then(|r| r.get_last_updated())
2271                        .map(|last_updated| last_updated > since)
2272                        .unwrap_or(false)
2273                });
2274            }
2275            Ok(SofBundle::R6(b))
2276        }
2277    }
2278}
2279
2280/// Apply pagination to processed results
2281fn apply_pagination_to_result(
2282    mut result: ProcessedResult,
2283    limit: Option<usize>,
2284    page: Option<usize>,
2285) -> Result<ProcessedResult, SofError> {
2286    if let Some(limit) = limit {
2287        let page_num = page.unwrap_or(1);
2288        if page_num == 0 {
2289            return Err(SofError::InvalidViewDefinition(
2290                "Page number must be greater than 0".to_string(),
2291            ));
2292        }
2293
2294        let start_index = (page_num - 1) * limit;
2295        if start_index >= result.rows.len() {
2296            // Return empty result if page is beyond data
2297            result.rows.clear();
2298        } else {
2299            let end_index = std::cmp::min(start_index + limit, result.rows.len());
2300            result.rows = result.rows[start_index..end_index].to_vec();
2301        }
2302    }
2303
2304    Ok(result)
2305}
2306
2307fn format_output(
2308    result: ProcessedResult,
2309    content_type: ContentType,
2310    parquet_options: Option<&ParquetOptions>,
2311) -> Result<Vec<u8>, SofError> {
2312    match content_type {
2313        ContentType::Csv | ContentType::CsvWithHeader => {
2314            format_csv(result, content_type == ContentType::CsvWithHeader)
2315        }
2316        ContentType::Json => format_json(result),
2317        ContentType::NdJson => format_ndjson(result),
2318        ContentType::Parquet => format_parquet(result, parquet_options),
2319    }
2320}
2321
2322fn format_csv(result: ProcessedResult, include_header: bool) -> Result<Vec<u8>, SofError> {
2323    let mut wtr = csv::Writer::from_writer(vec![]);
2324
2325    if include_header {
2326        wtr.write_record(&result.columns)?;
2327    }
2328
2329    for row in result.rows {
2330        let record: Vec<String> = row
2331            .values
2332            .iter()
2333            .map(|v| match v {
2334                Some(val) => {
2335                    // For string values, extract the raw string instead of JSON serializing
2336                    if let serde_json::Value::String(s) = val {
2337                        s.clone()
2338                    } else {
2339                        // For non-string values, use JSON serialization
2340                        serde_json::to_string(val).unwrap_or_default()
2341                    }
2342                }
2343                None => String::new(),
2344            })
2345            .collect();
2346        wtr.write_record(&record)?;
2347    }
2348
2349    wtr.into_inner()
2350        .map_err(|e| SofError::CsvWriterError(e.to_string()))
2351}
2352
2353fn format_json(result: ProcessedResult) -> Result<Vec<u8>, SofError> {
2354    let mut output = Vec::new();
2355
2356    for row in result.rows {
2357        let mut row_obj = serde_json::Map::new();
2358        for (i, column) in result.columns.iter().enumerate() {
2359            let value = row
2360                .values
2361                .get(i)
2362                .and_then(|v| v.as_ref())
2363                .cloned()
2364                .unwrap_or(serde_json::Value::Null);
2365            row_obj.insert(column.clone(), value);
2366        }
2367        output.push(serde_json::Value::Object(row_obj));
2368    }
2369
2370    Ok(serde_json::to_vec_pretty(&output)?)
2371}
2372
2373fn format_ndjson(result: ProcessedResult) -> Result<Vec<u8>, SofError> {
2374    let mut output = Vec::new();
2375
2376    for row in result.rows {
2377        let mut row_obj = serde_json::Map::new();
2378        for (i, column) in result.columns.iter().enumerate() {
2379            let value = row
2380                .values
2381                .get(i)
2382                .and_then(|v| v.as_ref())
2383                .cloned()
2384                .unwrap_or(serde_json::Value::Null);
2385            row_obj.insert(column.clone(), value);
2386        }
2387        let line = serde_json::to_string(&serde_json::Value::Object(row_obj))?;
2388        output.extend_from_slice(line.as_bytes());
2389        output.push(b'\n');
2390    }
2391
2392    Ok(output)
2393}
2394
2395fn format_parquet(
2396    result: ProcessedResult,
2397    options: Option<&ParquetOptions>,
2398) -> Result<Vec<u8>, SofError> {
2399    use arrow::record_batch::RecordBatch;
2400    use parquet::arrow::ArrowWriter;
2401    use parquet::basic::Compression;
2402    use parquet::file::properties::WriterProperties;
2403    use std::io::Cursor;
2404
2405    // Create Arrow schema from columns and sample data
2406    let schema = parquet_schema::create_arrow_schema(&result.columns, &result.rows)?;
2407    let schema_ref = std::sync::Arc::new(schema.clone());
2408
2409    // Get configuration from options or use defaults
2410    let parquet_opts = options.cloned().unwrap_or_default();
2411
2412    // Calculate optimal batch size based on row count and estimated row size
2413    let target_row_group_size_bytes = (parquet_opts.row_group_size_mb as usize) * 1024 * 1024;
2414    let target_page_size_bytes = (parquet_opts.page_size_kb as usize) * 1024;
2415    const TARGET_ROWS_PER_BATCH: usize = 100_000; // Default batch size
2416    const MAX_ROWS_PER_BATCH: usize = 500_000; // Maximum to prevent memory issues
2417
2418    // Estimate average row size from first 100 rows
2419    let sample_size = std::cmp::min(100, result.rows.len());
2420    let mut estimated_row_size = 100; // Default estimate in bytes
2421
2422    if sample_size > 0 {
2423        let sample_json_size: usize = result.rows[..sample_size]
2424            .iter()
2425            .map(|row| {
2426                row.values
2427                    .iter()
2428                    .filter_map(|v| v.as_ref())
2429                    .map(|v| v.to_string().len())
2430                    .sum::<usize>()
2431            })
2432            .sum();
2433        estimated_row_size = (sample_json_size / sample_size).max(50);
2434    }
2435
2436    // Calculate optimal batch size
2437    let optimal_batch_size = (target_row_group_size_bytes / estimated_row_size)
2438        .clamp(TARGET_ROWS_PER_BATCH, MAX_ROWS_PER_BATCH);
2439
2440    // Parse compression algorithm
2441    use parquet::basic::BrotliLevel;
2442    use parquet::basic::GzipLevel;
2443    use parquet::basic::ZstdLevel;
2444
2445    let compression = match parquet_opts.compression.as_str() {
2446        "none" => Compression::UNCOMPRESSED,
2447        "gzip" => Compression::GZIP(GzipLevel::default()),
2448        "lz4" => Compression::LZ4,
2449        "brotli" => Compression::BROTLI(BrotliLevel::default()),
2450        "zstd" => Compression::ZSTD(ZstdLevel::default()),
2451        _ => Compression::SNAPPY, // Default to snappy
2452    };
2453
2454    // Set up writer properties with optimized settings
2455    let props = WriterProperties::builder()
2456        .set_compression(compression)
2457        .set_max_row_group_size(target_row_group_size_bytes)
2458        .set_data_page_row_count_limit(20_000) // Optimal for predicate pushdown
2459        .set_data_page_size_limit(target_page_size_bytes)
2460        .set_write_batch_size(8192) // Control write granularity
2461        .build();
2462
2463    // Write to memory buffer
2464    let mut buffer = Vec::new();
2465    let mut cursor = Cursor::new(&mut buffer);
2466    let mut writer =
2467        ArrowWriter::try_new(&mut cursor, schema_ref.clone(), Some(props)).map_err(|e| {
2468            SofError::ParquetConversionError(format!("Failed to create Parquet writer: {}", e))
2469        })?;
2470
2471    // Process data in batches to handle large datasets efficiently
2472    let mut row_offset = 0;
2473    while row_offset < result.rows.len() {
2474        let batch_end = (row_offset + optimal_batch_size).min(result.rows.len());
2475        let batch_rows = &result.rows[row_offset..batch_end];
2476
2477        // Convert batch to Arrow arrays
2478        let batch_arrays =
2479            parquet_schema::process_to_arrow_arrays(&schema, &result.columns, batch_rows)?;
2480
2481        // Create RecordBatch for this chunk
2482        let batch = RecordBatch::try_new(schema_ref.clone(), batch_arrays).map_err(|e| {
2483            SofError::ParquetConversionError(format!(
2484                "Failed to create RecordBatch for rows {}-{}: {}",
2485                row_offset, batch_end, e
2486            ))
2487        })?;
2488
2489        // Write batch
2490        writer.write(&batch).map_err(|e| {
2491            SofError::ParquetConversionError(format!(
2492                "Failed to write RecordBatch for rows {}-{}: {}",
2493                row_offset, batch_end, e
2494            ))
2495        })?;
2496
2497        row_offset = batch_end;
2498    }
2499
2500    writer.close().map_err(|e| {
2501        SofError::ParquetConversionError(format!("Failed to close Parquet writer: {}", e))
2502    })?;
2503
2504    Ok(buffer)
2505}
2506
2507/// Format Parquet data with automatic file splitting when size exceeds limit
2508pub fn format_parquet_multi_file(
2509    result: ProcessedResult,
2510    options: Option<&ParquetOptions>,
2511    max_file_size_bytes: usize,
2512) -> Result<Vec<Vec<u8>>, SofError> {
2513    use arrow::record_batch::RecordBatch;
2514    use parquet::arrow::ArrowWriter;
2515    use parquet::basic::Compression;
2516    use parquet::file::properties::WriterProperties;
2517    use std::io::Cursor;
2518
2519    // Create Arrow schema from columns and sample data
2520    let schema = parquet_schema::create_arrow_schema(&result.columns, &result.rows)?;
2521    let schema_ref = std::sync::Arc::new(schema.clone());
2522
2523    // Get configuration from options or use defaults
2524    let parquet_opts = options.cloned().unwrap_or_default();
2525
2526    // Calculate optimal batch size
2527    let target_row_group_size_bytes = (parquet_opts.row_group_size_mb as usize) * 1024 * 1024;
2528    let target_page_size_bytes = (parquet_opts.page_size_kb as usize) * 1024;
2529    const TARGET_ROWS_PER_BATCH: usize = 100_000;
2530    const MAX_ROWS_PER_BATCH: usize = 500_000;
2531
2532    // Estimate average row size
2533    let sample_size = std::cmp::min(100, result.rows.len());
2534    let mut estimated_row_size = 100;
2535
2536    if sample_size > 0 {
2537        let sample_json_size: usize = result.rows[..sample_size]
2538            .iter()
2539            .map(|row| {
2540                row.values
2541                    .iter()
2542                    .filter_map(|v| v.as_ref())
2543                    .map(|v| v.to_string().len())
2544                    .sum::<usize>()
2545            })
2546            .sum();
2547        estimated_row_size = (sample_json_size / sample_size).max(50);
2548    }
2549
2550    let optimal_batch_size = (target_row_group_size_bytes / estimated_row_size)
2551        .clamp(TARGET_ROWS_PER_BATCH, MAX_ROWS_PER_BATCH);
2552
2553    // Parse compression algorithm
2554    use parquet::basic::BrotliLevel;
2555    use parquet::basic::GzipLevel;
2556    use parquet::basic::ZstdLevel;
2557
2558    let compression = match parquet_opts.compression.as_str() {
2559        "none" => Compression::UNCOMPRESSED,
2560        "gzip" => Compression::GZIP(GzipLevel::default()),
2561        "lz4" => Compression::LZ4,
2562        "brotli" => Compression::BROTLI(BrotliLevel::default()),
2563        "zstd" => Compression::ZSTD(ZstdLevel::default()),
2564        _ => Compression::SNAPPY,
2565    };
2566
2567    // Set up writer properties
2568    let props = WriterProperties::builder()
2569        .set_compression(compression)
2570        .set_max_row_group_size(target_row_group_size_bytes)
2571        .set_data_page_row_count_limit(20_000)
2572        .set_data_page_size_limit(target_page_size_bytes)
2573        .set_write_batch_size(8192)
2574        .build();
2575
2576    let mut file_buffers = Vec::new();
2577    let mut current_buffer = Vec::new();
2578    let mut current_cursor = Cursor::new(&mut current_buffer);
2579    let mut current_writer =
2580        ArrowWriter::try_new(&mut current_cursor, schema_ref.clone(), Some(props.clone()))
2581            .map_err(|e| {
2582                SofError::ParquetConversionError(format!("Failed to create Parquet writer: {}", e))
2583            })?;
2584
2585    let mut row_offset = 0;
2586    let mut _current_file_rows = 0;
2587
2588    while row_offset < result.rows.len() {
2589        let batch_end = (row_offset + optimal_batch_size).min(result.rows.len());
2590        let batch_rows = &result.rows[row_offset..batch_end];
2591
2592        // Convert batch to Arrow arrays
2593        let batch_arrays =
2594            parquet_schema::process_to_arrow_arrays(&schema, &result.columns, batch_rows)?;
2595
2596        // Create RecordBatch
2597        let batch = RecordBatch::try_new(schema_ref.clone(), batch_arrays).map_err(|e| {
2598            SofError::ParquetConversionError(format!(
2599                "Failed to create RecordBatch for rows {}-{}: {}",
2600                row_offset, batch_end, e
2601            ))
2602        })?;
2603
2604        // Write batch
2605        current_writer.write(&batch).map_err(|e| {
2606            SofError::ParquetConversionError(format!(
2607                "Failed to write RecordBatch for rows {}-{}: {}",
2608                row_offset, batch_end, e
2609            ))
2610        })?;
2611
2612        _current_file_rows += batch_end - row_offset;
2613        row_offset = batch_end;
2614
2615        // Check if we should start a new file
2616        // Get actual size of current buffer by flushing the writer
2617        let current_size = current_writer.bytes_written();
2618
2619        if current_size >= max_file_size_bytes && row_offset < result.rows.len() {
2620            // Close current file
2621            current_writer.close().map_err(|e| {
2622                SofError::ParquetConversionError(format!("Failed to close Parquet writer: {}", e))
2623            })?;
2624
2625            // Save the buffer
2626            file_buffers.push(current_buffer);
2627
2628            // Start new file
2629            current_buffer = Vec::new();
2630            current_cursor = Cursor::new(&mut current_buffer);
2631            current_writer =
2632                ArrowWriter::try_new(&mut current_cursor, schema_ref.clone(), Some(props.clone()))
2633                    .map_err(|e| {
2634                        SofError::ParquetConversionError(format!(
2635                            "Failed to create new Parquet writer: {}",
2636                            e
2637                        ))
2638                    })?;
2639            _current_file_rows = 0;
2640        }
2641    }
2642
2643    // Close the final writer
2644    current_writer.close().map_err(|e| {
2645        SofError::ParquetConversionError(format!("Failed to close final Parquet writer: {}", e))
2646    })?;
2647
2648    file_buffers.push(current_buffer);
2649
2650    Ok(file_buffers)
2651}