helios_sof/
lib.rs

1//! # SQL-on-FHIR Implementation
2//!
3//! This crate provides a complete implementation of the [SQL-on-FHIR
4//! specification](https://sql-on-fhir.org/ig/latest),
5//! enabling the transformation of FHIR resources into tabular data using declarative
6//! ViewDefinitions. It supports all major FHIR versions (R4, R4B, R5, R6) through
7//! a version-agnostic abstraction layer.
8
9//!
10//! There are three consumers of this crate:
11//! - [sof_cli](../sof_cli/index.html) - A command-line interface for the SQL-on-FHIR implementation,
12//!   allowing users to execute ViewDefinition transformations on FHIR Bundle resources
13//!   and output the results in various formats.
14//! - [sof_server](../sof_server/index.html) - A stateless HTTP server implementation for the SQL-on-FHIR specification,
15//!   enabling HTTP-based access to ViewDefinition transformation capabilities.
16//! - [hfs](../hfs/index.html) - The full featured Helios FHIR Server.
17//!
18//! ## Architecture
19//!
20//! The SOF crate is organized around these key components:
21//! - **Version-agnostic enums** ([`SofViewDefinition`], [`SofBundle`]): Multi-version containers
22//! - **Processing engine** ([`run_view_definition`]): Core transformation logic
23//! - **Output formats** ([`ContentType`]): Support for CSV, JSON, NDJSON, and Parquet
24//! - **Trait abstractions** ([`ViewDefinitionTrait`], [`BundleTrait`]): Version independence
25//!
26//! ## Key Features
27//!
28//! - **Multi-version FHIR support**: Works with R4, R4B, R5, and R6 resources
29//! - **FHIRPath evaluation**: Complex path expressions for data extraction
30//! - **forEach iteration**: Supports flattening of nested FHIR structures
31//! - **unionAll operations**: Combines multiple select statements
32//! - **Collection handling**: Proper array serialization for multi-valued fields
33//! - **Output formats**: CSV (with/without headers), JSON, NDJSON, Parquet support
34//!
35//! ## Usage Example
36//!
37//! ```rust
38//! use helios_sof::{SofViewDefinition, SofBundle, ContentType, run_view_definition};
39//! use helios_fhir::FhirVersion;
40//!
41//! # #[cfg(feature = "R4")]
42//! # {
43//! // Parse a ViewDefinition and Bundle from JSON
44//! let view_definition_json = r#"{
45//!     "resourceType": "ViewDefinition",
46//!     "status": "active",
47//!     "resource": "Patient",
48//!     "select": [{
49//!         "column": [{
50//!             "name": "id",
51//!             "path": "id"
52//!         }, {
53//!             "name": "name",
54//!             "path": "name.family"
55//!         }]
56//!     }]
57//! }"#;
58//!
59//! let bundle_json = r#"{
60//!     "resourceType": "Bundle",
61//!     "type": "collection",
62//!     "entry": [{
63//!         "resource": {
64//!             "resourceType": "Patient",
65//!             "id": "example",
66//!             "name": [{
67//!                 "family": "Doe",
68//!                 "given": ["John"]
69//!             }]
70//!         }
71//!     }]
72//! }"#;
73//!
74//! let view_definition: helios_fhir::r4::ViewDefinition = serde_json::from_str(view_definition_json)?;
75//! let bundle: helios_fhir::r4::Bundle = serde_json::from_str(bundle_json)?;
76//!
77//! // Wrap in version-agnostic containers
78//! let sof_view = SofViewDefinition::R4(view_definition);
79//! let sof_bundle = SofBundle::R4(bundle);
80//!
81//! // Transform to CSV with headers
82//! let csv_output = run_view_definition(
83//!     sof_view,
84//!     sof_bundle,
85//!     ContentType::CsvWithHeader
86//! )?;
87//!
88//! // Check the CSV output
89//! let csv_string = String::from_utf8(csv_output)?;
90//! assert!(csv_string.contains("id,name"));
//! // Values may be quoted, so check for the text itself rather than an exact row
92//! assert!(csv_string.contains("example") && csv_string.contains("Doe"));
93//! # }
94//! # Ok::<(), Box<dyn std::error::Error>>(())
95//! ```
96//!
97//! ## Advanced Features
98//!
99//! ### forEach Iteration
100//!
101//! ViewDefinitions can iterate over collections using `forEach` and `forEachOrNull`:
102//!
103//! ```json
104//! {
105//!   "select": [{
106//!     "forEach": "name",
107//!     "column": [{
108//!       "name": "family_name",
109//!       "path": "family"
110//!     }]
111//!   }]
112//! }
113//! ```
114//!
115//! ### Constants and Variables
116//!
117//! Define reusable values in ViewDefinitions:
118//!
119//! ```json
120//! {
121//!   "constant": [{
122//!     "name": "system",
123//!     "valueString": "http://loinc.org"
124//!   }],
125//!   "select": [{
126//!     "where": [{
127//!       "path": "code.coding.system = %system"
128//!     }]
129//!   }]
130//! }
131//! ```
132//!
133//! ### Where Clauses
134//!
135//! Filter resources using FHIRPath expressions:
136//!
137//! ```json
138//! {
139//!   "where": [{
140//!     "path": "active = true"
141//!   }, {
142//!     "path": "birthDate.exists()"
143//!   }]
144//! }
145//! ```
146//!
147//! ## Error Handling
148//!
149//! The crate provides comprehensive error handling through [`SofError`]:
150//!
//! ```rust,no_run
//! use helios_sof::{SofError, SofViewDefinition, SofBundle, ContentType, run_view_definition};
//!
//! # #[cfg(feature = "R4")]
//! # {
//! # let view = SofViewDefinition::R4(helios_fhir::r4::ViewDefinition::default());
//! # let bundle = SofBundle::R4(helios_fhir::r4::Bundle::default());
//! # let content_type = ContentType::Json;
//! match run_view_definition(view, bundle, content_type) {
//!     Ok(_output) => {
//!         // Process successful transformation
//!     },
//!     Err(SofError::InvalidViewDefinition(msg)) => {
//!         eprintln!("ViewDefinition validation failed: {}", msg);
//!     },
//!     Err(SofError::FhirPathError(msg)) => {
//!         eprintln!("FHIRPath evaluation failed: {}", msg);
//!     },
//!     Err(e) => {
//!         eprintln!("Other error: {}", e);
//!     }
//! }
//! # }
//! ```
//!
//! ## Feature Flags
173//!
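//! Enable support for specific FHIR versions. At least one must be enabled; when several
//! are, the newest one is used as the default (see [`get_newest_enabled_fhir_version`]):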
175//! - `R4`: FHIR 4.0.1 support
176//! - `R4B`: FHIR 4.3.0 support
177//! - `R5`: FHIR 5.0.0 support
178//! - `R6`: FHIR 6.0.0 support
179
180pub mod data_source;
181pub mod parquet_schema;
182pub mod traits;
183
184use chrono::{DateTime, Utc};
185use helios_fhirpath::{EvaluationContext, EvaluationResult, evaluate_expression};
186use rayon::prelude::*;
187use serde::{Deserialize, Serialize};
188use std::collections::HashMap;
189use thiserror::Error;
190use traits::*;
191
192// Re-export commonly used types and traits for easier access
193pub use helios_fhir::FhirVersion;
194pub use traits::{BundleTrait, ResourceTrait, ViewDefinitionTrait};
195
/// Multi-version ViewDefinition container supporting version-agnostic operations.
///
/// This enum provides a unified interface for working with SQL-on-FHIR ViewDefinition
/// resources across different FHIR specification versions. It enables applications to
/// handle multiple FHIR versions simultaneously while keeping each version's types
/// distinct.
///
/// Each variant exists only when the matching Cargo feature (`R4`, `R4B`, `R5`, `R6`)
/// is enabled.
///
/// # Supported Versions
///
/// - **R4**: ViewDefinition for FHIR 4.0.1
/// - **R4B**: ViewDefinition for FHIR 4.3.0
/// - **R5**: ViewDefinition for FHIR 5.0.0
/// - **R6**: ViewDefinition for FHIR 6.0.0 (draft)
///
/// # Examples
///
/// ```rust
/// use helios_sof::SofViewDefinition;
/// # #[cfg(feature = "R4")]
/// use helios_fhir::r4::ViewDefinition;
///
/// # #[cfg(feature = "R4")]
/// # {
/// // Parse from JSON
/// let json = r#"{
///     "resourceType": "ViewDefinition",
///     "resource": "Patient",
///     "select": [{
///         "column": [{
///             "name": "id",
///             "path": "id"
///         }]
///     }]
/// }"#;
///
/// let view_def: ViewDefinition = serde_json::from_str(json)?;
/// let sof_view = SofViewDefinition::R4(view_def);
///
/// // Check version
/// assert_eq!(sof_view.version(), helios_fhir::FhirVersion::R4);
/// # }
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
#[derive(Debug, Clone)]
pub enum SofViewDefinition {
    #[cfg(feature = "R4")]
    R4(helios_fhir::r4::ViewDefinition),
    #[cfg(feature = "R4B")]
    R4B(helios_fhir::r4b::ViewDefinition),
    #[cfg(feature = "R5")]
    R5(helios_fhir::r5::ViewDefinition),
    #[cfg(feature = "R6")]
    R6(helios_fhir::r6::ViewDefinition),
}
249
250impl SofViewDefinition {
251    /// Returns the FHIR specification version of this ViewDefinition.
252    ///
253    /// This method provides version detection for multi-version applications,
254    /// enabling version-specific processing logic and compatibility checks.
255    ///
256    /// # Returns
257    ///
258    /// The `FhirVersion` enum variant corresponding to this ViewDefinition's specification.
259    ///
260    /// # Examples
261    ///
262    /// ```rust
263    /// use helios_sof::SofViewDefinition;
264    /// use helios_fhir::FhirVersion;
265    ///
266    /// # #[cfg(feature = "R5")]
267    /// # {
268    /// # let view_def = helios_fhir::r5::ViewDefinition::default();
269    /// let sof_view = SofViewDefinition::R5(view_def);
270    /// assert_eq!(sof_view.version(), helios_fhir::FhirVersion::R5);
271    /// # }
272    /// ```
273    pub fn version(&self) -> helios_fhir::FhirVersion {
274        match self {
275            #[cfg(feature = "R4")]
276            SofViewDefinition::R4(_) => helios_fhir::FhirVersion::R4,
277            #[cfg(feature = "R4B")]
278            SofViewDefinition::R4B(_) => helios_fhir::FhirVersion::R4B,
279            #[cfg(feature = "R5")]
280            SofViewDefinition::R5(_) => helios_fhir::FhirVersion::R5,
281            #[cfg(feature = "R6")]
282            SofViewDefinition::R6(_) => helios_fhir::FhirVersion::R6,
283        }
284    }
285}
286
/// Multi-version Bundle container supporting version-agnostic operations.
///
/// This enum provides a unified interface for working with FHIR Bundle resources
/// across different FHIR specification versions. Bundles contain the actual FHIR
/// resources that will be processed by ViewDefinitions.
///
/// Each variant exists only when the matching Cargo feature (`R4`, `R4B`, `R5`, `R6`)
/// is enabled.
///
/// # Supported Versions
///
/// - **R4**: FHIR 4.0.1 Bundle
/// - **R4B**: FHIR 4.3.0 Bundle
/// - **R5**: FHIR 5.0.0 Bundle
/// - **R6**: FHIR 6.0.0 Bundle (draft)
///
/// # Examples
///
/// ```rust
/// use helios_sof::SofBundle;
/// # #[cfg(feature = "R4")]
/// use helios_fhir::r4::Bundle;
///
/// # #[cfg(feature = "R4")]
/// # {
/// // Parse from JSON
/// let json = r#"{
///     "resourceType": "Bundle",
///     "type": "collection",
///     "entry": [{
///         "resource": {
///             "resourceType": "Patient",
///             "id": "example"
///         }
///     }]
/// }"#;
///
/// let bundle: Bundle = serde_json::from_str(json)?;
/// let sof_bundle = SofBundle::R4(bundle);
///
/// // Check version compatibility
/// assert_eq!(sof_bundle.version(), helios_fhir::FhirVersion::R4);
/// # }
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
#[derive(Debug, Clone)]
pub enum SofBundle {
    #[cfg(feature = "R4")]
    R4(helios_fhir::r4::Bundle),
    #[cfg(feature = "R4B")]
    R4B(helios_fhir::r4b::Bundle),
    #[cfg(feature = "R5")]
    R5(helios_fhir::r5::Bundle),
    #[cfg(feature = "R6")]
    R6(helios_fhir::r6::Bundle),
}
340
341impl SofBundle {
342    /// Returns the FHIR specification version of this Bundle.
343    ///
344    /// This method provides version detection for multi-version applications,
345    /// ensuring that ViewDefinitions and Bundles use compatible FHIR versions.
346    ///
347    /// # Returns
348    ///
349    /// The `FhirVersion` enum variant corresponding to this Bundle's specification.
350    ///
351    /// # Examples
352    ///
353    /// ```rust
354    /// use helios_sof::SofBundle;
355    /// use helios_fhir::FhirVersion;
356    ///
357    /// # #[cfg(feature = "R4")]
358    /// # {
359    /// # let bundle = helios_fhir::r4::Bundle::default();
360    /// let sof_bundle = SofBundle::R4(bundle);
361    /// assert_eq!(sof_bundle.version(), helios_fhir::FhirVersion::R4);
362    /// # }
363    /// ```
364    pub fn version(&self) -> helios_fhir::FhirVersion {
365        match self {
366            #[cfg(feature = "R4")]
367            SofBundle::R4(_) => helios_fhir::FhirVersion::R4,
368            #[cfg(feature = "R4B")]
369            SofBundle::R4B(_) => helios_fhir::FhirVersion::R4B,
370            #[cfg(feature = "R5")]
371            SofBundle::R5(_) => helios_fhir::FhirVersion::R5,
372            #[cfg(feature = "R6")]
373            SofBundle::R6(_) => helios_fhir::FhirVersion::R6,
374        }
375    }
376}
377
378/// Multi-version CapabilityStatement container supporting version-agnostic operations.
379///
380/// This enum provides a unified interface for working with CapabilityStatement resources
381/// across different FHIR specification versions. It enables applications to handle
382/// multiple FHIR versions simultaneously while maintaining type safety.
383///
384/// # Supported Versions
385///
386/// - **R4**: FHIR 4.0.1 CapabilityStatement (normative)
387/// - **R4B**: FHIR 4.3.0 CapabilityStatement (ballot)
388/// - **R5**: FHIR 5.0.0 CapabilityStatement (ballot)
389/// - **R6**: FHIR 6.0.0 CapabilityStatement (draft)
390#[derive(Debug, Clone, Serialize, Deserialize)]
391#[serde(untagged)]
392pub enum SofCapabilityStatement {
393    #[cfg(feature = "R4")]
394    R4(helios_fhir::r4::CapabilityStatement),
395    #[cfg(feature = "R4B")]
396    R4B(helios_fhir::r4b::CapabilityStatement),
397    #[cfg(feature = "R5")]
398    R5(helios_fhir::r5::CapabilityStatement),
399    #[cfg(feature = "R6")]
400    R6(helios_fhir::r6::CapabilityStatement),
401}
402
403impl SofCapabilityStatement {
404    /// Returns the FHIR specification version of this CapabilityStatement.
405    pub fn version(&self) -> helios_fhir::FhirVersion {
406        match self {
407            #[cfg(feature = "R4")]
408            SofCapabilityStatement::R4(_) => helios_fhir::FhirVersion::R4,
409            #[cfg(feature = "R4B")]
410            SofCapabilityStatement::R4B(_) => helios_fhir::FhirVersion::R4B,
411            #[cfg(feature = "R5")]
412            SofCapabilityStatement::R5(_) => helios_fhir::FhirVersion::R5,
413            #[cfg(feature = "R6")]
414            SofCapabilityStatement::R6(_) => helios_fhir::FhirVersion::R6,
415        }
416    }
417}
418
/// Type alias for the version-independent Parameters container.
///
/// This alias keeps the historical `SofParameters` name available for backward
/// compatibility. The underlying type is `helios_fhir::VersionIndependentParameters`,
/// so both names refer to the same type and can be used interchangeably.
pub type SofParameters = helios_fhir::VersionIndependentParameters;
424
425/// Comprehensive error type for SQL-on-FHIR operations.
426///
427/// This enum covers all possible error conditions that can occur during
428/// ViewDefinition processing, from validation failures to output formatting issues.
429/// Each variant provides specific context about the error to aid in debugging.
430///
431/// # Error Categories
432///
433/// - **Validation**: ViewDefinition structure and logic validation
434/// - **Evaluation**: FHIRPath expression evaluation failures
435/// - **I/O**: File and serialization operations
436/// - **Format**: Output format conversion issues
437///
438/// # Examples
439///
440/// ```rust,no_run
441/// use helios_sof::{SofError, SofViewDefinition, SofBundle, ContentType, run_view_definition};
442///
443/// # let view = SofViewDefinition::R4(helios_fhir::r4::ViewDefinition::default());
444/// # let bundle = SofBundle::R4(helios_fhir::r4::Bundle::default());
445/// # let content_type = ContentType::Json;
446/// match run_view_definition(view, bundle, content_type) {
447///     Ok(output) => {
448///         println!("Transformation successful");
449///     },
450///     Err(SofError::InvalidViewDefinition(msg)) => {
451///         eprintln!("ViewDefinition validation failed: {}", msg);
452///     },
453///     Err(SofError::FhirPathError(msg)) => {
454///         eprintln!("FHIRPath evaluation error: {}", msg);
455///     },
456///     Err(SofError::UnsupportedContentType(format)) => {
457///         eprintln!("Unsupported output format: {}", format);
458///     },
459///     Err(e) => {
460///         eprintln!("Other error: {}", e);
461///     }
462/// }
463/// ```
464#[derive(Debug, Error)]
465pub enum SofError {
466    /// ViewDefinition structure or logic validation failed.
467    ///
468    /// This error occurs when a ViewDefinition contains invalid or inconsistent
469    /// configuration, such as missing required fields, invalid FHIRPath expressions,
470    /// or incompatible select/unionAll structures.
471    #[error("Invalid ViewDefinition: {0}")]
472    InvalidViewDefinition(String),
473
474    /// FHIRPath expression evaluation failed.
475    ///
476    /// This error occurs when a FHIRPath expression in a ViewDefinition cannot
477    /// be evaluated, either due to syntax errors or runtime evaluation issues.
478    #[error("FHIRPath evaluation error: {0}")]
479    FhirPathError(String),
480
481    /// JSON serialization/deserialization failed.
482    ///
483    /// This error occurs when parsing input JSON or serializing output data fails,
484    /// typically due to malformed JSON or incompatible data structures.
485    #[error("Serialization error: {0}")]
486    SerializationError(#[from] serde_json::Error),
487
488    /// CSV processing failed.
489    ///
490    /// This error occurs during CSV output generation, such as when writing
491    /// headers or data rows to the CSV format.
492    #[error("CSV error: {0}")]
493    CsvError(#[from] csv::Error),
494
495    /// File I/O operation failed.
496    ///
497    /// This error occurs when reading input files or writing output files fails,
498    /// typically due to permission issues or missing files.
499    #[error("IO error: {0}")]
500    IoError(#[from] std::io::Error),
501
502    /// Unsupported output content type requested.
503    ///
504    /// This error occurs when an invalid or unimplemented content type is
505    /// specified for output formatting.
506    #[error("Unsupported content type: {0}")]
507    UnsupportedContentType(String),
508
509    /// CSV writer internal error.
510    ///
511    /// This error occurs when the CSV writer encounters an internal issue
512    /// that prevents successful output generation.
513    #[error("CSV writer error: {0}")]
514    CsvWriterError(String),
515
516    /// Invalid source parameter value.
517    ///
518    /// This error occurs when the source parameter contains an invalid URL or path.
519    #[error("Invalid source: {0}")]
520    InvalidSource(String),
521
522    /// Source not found.
523    ///
524    /// This error occurs when the specified source file or URL cannot be found.
525    #[error("Source not found: {0}")]
526    SourceNotFound(String),
527
528    /// Failed to fetch data from source.
529    ///
530    /// This error occurs when fetching data from a remote source fails.
531    #[error("Failed to fetch source: {0}")]
532    SourceFetchError(String),
533
534    /// Failed to read source data.
535    ///
536    /// This error occurs when reading data from the source fails.
537    #[error("Failed to read source: {0}")]
538    SourceReadError(String),
539
540    /// Invalid content in source.
541    ///
542    /// This error occurs when the source content is not valid FHIR data.
543    #[error("Invalid source content: {0}")]
544    InvalidSourceContent(String),
545
546    /// Unsupported source protocol.
547    ///
548    /// This error occurs when the source URL uses an unsupported protocol.
549    #[error("Unsupported source protocol: {0}")]
550    UnsupportedSourceProtocol(String),
551
552    /// Parquet conversion error.
553    ///
554    /// This error occurs when converting data to Parquet format fails.
555    #[error("Parquet conversion error: {0}")]
556    ParquetConversionError(String),
557}
558
/// Supported output content types for ViewDefinition transformations.
///
/// This enum defines the available output formats for transformed FHIR data.
/// Each format has specific characteristics and use cases for different
/// integration scenarios.
///
/// # Format Descriptions
///
/// - **CSV** ([`ContentType::Csv`]): comma-separated values without a header row
/// - **CSV with header** ([`ContentType::CsvWithHeader`]): comma-separated values
///   preceded by a row of column names
/// - **JSON** ([`ContentType::Json`]): pretty-printed JSON array of objects
/// - **NDJSON** ([`ContentType::NdJson`]): newline-delimited JSON (one object per line)
/// - **Parquet** ([`ContentType::Parquet`]): Apache Parquet columnar format
///
/// Use [`ContentType::from_string`] to obtain a variant from a MIME type
/// (e.g. `"text/csv"`) or a short format name (e.g. `"csv"`).
///
/// # Examples
///
/// ```rust
/// use helios_sof::ContentType;
///
/// // Parse from string
/// let csv_type = ContentType::from_string("text/csv")?;
/// assert_eq!(csv_type, ContentType::CsvWithHeader);  // Default includes headers
///
/// let json_type = ContentType::from_string("application/json")?;
/// assert_eq!(json_type, ContentType::Json);
///
/// // CSV without headers
/// let csv_no_headers = ContentType::from_string("text/csv;header=false")?;
/// assert_eq!(csv_no_headers, ContentType::Csv);
/// # Ok::<(), helios_sof::SofError>(())
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContentType {
    /// Comma-separated values format without headers
    Csv,
    /// Comma-separated values format with column headers
    CsvWithHeader,
    /// Pretty-printed JSON array format
    Json,
    /// Newline-delimited JSON format (NDJSON)
    NdJson,
    /// Apache Parquet columnar format
    Parquet,
}
603
604impl ContentType {
605    /// Parse a content type from its MIME type string representation.
606    ///
607    /// This method converts standard MIME type strings to the corresponding
608    /// ContentType enum variants. It supports the SQL-on-FHIR specification's
609    /// recommended content types.
610    ///
611    /// # Supported MIME Types
612    ///
613    /// - `"text/csv"` → [`ContentType::Csv`]
614    /// - `"text/csv"` → [`ContentType::CsvWithHeader`] (default: headers included)
615    /// - `"text/csv;header=true"` → [`ContentType::CsvWithHeader`]
616    /// - `"text/csv;header=false"` → [`ContentType::Csv`]
617    /// - `"application/json"` → [`ContentType::Json`]
618    /// - `"application/ndjson"` → [`ContentType::NdJson`]
619    /// - `"application/x-ndjson"` → [`ContentType::NdJson`]
620    /// - `"application/parquet"` → [`ContentType::Parquet`]
621    ///
622    /// # Arguments
623    ///
624    /// * `s` - The MIME type string to parse
625    ///
626    /// # Returns
627    ///
628    /// * `Ok(ContentType)` - Successfully parsed content type
629    /// * `Err(SofError::UnsupportedContentType)` - Unknown or unsupported MIME type
630    ///
631    /// # Examples
632    ///
633    /// ```rust
634    /// use helios_sof::ContentType;
635    ///
636    /// // Shortened format names
637    /// let csv = ContentType::from_string("csv")?;
638    /// assert_eq!(csv, ContentType::CsvWithHeader);
639    ///
640    /// let json = ContentType::from_string("json")?;
641    /// assert_eq!(json, ContentType::Json);
642    ///
643    /// let ndjson = ContentType::from_string("ndjson")?;
644    /// assert_eq!(ndjson, ContentType::NdJson);
645    ///
646    /// // Full MIME types still supported
647    /// let csv_mime = ContentType::from_string("text/csv")?;
648    /// assert_eq!(csv_mime, ContentType::CsvWithHeader);
649    ///
650    /// // CSV with headers explicitly
651    /// let csv_headers = ContentType::from_string("text/csv;header=true")?;
652    /// assert_eq!(csv_headers, ContentType::CsvWithHeader);
653    ///
654    /// // CSV without headers
655    /// let csv_no_headers = ContentType::from_string("text/csv;header=false")?;
656    /// assert_eq!(csv_no_headers, ContentType::Csv);
657    ///
658    /// // JSON format
659    /// let json_mime = ContentType::from_string("application/json")?;
660    /// assert_eq!(json_mime, ContentType::Json);
661    ///
662    /// // Error for unsupported type
663    /// assert!(ContentType::from_string("text/plain").is_err());
664    /// # Ok::<(), helios_sof::SofError>(())
665    /// ```
666    pub fn from_string(s: &str) -> Result<Self, SofError> {
667        match s {
668            // Shortened format names
669            "csv" => Ok(ContentType::CsvWithHeader),
670            "json" => Ok(ContentType::Json),
671            "ndjson" => Ok(ContentType::NdJson),
672            "parquet" => Ok(ContentType::Parquet),
673            // Full MIME types (for Accept header compatibility)
674            "text/csv;header=false" => Ok(ContentType::Csv),
675            "text/csv" | "text/csv;header=true" => Ok(ContentType::CsvWithHeader),
676            "application/json" => Ok(ContentType::Json),
677            "application/ndjson" | "application/x-ndjson" => Ok(ContentType::NdJson),
678            "application/parquet" => Ok(ContentType::Parquet),
679            _ => Err(SofError::UnsupportedContentType(s.to_string())),
680        }
681    }
682}
683
684/// Returns the FHIR version string for the newest enabled version.
685///
686/// This function provides the version string that should be used in CapabilityStatements
687/// and other FHIR resources that need to specify their version.
688pub fn get_fhir_version_string() -> &'static str {
689    let newest_version = get_newest_enabled_fhir_version();
690
691    match newest_version {
692        #[cfg(feature = "R4")]
693        helios_fhir::FhirVersion::R4 => "4.0.1",
694        #[cfg(feature = "R4B")]
695        helios_fhir::FhirVersion::R4B => "4.3.0",
696        #[cfg(feature = "R5")]
697        helios_fhir::FhirVersion::R5 => "5.0.0",
698        #[cfg(feature = "R6")]
699        helios_fhir::FhirVersion::R6 => "6.0.0",
700    }
701}
702
703/// Returns the newest FHIR version that is enabled at compile time.
704///
705/// This function uses compile-time feature detection to determine which FHIR
706/// version should be used when multiple versions are enabled. The priority order
707/// is: R6 > R5 > R4B > R4, where newer versions take precedence.
708///
709/// # Examples
710///
711/// ```rust
712/// use helios_sof::{get_newest_enabled_fhir_version, FhirVersion};
713///
714/// # #[cfg(any(feature = "R4", feature = "R4B", feature = "R5", feature = "R6"))]
715/// # {
716/// let version = get_newest_enabled_fhir_version();
717/// // If R5 and R4 are both enabled, this returns R5
718/// # }
719/// ```
720///
721/// # Panics
722///
723/// This function will panic at compile time if no FHIR version features are enabled.
724pub fn get_newest_enabled_fhir_version() -> helios_fhir::FhirVersion {
725    #[cfg(feature = "R6")]
726    return helios_fhir::FhirVersion::R6;
727
728    #[cfg(all(feature = "R5", not(feature = "R6")))]
729    return helios_fhir::FhirVersion::R5;
730
731    #[cfg(all(feature = "R4B", not(feature = "R5"), not(feature = "R6")))]
732    return helios_fhir::FhirVersion::R4B;
733
734    #[cfg(all(
735        feature = "R4",
736        not(feature = "R4B"),
737        not(feature = "R5"),
738        not(feature = "R6")
739    ))]
740    return helios_fhir::FhirVersion::R4;
741
742    #[cfg(not(any(feature = "R4", feature = "R4B", feature = "R5", feature = "R6")))]
743    panic!("At least one FHIR version feature must be enabled");
744}
745
746/// A single row of processed tabular data from ViewDefinition transformation.
747///
748/// This struct represents one row in the output table, containing values for
749/// each column defined in the ViewDefinition. Values are stored as optional
750/// JSON values to handle nullable fields and diverse FHIR data types.
751///
752/// # Structure
753///
754/// Each `ProcessedRow` contains a vector of optional JSON values, where:
755/// - `Some(value)` represents a non-null column value
756/// - `None` represents a null/missing column value
757/// - The order matches the column order in [`ProcessedResult::columns`]
758///
759/// # Examples
760///
761/// ```rust
762/// use helios_sof::ProcessedRow;
763/// use serde_json::Value;
764///
765/// let row = ProcessedRow {
766///     values: vec![
767///         Some(Value::String("patient-123".to_string())),
768///         Some(Value::String("Doe".to_string())),
769///         None, // Missing birth date
770///         Some(Value::Bool(true)),
771///     ]
772/// };
773/// ```
774#[derive(Debug, Clone, Serialize, Deserialize)]
775pub struct ProcessedRow {
776    /// Column values for this row, ordered according to ProcessedResult::columns
777    pub values: Vec<Option<serde_json::Value>>,
778}
779
780/// Complete result of ViewDefinition transformation containing columns and data rows.
781///
782/// This struct represents the tabular output from processing a ViewDefinition
783/// against a Bundle of FHIR resources. It contains both the column definitions
784/// and the actual data rows in a format ready for serialization to various
785/// output formats.
786///
787/// # Structure
788///
789/// - [`columns`](Self::columns): Ordered list of column names from the ViewDefinition
790/// - [`rows`](Self::rows): Data rows where each row contains values in column order
791///
792/// # Examples
793///
794/// ```rust
795/// use helios_sof::{ProcessedResult, ProcessedRow};
796/// use serde_json::Value;
797///
798/// let result = ProcessedResult {
799///     columns: vec![
800///         "patient_id".to_string(),
801///         "family_name".to_string(),
802///         "given_name".to_string(),
803///     ],
804///     rows: vec![
805///         ProcessedRow {
806///             values: vec![
807///                 Some(Value::String("patient-1".to_string())),
808///                 Some(Value::String("Smith".to_string())),
809///                 Some(Value::String("John".to_string())),
810///             ]
811///         },
812///         ProcessedRow {
813///             values: vec![
814///                 Some(Value::String("patient-2".to_string())),
815///                 Some(Value::String("Doe".to_string())),
816///                 None, // Missing given name
817///             ]
818///         },
819///     ]
820/// };
821///
822/// assert_eq!(result.columns.len(), 3);
823/// assert_eq!(result.rows.len(), 2);
824/// ```
825#[derive(Debug, Clone, Serialize, Deserialize)]
826pub struct ProcessedResult {
827    /// Ordered list of column names as defined in the ViewDefinition
828    pub columns: Vec<String>,
829    /// Data rows containing values for each column
830    pub rows: Vec<ProcessedRow>,
831}
832
833/// Execute a SQL-on-FHIR ViewDefinition transformation on a FHIR Bundle.
834///
835/// This is the main entry point for SQL-on-FHIR transformations. It processes
836/// a ViewDefinition against a Bundle of FHIR resources and produces output in
837/// the specified format. The function handles version compatibility, validation,
838/// FHIRPath evaluation, and output formatting.
839///
840/// # Arguments
841///
842/// * `view_definition` - The ViewDefinition containing transformation logic
843/// * `bundle` - The Bundle containing FHIR resources to process
844/// * `content_type` - The desired output format
845///
846/// # Returns
847///
848/// * `Ok(Vec<u8>)` - Formatted output bytes ready for writing to file or stdout
849/// * `Err(SofError)` - Detailed error information about what went wrong
850///
851/// # Validation
852///
853/// The function performs comprehensive validation:
854/// - FHIR version compatibility between ViewDefinition and Bundle
855/// - ViewDefinition structure and logic validation
856/// - FHIRPath expression syntax and evaluation
857/// - Output format compatibility
858///
859/// # Examples
860///
861/// ```rust
862/// use helios_sof::{SofViewDefinition, SofBundle, ContentType, run_view_definition};
863///
864/// # #[cfg(feature = "R4")]
865/// # {
866/// // Create a simple ViewDefinition
867/// let view_json = serde_json::json!({
868///     "resourceType": "ViewDefinition",
869///     "status": "active",
870///     "resource": "Patient",
871///     "select": [{
872///         "column": [{
873///             "name": "id",
874///             "path": "id"
875///         }]
876///     }]
877/// });
878/// let view_def: helios_fhir::r4::ViewDefinition = serde_json::from_value(view_json)?;
879///
880/// // Create a simple Bundle
881/// let bundle_json = serde_json::json!({
882///     "resourceType": "Bundle",
883///     "type": "collection",
884///     "entry": []
885/// });
886/// let bundle: helios_fhir::r4::Bundle = serde_json::from_value(bundle_json)?;
887///
888/// let sof_view = SofViewDefinition::R4(view_def);
889/// let sof_bundle = SofBundle::R4(bundle);
890///
891/// // Generate CSV with headers
892/// let csv_output = run_view_definition(
893///     sof_view,
894///     sof_bundle,
895///     ContentType::CsvWithHeader
896/// )?;
897///
898/// // Write to file or stdout
899/// std::fs::write("output.csv", csv_output)?;
900/// # }
901/// # Ok::<(), Box<dyn std::error::Error>>(())
902/// ```
903///
904/// # Error Handling
905///
906/// Common error scenarios:
907///
908/// ```rust,no_run
909/// use helios_sof::{SofError, SofViewDefinition, SofBundle, ContentType, run_view_definition};
910///
911/// # let view = SofViewDefinition::R4(helios_fhir::r4::ViewDefinition::default());
912/// # let bundle = SofBundle::R4(helios_fhir::r4::Bundle::default());
913/// # let content_type = ContentType::Json;
914/// match run_view_definition(view, bundle, content_type) {
915///     Ok(output) => {
916///         println!("Success: {} bytes generated", output.len());
917///     },
918///     Err(SofError::InvalidViewDefinition(msg)) => {
919///         eprintln!("ViewDefinition error: {}", msg);
920///     },
921///     Err(SofError::FhirPathError(msg)) => {
922///         eprintln!("FHIRPath error: {}", msg);
923///     },
924///     Err(e) => {
925///         eprintln!("Other error: {}", e);
926///     }
927/// }
928/// ```
929pub fn run_view_definition(
930    view_definition: SofViewDefinition,
931    bundle: SofBundle,
932    content_type: ContentType,
933) -> Result<Vec<u8>, SofError> {
934    run_view_definition_with_options(view_definition, bundle, content_type, RunOptions::default())
935}
936
/// Configuration options for Parquet file generation.
///
/// The `Default` implementation gives the baseline settings:
/// - 256 MB row groups
/// - 1024 KB pages
/// - `snappy` compression
/// - no output splitting
///
/// The documented ranges are targets only; this type does not validate them.
#[derive(Debug, Clone)]
pub struct ParquetOptions {
    /// Target row group size in MB (64-1024)
    pub row_group_size_mb: u32,
    /// Target page size in KB (64-8192)
    pub page_size_kb: u32,
    /// Compression algorithm (none, snappy, gzip, lz4, brotli, zstd)
    pub compression: String,
    /// Maximum file size in MB (splits output when exceeded)
    pub max_file_size_mb: Option<u32>,
}

impl Default for ParquetOptions {
    /// Baseline settings: 256 MB row groups, 1024 KB pages, snappy, no size cap.
    fn default() -> Self {
        let compression = String::from("snappy");
        ParquetOptions {
            max_file_size_mb: None,
            row_group_size_mb: 256,
            page_size_kb: 1024,
            compression,
        }
    }
}
960
/// Options for filtering and controlling ViewDefinition execution
///
/// With every field `None` (the `Default`), [`run_view_definition_with_options`]
/// skips `since` filtering and pagination.
#[derive(Debug, Clone, Default)]
pub struct RunOptions {
    /// Filter resources modified after this time.
    ///
    /// Applied to the input Bundle *before* the ViewDefinition is evaluated.
    pub since: Option<DateTime<Utc>>,
    /// Limit the number of results.
    ///
    /// Applied to the generated rows *after* the ViewDefinition is evaluated.
    pub limit: Option<usize>,
    /// Page number for pagination (1-based).
    ///
    /// Applied to the generated rows together with `limit`.
    /// NOTE(review): presumably `limit` acts as the page size; confirm against
    /// `apply_pagination_to_result`.
    pub page: Option<usize>,
    /// Parquet-specific configuration options.
    ///
    /// Forwarded to output formatting as-is. NOTE(review): `None` presumably
    /// falls back to `ParquetOptions::default()` inside the formatter; confirm.
    pub parquet_options: Option<ParquetOptions>,
}
973
974/// Execute a ViewDefinition transformation with additional filtering options.
975///
976/// This function extends the basic `run_view_definition` with support for:
977/// - Filtering resources by modification time (`since`)
978/// - Limiting results (`limit`)
979/// - Pagination (`page`)
980///
981/// # Arguments
982///
983/// * `view_definition` - The ViewDefinition to execute
984/// * `bundle` - The Bundle containing resources to transform
985/// * `content_type` - Desired output format
986/// * `options` - Additional filtering and control options
987///
988/// # Returns
989///
990/// The transformed data in the requested format, with filtering applied.
991pub fn run_view_definition_with_options(
992    view_definition: SofViewDefinition,
993    bundle: SofBundle,
994    content_type: ContentType,
995    options: RunOptions,
996) -> Result<Vec<u8>, SofError> {
997    // Filter bundle resources by since parameter before processing
998    let filtered_bundle = if let Some(since) = options.since {
999        filter_bundle_by_since(bundle, since)?
1000    } else {
1001        bundle
1002    };
1003
1004    // Process the ViewDefinition to generate tabular data
1005    let processed_result = process_view_definition(view_definition, filtered_bundle)?;
1006
1007    // Apply pagination if needed
1008    let processed_result = if options.limit.is_some() || options.page.is_some() {
1009        apply_pagination_to_result(processed_result, options.limit, options.page)?
1010    } else {
1011        processed_result
1012    };
1013
1014    // Format the result according to the requested content type
1015    format_output(
1016        processed_result,
1017        content_type,
1018        options.parquet_options.as_ref(),
1019    )
1020}
1021
/// Evaluate a ViewDefinition against a Bundle and return the unformatted table.
///
/// This is the formatting-free core used by [`run_view_definition_with_options`].
/// It does three things:
/// - validates the ViewDefinition,
/// - keeps the Bundle resources of the ViewDefinition's resource type that pass
///   its `where` clauses,
/// - expands its `select`s into rows.
///
/// It performs no `since` filtering, pagination or output serialization.
///
/// # Errors
///
/// * [`SofError::InvalidViewDefinition`] if the ViewDefinition and Bundle report
///   different FHIR versions, or if the ViewDefinition fails validation.
/// * [`SofError::FhirPathError`] if a `where` or column expression fails to evaluate.
pub fn process_view_definition(
    view_definition: SofViewDefinition,
    bundle: SofBundle,
) -> Result<ProcessedResult, SofError> {
    // Ensure both resources use the same FHIR version
    if view_definition.version() != bundle.version() {
        return Err(SofError::InvalidViewDefinition(
            "ViewDefinition and Bundle must use the same FHIR version".to_string(),
        ));
    }

    match (view_definition, bundle) {
        #[cfg(feature = "R4")]
        (SofViewDefinition::R4(vd), SofBundle::R4(bundle)) => {
            process_view_definition_generic(vd, bundle)
        }
        #[cfg(feature = "R4B")]
        (SofViewDefinition::R4B(vd), SofBundle::R4B(bundle)) => {
            process_view_definition_generic(vd, bundle)
        }
        #[cfg(feature = "R5")]
        (SofViewDefinition::R5(vd), SofBundle::R5(bundle)) => {
            process_view_definition_generic(vd, bundle)
        }
        #[cfg(feature = "R6")]
        (SofViewDefinition::R6(vd), SofBundle::R6(bundle)) => {
            process_view_definition_generic(vd, bundle)
        }
        // This case should never happen due to the version check above,
        // but is needed for exhaustive pattern matching when multiple features are enabled
        // NOTE(review): this relies on `version()` mapping 1:1 onto the enum
        // variants; if it ever did not, this arm would panic.
        #[cfg(any(
            all(feature = "R4", any(feature = "R4B", feature = "R5", feature = "R6")),
            all(feature = "R4B", any(feature = "R5", feature = "R6")),
            all(feature = "R5", feature = "R6")
        ))]
        _ => {
            unreachable!("Version mismatch should have been caught by the version check above")
        }
    }
}
1062
1063// Generic version-agnostic constant extraction
1064fn extract_view_definition_constants<VD: ViewDefinitionTrait>(
1065    view_definition: &VD,
1066) -> Result<HashMap<String, EvaluationResult>, SofError> {
1067    let mut variables = HashMap::new();
1068
1069    if let Some(constants) = view_definition.constants() {
1070        for constant in constants {
1071            let name = constant
1072                .name()
1073                .ok_or_else(|| {
1074                    SofError::InvalidViewDefinition("Constant name is required".to_string())
1075                })?
1076                .to_string();
1077
1078            let eval_result = constant.to_evaluation_result()?;
1079            // Constants are referenced with % prefix in FHIRPath expressions
1080            variables.insert(format!("%{}", name), eval_result);
1081        }
1082    }
1083
1084    Ok(variables)
1085}
1086
1087// Generic version-agnostic ViewDefinition processing
1088fn process_view_definition_generic<VD, B>(
1089    view_definition: VD,
1090    bundle: B,
1091) -> Result<ProcessedResult, SofError>
1092where
1093    VD: ViewDefinitionTrait,
1094    B: BundleTrait,
1095    B::Resource: ResourceTrait + Sync,
1096    VD::Select: Sync,
1097{
1098    validate_view_definition(&view_definition)?;
1099
1100    // Step 1: Extract constants/variables from ViewDefinition
1101    let variables = extract_view_definition_constants(&view_definition)?;
1102
1103    // Step 2: Filter resources by type and profile
1104    let target_resource_type = view_definition
1105        .resource()
1106        .ok_or_else(|| SofError::InvalidViewDefinition("Resource type is required".to_string()))?;
1107
1108    let filtered_resources = filter_resources(&bundle, target_resource_type)?;
1109
1110    // Step 3: Apply where clauses to filter resources
1111    let filtered_resources = apply_where_clauses(
1112        filtered_resources,
1113        view_definition.where_clauses(),
1114        &variables,
1115    )?;
1116
1117    // Step 4: Process all select clauses to generate rows with forEach support
1118    let select_clauses = view_definition.select().ok_or_else(|| {
1119        SofError::InvalidViewDefinition("At least one select clause is required".to_string())
1120    })?;
1121
1122    // Generate rows for each resource using the forEach-aware approach
1123    let (all_columns, rows) =
1124        generate_rows_from_selects(&filtered_resources, select_clauses, &variables)?;
1125
1126    Ok(ProcessedResult {
1127        columns: all_columns,
1128        rows,
1129    })
1130}
1131
1132// Generic version-agnostic validation
1133fn validate_view_definition<VD: ViewDefinitionTrait>(view_def: &VD) -> Result<(), SofError> {
1134    // Basic validation
1135    if view_def.resource().is_none_or(|s| s.is_empty()) {
1136        return Err(SofError::InvalidViewDefinition(
1137            "ViewDefinition must specify a resource type".to_string(),
1138        ));
1139    }
1140
1141    if view_def.select().is_none_or(|s| s.is_empty()) {
1142        return Err(SofError::InvalidViewDefinition(
1143            "ViewDefinition must have at least one select".to_string(),
1144        ));
1145    }
1146
1147    // Validate where clauses
1148    if let Some(where_clauses) = view_def.where_clauses() {
1149        validate_where_clauses(where_clauses)?;
1150    }
1151
1152    // Validate selects
1153    if let Some(selects) = view_def.select() {
1154        for select in selects {
1155            validate_select(select)?;
1156        }
1157    }
1158
1159    Ok(())
1160}
1161
1162// Generic where clause validation
1163fn validate_where_clauses<W: ViewDefinitionWhereTrait>(
1164    where_clauses: &[W],
1165) -> Result<(), SofError> {
1166    // Basic validation - just ensure paths are provided
1167    // Type checking will be done during actual evaluation
1168    for where_clause in where_clauses {
1169        if where_clause.path().is_none() {
1170            return Err(SofError::InvalidViewDefinition(
1171                "Where clause must have a path specified".to_string(),
1172            ));
1173        }
1174    }
1175    Ok(())
1176}
1177
1178// Generic helper - no longer needs to be version-specific
1179fn can_be_coerced_to_boolean(result: &EvaluationResult) -> bool {
1180    // Check if the result can be meaningfully used as a boolean in a where clause
1181    match result {
1182        // Boolean values are obviously OK
1183        EvaluationResult::Boolean(_, _) => true,
1184
1185        // Empty is OK (evaluates to false)
1186        EvaluationResult::Empty => true,
1187
1188        // Collections are OK - they evaluate based on whether they're empty or not
1189        EvaluationResult::Collection { .. } => true,
1190
1191        // Other types cannot be meaningfully coerced to boolean for where clauses
1192        // This includes: String, Integer, Decimal, Date, DateTime, Time, Quantity, Object
1193        _ => false,
1194    }
1195}
1196
1197// Generic select validation
1198fn validate_select<S: ViewDefinitionSelectTrait>(select: &S) -> Result<(), SofError> {
1199    validate_select_with_context(select, false)
1200}
1201
1202fn validate_select_with_context<S: ViewDefinitionSelectTrait>(
1203    select: &S,
1204    in_foreach_context: bool,
1205) -> Result<(), SofError>
1206where
1207    S::Select: ViewDefinitionSelectTrait,
1208{
1209    // Determine if we're entering a forEach context at this level
1210    let entering_foreach = select.for_each().is_some() || select.for_each_or_null().is_some();
1211    let current_foreach_context = in_foreach_context || entering_foreach;
1212
1213    // Validate collection attribute with the current forEach context
1214    if let Some(columns) = select.column() {
1215        for column in columns {
1216            if let Some(collection_value) = column.collection() {
1217                if !collection_value && !current_foreach_context {
1218                    return Err(SofError::InvalidViewDefinition(
1219                        "Column 'collection' attribute must be true when specified".to_string(),
1220                    ));
1221                }
1222            }
1223        }
1224    }
1225
1226    // Validate unionAll column consistency
1227    if let Some(union_selects) = select.union_all() {
1228        validate_union_all_columns(union_selects)?;
1229    }
1230
1231    // Recursively validate nested selects
1232    if let Some(nested_selects) = select.select() {
1233        for nested_select in nested_selects {
1234            validate_select_with_context(nested_select, current_foreach_context)?;
1235        }
1236    }
1237
1238    // Validate unionAll selects with forEach context
1239    if let Some(union_selects) = select.union_all() {
1240        for union_select in union_selects {
1241            validate_select_with_context(union_select, current_foreach_context)?;
1242        }
1243    }
1244
1245    Ok(())
1246}
1247
1248// Generic union validation
1249fn validate_union_all_columns<S: ViewDefinitionSelectTrait>(
1250    union_selects: &[S],
1251) -> Result<(), SofError> {
1252    if union_selects.len() < 2 {
1253        return Ok(());
1254    }
1255
1256    // Get column names and order from first select
1257    let first_select = &union_selects[0];
1258    let first_columns = get_column_names(first_select)?;
1259
1260    // Validate all other selects have the same column names in the same order
1261    for (index, union_select) in union_selects.iter().enumerate().skip(1) {
1262        let current_columns = get_column_names(union_select)?;
1263
1264        if current_columns != first_columns {
1265            if current_columns.len() != first_columns.len()
1266                || !current_columns
1267                    .iter()
1268                    .all(|name| first_columns.contains(name))
1269            {
1270                return Err(SofError::InvalidViewDefinition(format!(
1271                    "UnionAll branch {} has different column names than first branch",
1272                    index
1273                )));
1274            } else {
1275                return Err(SofError::InvalidViewDefinition(format!(
1276                    "UnionAll branch {} has columns in different order than first branch",
1277                    index
1278                )));
1279            }
1280        }
1281    }
1282
1283    Ok(())
1284}
1285
1286// Generic column name extraction
1287fn get_column_names<S: ViewDefinitionSelectTrait>(select: &S) -> Result<Vec<String>, SofError> {
1288    let mut column_names = Vec::new();
1289
1290    // Collect direct column names
1291    if let Some(columns) = select.column() {
1292        for column in columns {
1293            if let Some(name) = column.name() {
1294                column_names.push(name.to_string());
1295            }
1296        }
1297    }
1298
1299    // If this select has unionAll but no direct columns, get columns from first unionAll branch
1300    if column_names.is_empty() {
1301        if let Some(union_selects) = select.union_all() {
1302            if !union_selects.is_empty() {
1303                return get_column_names(&union_selects[0]);
1304            }
1305        }
1306    }
1307
1308    Ok(column_names)
1309}
1310
1311// Generic resource filtering
1312fn filter_resources<'a, B: BundleTrait>(
1313    bundle: &'a B,
1314    resource_type: &str,
1315) -> Result<Vec<&'a B::Resource>, SofError> {
1316    Ok(bundle
1317        .entries()
1318        .into_iter()
1319        .filter(|resource| resource.resource_name() == resource_type)
1320        .collect())
1321}
1322
1323// Generic where clause application
1324fn apply_where_clauses<'a, R, W>(
1325    resources: Vec<&'a R>,
1326    where_clauses: Option<&[W]>,
1327    variables: &HashMap<String, EvaluationResult>,
1328) -> Result<Vec<&'a R>, SofError>
1329where
1330    R: ResourceTrait,
1331    W: ViewDefinitionWhereTrait,
1332{
1333    if let Some(wheres) = where_clauses {
1334        let mut filtered = Vec::new();
1335
1336        for resource in resources {
1337            let mut include_resource = true;
1338
1339            // All where clauses must evaluate to true for the resource to be included
1340            for where_clause in wheres {
1341                let fhir_resource = resource.to_fhir_resource();
1342                let mut context = EvaluationContext::new(vec![fhir_resource]);
1343
1344                // Add variables to the context
1345                for (name, value) in variables {
1346                    context.set_variable_result(name, value.clone());
1347                }
1348
1349                let path = where_clause.path().ok_or_else(|| {
1350                    SofError::InvalidViewDefinition("Where clause path is required".to_string())
1351                })?;
1352
1353                match evaluate_expression(path, &context) {
1354                    Ok(result) => {
1355                        // Check if the result can be meaningfully used as a boolean
1356                        if !can_be_coerced_to_boolean(&result) {
1357                            return Err(SofError::InvalidViewDefinition(format!(
1358                                "Where clause path '{}' returns type '{}' which cannot be used as a boolean condition. \
1359                                 Where clauses must return boolean values, collections, or empty results.",
1360                                path,
1361                                result.type_name()
1362                            )));
1363                        }
1364
1365                        // Check if result is truthy (non-empty and not false)
1366                        if !is_truthy(&result) {
1367                            include_resource = false;
1368                            break;
1369                        }
1370                    }
1371                    Err(e) => {
1372                        return Err(SofError::FhirPathError(format!(
1373                            "Error evaluating where clause '{}': {}",
1374                            path, e
1375                        )));
1376                    }
1377                }
1378            }
1379
1380            if include_resource {
1381                filtered.push(resource);
1382            }
1383        }
1384
1385        Ok(filtered)
1386    } else {
1387        Ok(resources)
1388    }
1389}
1390
1391// Removed generate_rows_per_resource_r4 - replaced with new forEach-aware implementation
1392
1393// Removed generate_rows_with_for_each_r4 - replaced with new forEach-aware implementation
1394
1395// Helper functions for FHIRPath result processing
1396fn is_truthy(result: &EvaluationResult) -> bool {
1397    match result {
1398        EvaluationResult::Empty => false,
1399        EvaluationResult::Boolean(b, _) => *b,
1400        EvaluationResult::Collection { items, .. } => !items.is_empty(),
1401        _ => true, // Non-empty, non-false values are truthy
1402    }
1403}
1404
1405fn fhirpath_result_to_json_value_collection(result: EvaluationResult) -> Option<serde_json::Value> {
1406    match result {
1407        EvaluationResult::Empty => Some(serde_json::Value::Array(vec![])),
1408        EvaluationResult::Collection { items, .. } => {
1409            // Always return array for collection columns, even if empty
1410            let values: Vec<serde_json::Value> = items
1411                .into_iter()
1412                .filter_map(fhirpath_result_to_json_value)
1413                .collect();
1414            Some(serde_json::Value::Array(values))
1415        }
1416        // For non-collection results in collection columns, wrap in array
1417        single_result => {
1418            if let Some(json_val) = fhirpath_result_to_json_value(single_result) {
1419                Some(serde_json::Value::Array(vec![json_val]))
1420            } else {
1421                Some(serde_json::Value::Array(vec![]))
1422            }
1423        }
1424    }
1425}
1426
1427fn fhirpath_result_to_json_value(result: EvaluationResult) -> Option<serde_json::Value> {
1428    match result {
1429        EvaluationResult::Empty => None,
1430        EvaluationResult::Boolean(b, _) => Some(serde_json::Value::Bool(b)),
1431        EvaluationResult::Integer(i, _) => {
1432            Some(serde_json::Value::Number(serde_json::Number::from(i)))
1433        }
1434        EvaluationResult::Decimal(d, _) => {
1435            // Check if this Decimal represents a whole number
1436            if d.fract().is_zero() {
1437                // Convert to integer if no fractional part
1438                if let Ok(i) = d.to_string().parse::<i64>() {
1439                    Some(serde_json::Value::Number(serde_json::Number::from(i)))
1440                } else {
1441                    // Handle very large numbers as strings
1442                    Some(serde_json::Value::String(d.to_string()))
1443                }
1444            } else {
1445                // Convert Decimal to a float for fractional numbers
1446                if let Ok(f) = d.to_string().parse::<f64>() {
1447                    if let Some(num) = serde_json::Number::from_f64(f) {
1448                        Some(serde_json::Value::Number(num))
1449                    } else {
1450                        Some(serde_json::Value::String(d.to_string()))
1451                    }
1452                } else {
1453                    Some(serde_json::Value::String(d.to_string()))
1454                }
1455            }
1456        }
1457        EvaluationResult::String(s, _) => Some(serde_json::Value::String(s)),
1458        EvaluationResult::Date(s, _) => Some(serde_json::Value::String(s)),
1459        EvaluationResult::DateTime(s, _) => {
1460            // Remove "@" prefix from datetime strings if present
1461            let cleaned = s.strip_prefix("@").unwrap_or(&s);
1462            Some(serde_json::Value::String(cleaned.to_string()))
1463        }
1464        EvaluationResult::Time(s, _) => {
1465            // Remove "@T" prefix from time strings if present
1466            let cleaned = s.strip_prefix("@T").unwrap_or(&s);
1467            Some(serde_json::Value::String(cleaned.to_string()))
1468        }
1469        EvaluationResult::Collection { items, .. } => {
1470            if items.len() == 1 {
1471                // Single item collection - unwrap to the item itself
1472                fhirpath_result_to_json_value(items.into_iter().next().unwrap())
1473            } else if items.is_empty() {
1474                None
1475            } else {
1476                // Multiple items - convert to array
1477                let values: Vec<serde_json::Value> = items
1478                    .into_iter()
1479                    .filter_map(fhirpath_result_to_json_value)
1480                    .collect();
1481                Some(serde_json::Value::Array(values))
1482            }
1483        }
1484        EvaluationResult::Object { map, .. } => {
1485            let mut json_map = serde_json::Map::new();
1486            for (k, v) in map {
1487                if let Some(json_val) = fhirpath_result_to_json_value(v) {
1488                    json_map.insert(k, json_val);
1489                }
1490            }
1491            Some(serde_json::Value::Object(json_map))
1492        }
1493        // Handle other result types as strings
1494        _ => Some(serde_json::Value::String(format!("{:?}", result))),
1495    }
1496}
1497
1498fn extract_iteration_items(result: EvaluationResult) -> Vec<EvaluationResult> {
1499    match result {
1500        EvaluationResult::Collection { items, .. } => items,
1501        EvaluationResult::Empty => Vec::new(),
1502        single_item => vec![single_item],
1503    }
1504}
1505
1506// Generic row generation functions
1507
1508fn generate_rows_from_selects<R, S>(
1509    resources: &[&R],
1510    selects: &[S],
1511    variables: &HashMap<String, EvaluationResult>,
1512) -> Result<(Vec<String>, Vec<ProcessedRow>), SofError>
1513where
1514    R: ResourceTrait + Sync,
1515    S: ViewDefinitionSelectTrait + Sync,
1516    S::Select: ViewDefinitionSelectTrait,
1517{
1518    // Process resources in parallel
1519    let resource_results: Result<Vec<_>, _> = resources
1520        .par_iter()
1521        .map(|resource| {
1522            // Each thread gets its own local column vector
1523            let mut local_columns = Vec::new();
1524            let resource_rows =
1525                generate_rows_for_resource(*resource, selects, &mut local_columns, variables)?;
1526            Ok::<(Vec<String>, Vec<ProcessedRow>), SofError>((local_columns, resource_rows))
1527        })
1528        .collect();
1529
1530    // Handle errors from parallel processing
1531    let resource_results = resource_results?;
1532
1533    // Merge columns from all threads (maintaining order is important)
1534    let mut final_columns = Vec::new();
1535    let mut all_rows = Vec::new();
1536
1537    for (local_columns, resource_rows) in resource_results {
1538        // Merge columns, avoiding duplicates
1539        for col in local_columns {
1540            if !final_columns.contains(&col) {
1541                final_columns.push(col);
1542            }
1543        }
1544        all_rows.extend(resource_rows);
1545    }
1546
1547    Ok((final_columns, all_rows))
1548}
1549
1550fn generate_rows_for_resource<R, S>(
1551    resource: &R,
1552    selects: &[S],
1553    all_columns: &mut Vec<String>,
1554    variables: &HashMap<String, EvaluationResult>,
1555) -> Result<Vec<ProcessedRow>, SofError>
1556where
1557    R: ResourceTrait,
1558    S: ViewDefinitionSelectTrait,
1559    S::Select: ViewDefinitionSelectTrait,
1560{
1561    let fhir_resource = resource.to_fhir_resource();
1562    let mut context = EvaluationContext::new(vec![fhir_resource]);
1563
1564    // Add variables to the context
1565    for (name, value) in variables {
1566        context.set_variable_result(name, value.clone());
1567    }
1568
1569    // Generate all possible row combinations for this resource
1570    let row_combinations = generate_row_combinations(&context, selects, all_columns, variables)?;
1571
1572    Ok(row_combinations)
1573}
1574
/// An output row under construction during select expansion.
#[derive(Debug, Clone)]
struct RowCombination {
    /// One slot per entry of the column list, by position. `None` means the
    /// column has no value (yet) for this row.
    values: Vec<Option<serde_json::Value>>,
}
1579
1580fn generate_row_combinations<S>(
1581    context: &EvaluationContext,
1582    selects: &[S],
1583    all_columns: &mut Vec<String>,
1584    variables: &HashMap<String, EvaluationResult>,
1585) -> Result<Vec<ProcessedRow>, SofError>
1586where
1587    S: ViewDefinitionSelectTrait,
1588    S::Select: ViewDefinitionSelectTrait,
1589{
1590    // First pass: collect all column names to ensure consistent ordering
1591    collect_all_columns(selects, all_columns)?;
1592
1593    // Second pass: generate all row combinations
1594    let mut row_combinations = vec![RowCombination {
1595        values: vec![None; all_columns.len()],
1596    }];
1597
1598    for select in selects {
1599        row_combinations =
1600            expand_select_combinations(context, select, &row_combinations, all_columns, variables)?;
1601    }
1602
1603    // Convert to ProcessedRow format
1604    Ok(row_combinations
1605        .into_iter()
1606        .map(|combo| ProcessedRow {
1607            values: combo.values,
1608        })
1609        .collect())
1610}
1611
1612fn collect_all_columns<S>(selects: &[S], all_columns: &mut Vec<String>) -> Result<(), SofError>
1613where
1614    S: ViewDefinitionSelectTrait,
1615{
1616    for select in selects {
1617        // Add columns from this select
1618        if let Some(columns) = select.column() {
1619            for col in columns {
1620                if let Some(name) = col.name() {
1621                    if !all_columns.contains(&name.to_string()) {
1622                        all_columns.push(name.to_string());
1623                    }
1624                }
1625            }
1626        }
1627
1628        // Recursively collect from nested selects
1629        if let Some(nested_selects) = select.select() {
1630            collect_all_columns(nested_selects, all_columns)?;
1631        }
1632
1633        // Collect from unionAll
1634        if let Some(union_selects) = select.union_all() {
1635            collect_all_columns(union_selects, all_columns)?;
1636        }
1637    }
1638    Ok(())
1639}
1640
1641fn expand_select_combinations<S>(
1642    context: &EvaluationContext,
1643    select: &S,
1644    existing_combinations: &[RowCombination],
1645    all_columns: &[String],
1646    variables: &HashMap<String, EvaluationResult>,
1647) -> Result<Vec<RowCombination>, SofError>
1648where
1649    S: ViewDefinitionSelectTrait,
1650    S::Select: ViewDefinitionSelectTrait,
1651{
1652    // Handle forEach and forEachOrNull
1653    if let Some(for_each_path) = select.for_each() {
1654        return expand_for_each_combinations(
1655            context,
1656            select,
1657            existing_combinations,
1658            all_columns,
1659            for_each_path,
1660            false,
1661            variables,
1662        );
1663    }
1664
1665    if let Some(for_each_or_null_path) = select.for_each_or_null() {
1666        return expand_for_each_combinations(
1667            context,
1668            select,
1669            existing_combinations,
1670            all_columns,
1671            for_each_or_null_path,
1672            true,
1673            variables,
1674        );
1675    }
1676
1677    // Handle regular columns (no forEach)
1678    let mut new_combinations = Vec::new();
1679
1680    for existing_combo in existing_combinations {
1681        let mut new_combo = existing_combo.clone();
1682
1683        // Add values from this select's columns
1684        if let Some(columns) = select.column() {
1685            for col in columns {
1686                if let Some(col_name) = col.name() {
1687                    if let Some(col_index) = all_columns.iter().position(|name| name == col_name) {
1688                        let path = col.path().ok_or_else(|| {
1689                            SofError::InvalidViewDefinition("Column path is required".to_string())
1690                        })?;
1691
1692                        match evaluate_expression(path, context) {
1693                            Ok(result) => {
1694                                // Check if this column is marked as a collection
1695                                let is_collection = col.collection().unwrap_or(false);
1696
1697                                new_combo.values[col_index] = if is_collection {
1698                                    fhirpath_result_to_json_value_collection(result)
1699                                } else {
1700                                    fhirpath_result_to_json_value(result)
1701                                };
1702                            }
1703                            Err(e) => {
1704                                return Err(SofError::FhirPathError(format!(
1705                                    "Error evaluating column '{}' with path '{}': {}",
1706                                    col_name, path, e
1707                                )));
1708                            }
1709                        }
1710                    }
1711                }
1712            }
1713        }
1714
1715        new_combinations.push(new_combo);
1716    }
1717
1718    // Handle nested selects
1719    if let Some(nested_selects) = select.select() {
1720        for nested_select in nested_selects {
1721            new_combinations = expand_select_combinations(
1722                context,
1723                nested_select,
1724                &new_combinations,
1725                all_columns,
1726                variables,
1727            )?;
1728        }
1729    }
1730
1731    // Handle unionAll
1732    if let Some(union_selects) = select.union_all() {
1733        let mut union_combinations = Vec::new();
1734
1735        // Process each unionAll select independently, using the combinations that already have
1736        // values from this select's columns and nested selects
1737        for union_select in union_selects {
1738            let select_combinations = expand_select_combinations(
1739                context,
1740                union_select,
1741                &new_combinations,
1742                all_columns,
1743                variables,
1744            )?;
1745            union_combinations.extend(select_combinations);
1746        }
1747
1748        // unionAll replaces new_combinations with the union results
1749        // If no union results, this resource should be filtered out (no rows for this resource)
1750        new_combinations = union_combinations;
1751    }
1752
1753    Ok(new_combinations)
1754}
1755
1756fn expand_for_each_combinations<S>(
1757    context: &EvaluationContext,
1758    select: &S,
1759    existing_combinations: &[RowCombination],
1760    all_columns: &[String],
1761    for_each_path: &str,
1762    allow_null: bool,
1763    variables: &HashMap<String, EvaluationResult>,
1764) -> Result<Vec<RowCombination>, SofError>
1765where
1766    S: ViewDefinitionSelectTrait,
1767    S::Select: ViewDefinitionSelectTrait,
1768{
1769    // Evaluate the forEach expression to get iteration items
1770    let for_each_result = evaluate_expression(for_each_path, context).map_err(|e| {
1771        SofError::FhirPathError(format!(
1772            "Error evaluating forEach expression '{}': {}",
1773            for_each_path, e
1774        ))
1775    })?;
1776
1777    let iteration_items = extract_iteration_items(for_each_result);
1778
1779    if iteration_items.is_empty() {
1780        if allow_null {
1781            // forEachOrNull: generate null rows
1782            let mut new_combinations = Vec::new();
1783            for existing_combo in existing_combinations {
1784                let mut new_combo = existing_combo.clone();
1785
1786                // Set column values to null for this forEach scope
1787                if let Some(columns) = select.column() {
1788                    for col in columns {
1789                        if let Some(col_name) = col.name() {
1790                            if let Some(col_index) =
1791                                all_columns.iter().position(|name| name == col_name)
1792                            {
1793                                new_combo.values[col_index] = None;
1794                            }
1795                        }
1796                    }
1797                }
1798
1799                new_combinations.push(new_combo);
1800            }
1801            return Ok(new_combinations);
1802        } else {
1803            // forEach with empty collection: no rows
1804            return Ok(Vec::new());
1805        }
1806    }
1807
1808    let mut new_combinations = Vec::new();
1809
1810    // For each iteration item, create new combinations
1811    for item in &iteration_items {
1812        // Create a new context with the iteration item
1813        let _item_context = create_iteration_context(item, variables);
1814
1815        for existing_combo in existing_combinations {
1816            let mut new_combo = existing_combo.clone();
1817
1818            // Evaluate columns in the context of the iteration item
1819            if let Some(columns) = select.column() {
1820                for col in columns {
1821                    if let Some(col_name) = col.name() {
1822                        if let Some(col_index) =
1823                            all_columns.iter().position(|name| name == col_name)
1824                        {
1825                            let path = col.path().ok_or_else(|| {
1826                                SofError::InvalidViewDefinition(
1827                                    "Column path is required".to_string(),
1828                                )
1829                            })?;
1830
1831                            // Use the iteration item directly for path evaluation
1832                            let result = if path == "$this" {
1833                                // Special case: $this refers to the current iteration item
1834                                item.clone()
1835                            } else {
1836                                // Evaluate the path on the iteration item
1837                                evaluate_path_on_item(path, item, variables)?
1838                            };
1839
1840                            // Check if this column is marked as a collection
1841                            let is_collection = col.collection().unwrap_or(false);
1842
1843                            new_combo.values[col_index] = if is_collection {
1844                                fhirpath_result_to_json_value_collection(result)
1845                            } else {
1846                                fhirpath_result_to_json_value(result)
1847                            };
1848                        }
1849                    }
1850                }
1851            }
1852
1853            new_combinations.push(new_combo);
1854        }
1855    }
1856
1857    // Handle nested selects with the forEach context
1858    if let Some(nested_selects) = select.select() {
1859        let mut final_combinations = Vec::new();
1860
1861        for item in &iteration_items {
1862            let item_context = create_iteration_context(item, variables);
1863
1864            // For each iteration item, we need to start with the combinations that have
1865            // the correct column values for this forEach scope
1866            for existing_combo in existing_combinations {
1867                // Find the combination that corresponds to this iteration item
1868                // by looking at the values we set for columns in this forEach scope
1869                let mut base_combo = existing_combo.clone();
1870
1871                // Update the base combination with column values for this iteration item
1872                if let Some(columns) = select.column() {
1873                    for col in columns {
1874                        if let Some(col_name) = col.name() {
1875                            if let Some(col_index) =
1876                                all_columns.iter().position(|name| name == col_name)
1877                            {
1878                                let path = col.path().ok_or_else(|| {
1879                                    SofError::InvalidViewDefinition(
1880                                        "Column path is required".to_string(),
1881                                    )
1882                                })?;
1883
1884                                let result = if path == "$this" {
1885                                    item.clone()
1886                                } else {
1887                                    evaluate_path_on_item(path, item, variables)?
1888                                };
1889
1890                                // Check if this column is marked as a collection
1891                                let is_collection = col.collection().unwrap_or(false);
1892
1893                                base_combo.values[col_index] = if is_collection {
1894                                    fhirpath_result_to_json_value_collection(result)
1895                                } else {
1896                                    fhirpath_result_to_json_value(result)
1897                                };
1898                            }
1899                        }
1900                    }
1901                }
1902
1903                // Start with this base combination for nested processing
1904                let mut item_combinations = vec![base_combo];
1905
1906                // Process nested selects
1907                for nested_select in nested_selects {
1908                    item_combinations = expand_select_combinations(
1909                        &item_context,
1910                        nested_select,
1911                        &item_combinations,
1912                        all_columns,
1913                        variables,
1914                    )?;
1915                }
1916
1917                final_combinations.extend(item_combinations);
1918            }
1919        }
1920
1921        new_combinations = final_combinations;
1922    }
1923
1924    // Handle unionAll within forEach context
1925    if let Some(union_selects) = select.union_all() {
1926        let mut union_combinations = Vec::new();
1927
1928        for item in &iteration_items {
1929            let item_context = create_iteration_context(item, variables);
1930
1931            // For each iteration item, process all unionAll selects
1932            for existing_combo in existing_combinations {
1933                let mut base_combo = existing_combo.clone();
1934
1935                // Update the base combination with column values for this iteration item
1936                if let Some(columns) = select.column() {
1937                    for col in columns {
1938                        if let Some(col_name) = col.name() {
1939                            if let Some(col_index) =
1940                                all_columns.iter().position(|name| name == col_name)
1941                            {
1942                                let path = col.path().ok_or_else(|| {
1943                                    SofError::InvalidViewDefinition(
1944                                        "Column path is required".to_string(),
1945                                    )
1946                                })?;
1947
1948                                let result = if path == "$this" {
1949                                    item.clone()
1950                                } else {
1951                                    evaluate_path_on_item(path, item, variables)?
1952                                };
1953
1954                                // Check if this column is marked as a collection
1955                                let is_collection = col.collection().unwrap_or(false);
1956
1957                                base_combo.values[col_index] = if is_collection {
1958                                    fhirpath_result_to_json_value_collection(result)
1959                                } else {
1960                                    fhirpath_result_to_json_value(result)
1961                                };
1962                            }
1963                        }
1964                    }
1965                }
1966
1967                // Also evaluate columns from nested selects and add them to base_combo
1968                if let Some(nested_selects) = select.select() {
1969                    for nested_select in nested_selects {
1970                        if let Some(nested_columns) = nested_select.column() {
1971                            for col in nested_columns {
1972                                if let Some(col_name) = col.name() {
1973                                    if let Some(col_index) =
1974                                        all_columns.iter().position(|name| name == col_name)
1975                                    {
1976                                        let path = col.path().ok_or_else(|| {
1977                                            SofError::InvalidViewDefinition(
1978                                                "Column path is required".to_string(),
1979                                            )
1980                                        })?;
1981
1982                                        let result = if path == "$this" {
1983                                            item.clone()
1984                                        } else {
1985                                            evaluate_path_on_item(path, item, variables)?
1986                                        };
1987
1988                                        // Check if this column is marked as a collection
1989                                        let is_collection = col.collection().unwrap_or(false);
1990
1991                                        base_combo.values[col_index] = if is_collection {
1992                                            fhirpath_result_to_json_value_collection(result)
1993                                        } else {
1994                                            fhirpath_result_to_json_value(result)
1995                                        };
1996                                    }
1997                                }
1998                            }
1999                        }
2000                    }
2001                }
2002
2003                // Process each unionAll select independently for this iteration item
2004                for union_select in union_selects {
2005                    let mut select_combinations = vec![base_combo.clone()];
2006                    select_combinations = expand_select_combinations(
2007                        &item_context,
2008                        union_select,
2009                        &select_combinations,
2010                        all_columns,
2011                        variables,
2012                    )?;
2013                    union_combinations.extend(select_combinations);
2014                }
2015            }
2016        }
2017
2018        // unionAll replaces new_combinations with the union results
2019        // If no union results, filter out this resource (no rows for this resource)
2020        new_combinations = union_combinations;
2021    }
2022
2023    Ok(new_combinations)
2024}
2025
2026// Generic helper functions
2027fn evaluate_path_on_item(
2028    path: &str,
2029    item: &EvaluationResult,
2030    variables: &HashMap<String, EvaluationResult>,
2031) -> Result<EvaluationResult, SofError> {
2032    // Create a temporary context with the iteration item as the root resource
2033    let mut temp_context = match item {
2034        EvaluationResult::Object { .. } => {
2035            // Convert the iteration item to a resource-like structure for FHIRPath evaluation
2036            // For simplicity, we'll create a basic context where the item is available for evaluation
2037            let mut context = EvaluationContext::new(vec![]);
2038            context.this = Some(item.clone());
2039            context
2040        }
2041        _ => EvaluationContext::new(vec![]),
2042    };
2043
2044    // Add variables to the temporary context
2045    for (name, value) in variables {
2046        temp_context.set_variable_result(name, value.clone());
2047    }
2048
2049    // Evaluate the FHIRPath expression in the context of the iteration item
2050    match evaluate_expression(path, &temp_context) {
2051        Ok(result) => Ok(result),
2052        Err(_e) => {
2053            // If FHIRPath evaluation fails, try simple property access as fallback
2054            match item {
2055                EvaluationResult::Object { map, .. } => {
2056                    if let Some(value) = map.get(path) {
2057                        Ok(value.clone())
2058                    } else {
2059                        Ok(EvaluationResult::Empty)
2060                    }
2061                }
2062                _ => Ok(EvaluationResult::Empty),
2063            }
2064        }
2065    }
2066}
2067
2068fn create_iteration_context(
2069    item: &EvaluationResult,
2070    variables: &HashMap<String, EvaluationResult>,
2071) -> EvaluationContext {
2072    // Create a new context with the iteration item as the root
2073    let mut context = EvaluationContext::new(vec![]);
2074    context.this = Some(item.clone());
2075
2076    // Preserve variables from the parent context
2077    for (name, value) in variables {
2078        context.set_variable_result(name, value.clone());
2079    }
2080
2081    context
2082}
2083
2084/// Filter a bundle's resources by their lastUpdated metadata
2085fn filter_bundle_by_since(bundle: SofBundle, since: DateTime<Utc>) -> Result<SofBundle, SofError> {
2086    match bundle {
2087        #[cfg(feature = "R4")]
2088        SofBundle::R4(mut b) => {
2089            if let Some(entries) = b.entry.as_mut() {
2090                entries.retain(|entry| {
2091                    entry
2092                        .resource
2093                        .as_ref()
2094                        .and_then(|r| r.get_last_updated())
2095                        .map(|last_updated| last_updated > since)
2096                        .unwrap_or(false)
2097                });
2098            }
2099            Ok(SofBundle::R4(b))
2100        }
2101        #[cfg(feature = "R4B")]
2102        SofBundle::R4B(mut b) => {
2103            if let Some(entries) = b.entry.as_mut() {
2104                entries.retain(|entry| {
2105                    entry
2106                        .resource
2107                        .as_ref()
2108                        .and_then(|r| r.get_last_updated())
2109                        .map(|last_updated| last_updated > since)
2110                        .unwrap_or(false)
2111                });
2112            }
2113            Ok(SofBundle::R4B(b))
2114        }
2115        #[cfg(feature = "R5")]
2116        SofBundle::R5(mut b) => {
2117            if let Some(entries) = b.entry.as_mut() {
2118                entries.retain(|entry| {
2119                    entry
2120                        .resource
2121                        .as_ref()
2122                        .and_then(|r| r.get_last_updated())
2123                        .map(|last_updated| last_updated > since)
2124                        .unwrap_or(false)
2125                });
2126            }
2127            Ok(SofBundle::R5(b))
2128        }
2129        #[cfg(feature = "R6")]
2130        SofBundle::R6(mut b) => {
2131            if let Some(entries) = b.entry.as_mut() {
2132                entries.retain(|entry| {
2133                    entry
2134                        .resource
2135                        .as_ref()
2136                        .and_then(|r| r.get_last_updated())
2137                        .map(|last_updated| last_updated > since)
2138                        .unwrap_or(false)
2139                });
2140            }
2141            Ok(SofBundle::R6(b))
2142        }
2143    }
2144}
2145
2146/// Apply pagination to processed results
2147fn apply_pagination_to_result(
2148    mut result: ProcessedResult,
2149    limit: Option<usize>,
2150    page: Option<usize>,
2151) -> Result<ProcessedResult, SofError> {
2152    if let Some(limit) = limit {
2153        let page_num = page.unwrap_or(1);
2154        if page_num == 0 {
2155            return Err(SofError::InvalidViewDefinition(
2156                "Page number must be greater than 0".to_string(),
2157            ));
2158        }
2159
2160        let start_index = (page_num - 1) * limit;
2161        if start_index >= result.rows.len() {
2162            // Return empty result if page is beyond data
2163            result.rows.clear();
2164        } else {
2165            let end_index = std::cmp::min(start_index + limit, result.rows.len());
2166            result.rows = result.rows[start_index..end_index].to_vec();
2167        }
2168    }
2169
2170    Ok(result)
2171}
2172
2173fn format_output(
2174    result: ProcessedResult,
2175    content_type: ContentType,
2176    parquet_options: Option<&ParquetOptions>,
2177) -> Result<Vec<u8>, SofError> {
2178    match content_type {
2179        ContentType::Csv | ContentType::CsvWithHeader => {
2180            format_csv(result, content_type == ContentType::CsvWithHeader)
2181        }
2182        ContentType::Json => format_json(result),
2183        ContentType::NdJson => format_ndjson(result),
2184        ContentType::Parquet => format_parquet(result, parquet_options),
2185    }
2186}
2187
2188fn format_csv(result: ProcessedResult, include_header: bool) -> Result<Vec<u8>, SofError> {
2189    let mut wtr = csv::Writer::from_writer(vec![]);
2190
2191    if include_header {
2192        wtr.write_record(&result.columns)?;
2193    }
2194
2195    for row in result.rows {
2196        let record: Vec<String> = row
2197            .values
2198            .iter()
2199            .map(|v| match v {
2200                Some(val) => {
2201                    // For string values, extract the raw string instead of JSON serializing
2202                    if let serde_json::Value::String(s) = val {
2203                        s.clone()
2204                    } else {
2205                        // For non-string values, use JSON serialization
2206                        serde_json::to_string(val).unwrap_or_default()
2207                    }
2208                }
2209                None => String::new(),
2210            })
2211            .collect();
2212        wtr.write_record(&record)?;
2213    }
2214
2215    wtr.into_inner()
2216        .map_err(|e| SofError::CsvWriterError(e.to_string()))
2217}
2218
2219fn format_json(result: ProcessedResult) -> Result<Vec<u8>, SofError> {
2220    let mut output = Vec::new();
2221
2222    for row in result.rows {
2223        let mut row_obj = serde_json::Map::new();
2224        for (i, column) in result.columns.iter().enumerate() {
2225            let value = row
2226                .values
2227                .get(i)
2228                .and_then(|v| v.as_ref())
2229                .cloned()
2230                .unwrap_or(serde_json::Value::Null);
2231            row_obj.insert(column.clone(), value);
2232        }
2233        output.push(serde_json::Value::Object(row_obj));
2234    }
2235
2236    Ok(serde_json::to_vec_pretty(&output)?)
2237}
2238
2239fn format_ndjson(result: ProcessedResult) -> Result<Vec<u8>, SofError> {
2240    let mut output = Vec::new();
2241
2242    for row in result.rows {
2243        let mut row_obj = serde_json::Map::new();
2244        for (i, column) in result.columns.iter().enumerate() {
2245            let value = row
2246                .values
2247                .get(i)
2248                .and_then(|v| v.as_ref())
2249                .cloned()
2250                .unwrap_or(serde_json::Value::Null);
2251            row_obj.insert(column.clone(), value);
2252        }
2253        let line = serde_json::to_string(&serde_json::Value::Object(row_obj))?;
2254        output.extend_from_slice(line.as_bytes());
2255        output.push(b'\n');
2256    }
2257
2258    Ok(output)
2259}
2260
2261fn format_parquet(
2262    result: ProcessedResult,
2263    options: Option<&ParquetOptions>,
2264) -> Result<Vec<u8>, SofError> {
2265    use arrow::record_batch::RecordBatch;
2266    use parquet::arrow::ArrowWriter;
2267    use parquet::basic::Compression;
2268    use parquet::file::properties::WriterProperties;
2269    use std::io::Cursor;
2270
2271    // Create Arrow schema from columns and sample data
2272    let schema = parquet_schema::create_arrow_schema(&result.columns, &result.rows)?;
2273    let schema_ref = std::sync::Arc::new(schema.clone());
2274
2275    // Get configuration from options or use defaults
2276    let parquet_opts = options.cloned().unwrap_or_default();
2277
2278    // Calculate optimal batch size based on row count and estimated row size
2279    let target_row_group_size_bytes = (parquet_opts.row_group_size_mb as usize) * 1024 * 1024;
2280    let target_page_size_bytes = (parquet_opts.page_size_kb as usize) * 1024;
2281    const TARGET_ROWS_PER_BATCH: usize = 100_000; // Default batch size
2282    const MAX_ROWS_PER_BATCH: usize = 500_000; // Maximum to prevent memory issues
2283
2284    // Estimate average row size from first 100 rows
2285    let sample_size = std::cmp::min(100, result.rows.len());
2286    let mut estimated_row_size = 100; // Default estimate in bytes
2287
2288    if sample_size > 0 {
2289        let sample_json_size: usize = result.rows[..sample_size]
2290            .iter()
2291            .map(|row| {
2292                row.values
2293                    .iter()
2294                    .filter_map(|v| v.as_ref())
2295                    .map(|v| v.to_string().len())
2296                    .sum::<usize>()
2297            })
2298            .sum();
2299        estimated_row_size = (sample_json_size / sample_size).max(50);
2300    }
2301
2302    // Calculate optimal batch size
2303    let optimal_batch_size = (target_row_group_size_bytes / estimated_row_size)
2304        .clamp(TARGET_ROWS_PER_BATCH, MAX_ROWS_PER_BATCH);
2305
2306    // Parse compression algorithm
2307    use parquet::basic::BrotliLevel;
2308    use parquet::basic::GzipLevel;
2309    use parquet::basic::ZstdLevel;
2310
2311    let compression = match parquet_opts.compression.as_str() {
2312        "none" => Compression::UNCOMPRESSED,
2313        "gzip" => Compression::GZIP(GzipLevel::default()),
2314        "lz4" => Compression::LZ4,
2315        "brotli" => Compression::BROTLI(BrotliLevel::default()),
2316        "zstd" => Compression::ZSTD(ZstdLevel::default()),
2317        _ => Compression::SNAPPY, // Default to snappy
2318    };
2319
2320    // Set up writer properties with optimized settings
2321    let props = WriterProperties::builder()
2322        .set_compression(compression)
2323        .set_max_row_group_size(target_row_group_size_bytes)
2324        .set_data_page_row_count_limit(20_000) // Optimal for predicate pushdown
2325        .set_data_page_size_limit(target_page_size_bytes)
2326        .set_write_batch_size(8192) // Control write granularity
2327        .build();
2328
2329    // Write to memory buffer
2330    let mut buffer = Vec::new();
2331    let mut cursor = Cursor::new(&mut buffer);
2332    let mut writer =
2333        ArrowWriter::try_new(&mut cursor, schema_ref.clone(), Some(props)).map_err(|e| {
2334            SofError::ParquetConversionError(format!("Failed to create Parquet writer: {}", e))
2335        })?;
2336
2337    // Process data in batches to handle large datasets efficiently
2338    let mut row_offset = 0;
2339    while row_offset < result.rows.len() {
2340        let batch_end = (row_offset + optimal_batch_size).min(result.rows.len());
2341        let batch_rows = &result.rows[row_offset..batch_end];
2342
2343        // Convert batch to Arrow arrays
2344        let batch_arrays =
2345            parquet_schema::process_to_arrow_arrays(&schema, &result.columns, batch_rows)?;
2346
2347        // Create RecordBatch for this chunk
2348        let batch = RecordBatch::try_new(schema_ref.clone(), batch_arrays).map_err(|e| {
2349            SofError::ParquetConversionError(format!(
2350                "Failed to create RecordBatch for rows {}-{}: {}",
2351                row_offset, batch_end, e
2352            ))
2353        })?;
2354
2355        // Write batch
2356        writer.write(&batch).map_err(|e| {
2357            SofError::ParquetConversionError(format!(
2358                "Failed to write RecordBatch for rows {}-{}: {}",
2359                row_offset, batch_end, e
2360            ))
2361        })?;
2362
2363        row_offset = batch_end;
2364    }
2365
2366    writer.close().map_err(|e| {
2367        SofError::ParquetConversionError(format!("Failed to close Parquet writer: {}", e))
2368    })?;
2369
2370    Ok(buffer)
2371}
2372
2373/// Format Parquet data with automatic file splitting when size exceeds limit
2374pub fn format_parquet_multi_file(
2375    result: ProcessedResult,
2376    options: Option<&ParquetOptions>,
2377    max_file_size_bytes: usize,
2378) -> Result<Vec<Vec<u8>>, SofError> {
2379    use arrow::record_batch::RecordBatch;
2380    use parquet::arrow::ArrowWriter;
2381    use parquet::basic::Compression;
2382    use parquet::file::properties::WriterProperties;
2383    use std::io::Cursor;
2384
2385    // Create Arrow schema from columns and sample data
2386    let schema = parquet_schema::create_arrow_schema(&result.columns, &result.rows)?;
2387    let schema_ref = std::sync::Arc::new(schema.clone());
2388
2389    // Get configuration from options or use defaults
2390    let parquet_opts = options.cloned().unwrap_or_default();
2391
2392    // Calculate optimal batch size
2393    let target_row_group_size_bytes = (parquet_opts.row_group_size_mb as usize) * 1024 * 1024;
2394    let target_page_size_bytes = (parquet_opts.page_size_kb as usize) * 1024;
2395    const TARGET_ROWS_PER_BATCH: usize = 100_000;
2396    const MAX_ROWS_PER_BATCH: usize = 500_000;
2397
2398    // Estimate average row size
2399    let sample_size = std::cmp::min(100, result.rows.len());
2400    let mut estimated_row_size = 100;
2401
2402    if sample_size > 0 {
2403        let sample_json_size: usize = result.rows[..sample_size]
2404            .iter()
2405            .map(|row| {
2406                row.values
2407                    .iter()
2408                    .filter_map(|v| v.as_ref())
2409                    .map(|v| v.to_string().len())
2410                    .sum::<usize>()
2411            })
2412            .sum();
2413        estimated_row_size = (sample_json_size / sample_size).max(50);
2414    }
2415
2416    let optimal_batch_size = (target_row_group_size_bytes / estimated_row_size)
2417        .clamp(TARGET_ROWS_PER_BATCH, MAX_ROWS_PER_BATCH);
2418
2419    // Parse compression algorithm
2420    use parquet::basic::BrotliLevel;
2421    use parquet::basic::GzipLevel;
2422    use parquet::basic::ZstdLevel;
2423
2424    let compression = match parquet_opts.compression.as_str() {
2425        "none" => Compression::UNCOMPRESSED,
2426        "gzip" => Compression::GZIP(GzipLevel::default()),
2427        "lz4" => Compression::LZ4,
2428        "brotli" => Compression::BROTLI(BrotliLevel::default()),
2429        "zstd" => Compression::ZSTD(ZstdLevel::default()),
2430        _ => Compression::SNAPPY,
2431    };
2432
2433    // Set up writer properties
2434    let props = WriterProperties::builder()
2435        .set_compression(compression)
2436        .set_max_row_group_size(target_row_group_size_bytes)
2437        .set_data_page_row_count_limit(20_000)
2438        .set_data_page_size_limit(target_page_size_bytes)
2439        .set_write_batch_size(8192)
2440        .build();
2441
2442    let mut file_buffers = Vec::new();
2443    let mut current_buffer = Vec::new();
2444    let mut current_cursor = Cursor::new(&mut current_buffer);
2445    let mut current_writer =
2446        ArrowWriter::try_new(&mut current_cursor, schema_ref.clone(), Some(props.clone()))
2447            .map_err(|e| {
2448                SofError::ParquetConversionError(format!("Failed to create Parquet writer: {}", e))
2449            })?;
2450
2451    let mut row_offset = 0;
2452    let mut _current_file_rows = 0;
2453
2454    while row_offset < result.rows.len() {
2455        let batch_end = (row_offset + optimal_batch_size).min(result.rows.len());
2456        let batch_rows = &result.rows[row_offset..batch_end];
2457
2458        // Convert batch to Arrow arrays
2459        let batch_arrays =
2460            parquet_schema::process_to_arrow_arrays(&schema, &result.columns, batch_rows)?;
2461
2462        // Create RecordBatch
2463        let batch = RecordBatch::try_new(schema_ref.clone(), batch_arrays).map_err(|e| {
2464            SofError::ParquetConversionError(format!(
2465                "Failed to create RecordBatch for rows {}-{}: {}",
2466                row_offset, batch_end, e
2467            ))
2468        })?;
2469
2470        // Write batch
2471        current_writer.write(&batch).map_err(|e| {
2472            SofError::ParquetConversionError(format!(
2473                "Failed to write RecordBatch for rows {}-{}: {}",
2474                row_offset, batch_end, e
2475            ))
2476        })?;
2477
2478        _current_file_rows += batch_end - row_offset;
2479        row_offset = batch_end;
2480
2481        // Check if we should start a new file
2482        // Get actual size of current buffer by flushing the writer
2483        let current_size = current_writer.bytes_written();
2484
2485        if current_size >= max_file_size_bytes && row_offset < result.rows.len() {
2486            // Close current file
2487            current_writer.close().map_err(|e| {
2488                SofError::ParquetConversionError(format!("Failed to close Parquet writer: {}", e))
2489            })?;
2490
2491            // Save the buffer
2492            file_buffers.push(current_buffer);
2493
2494            // Start new file
2495            current_buffer = Vec::new();
2496            current_cursor = Cursor::new(&mut current_buffer);
2497            current_writer =
2498                ArrowWriter::try_new(&mut current_cursor, schema_ref.clone(), Some(props.clone()))
2499                    .map_err(|e| {
2500                        SofError::ParquetConversionError(format!(
2501                            "Failed to create new Parquet writer: {}",
2502                            e
2503                        ))
2504                    })?;
2505            _current_file_rows = 0;
2506        }
2507    }
2508
2509    // Close the final writer
2510    current_writer.close().map_err(|e| {
2511        SofError::ParquetConversionError(format!("Failed to close final Parquet writer: {}", e))
2512    })?;
2513
2514    file_buffers.push(current_buffer);
2515
2516    Ok(file_buffers)
2517}