// helios_sof/lib.rs

1//! # SQL-on-FHIR Implementation
2//!
3//! This crate provides a complete implementation of the [SQL-on-FHIR
4//! specification](https://sql-on-fhir.org/ig/latest),
5//! enabling the transformation of FHIR resources into tabular data using declarative
6//! ViewDefinitions. It supports all major FHIR versions (R4, R4B, R5, R6) through
7//! a version-agnostic abstraction layer.
//!
10//! There are three consumers of this crate:
11//! - [sof_cli](../sof_cli/index.html) - A command-line interface for the SQL-on-FHIR implementation,
12//!   allowing users to execute ViewDefinition transformations on FHIR Bundle resources
13//!   and output the results in various formats.
14//! - [sof_server](../sof_server/index.html) - A stateless HTTP server implementation for the SQL-on-FHIR specification,
15//!   enabling HTTP-based access to ViewDefinition transformation capabilities.
16//! - [hfs](../hfs/index.html) - The full featured Helios FHIR Server.
17//!
18//! ## Architecture
19//!
20//! The SOF crate is organized around these key components:
21//! - **Version-agnostic enums** ([`SofViewDefinition`], [`SofBundle`]): Multi-version containers
22//! - **Processing engine** ([`run_view_definition`]): Core transformation logic
23//! - **Output formats** ([`ContentType`]): Support for CSV, JSON, NDJSON, and Parquet
24//! - **Trait abstractions** ([`ViewDefinitionTrait`], [`BundleTrait`]): Version independence
25//!
26//! ## Key Features
27//!
28//! - **Multi-version FHIR support**: Works with R4, R4B, R5, and R6 resources
29//! - **FHIRPath evaluation**: Complex path expressions for data extraction
30//! - **forEach iteration**: Supports flattening of nested FHIR structures
31//! - **unionAll operations**: Combines multiple select statements
32//! - **Collection handling**: Proper array serialization for multi-valued fields
33//! - **Output formats**: CSV (with/without headers), JSON, NDJSON, Parquet support
34//!
35//! ## Usage Example
36//!
37//! ```rust
38//! # #[cfg(not(target_os = "windows"))]
39//! # {
40//! use helios_sof::{SofViewDefinition, SofBundle, ContentType, run_view_definition};
41//! use helios_fhir::FhirVersion;
42//!
43//! # #[cfg(feature = "R4")]
44//! # {
45//! // Parse a ViewDefinition and Bundle from JSON
46//! let view_definition_json = r#"{
47//!     "resourceType": "ViewDefinition",
48//!     "status": "active",
49//!     "resource": "Patient",
50//!     "select": [{
51//!         "column": [{
52//!             "name": "id",
53//!             "path": "id"
54//!         }, {
55//!             "name": "name",
56//!             "path": "name.family"
57//!         }]
58//!     }]
59//! }"#;
60//!
61//! let bundle_json = r#"{
62//!     "resourceType": "Bundle",
63//!     "type": "collection",
64//!     "entry": [{
65//!         "resource": {
66//!             "resourceType": "Patient",
67//!             "id": "example",
68//!             "name": [{
69//!                 "family": "Doe",
70//!                 "given": ["John"]
71//!             }]
72//!         }
73//!     }]
74//! }"#;
75//!
76//! let view_definition: helios_fhir::r4::ViewDefinition = serde_json::from_str(view_definition_json)?;
77//! let bundle: helios_fhir::r4::Bundle = serde_json::from_str(bundle_json)?;
78//!
79//! // Wrap in version-agnostic containers
80//! let sof_view = SofViewDefinition::R4(view_definition);
81//! let sof_bundle = SofBundle::R4(bundle);
82//!
83//! // Transform to CSV with headers
84//! let csv_output = run_view_definition(
85//!     sof_view,
86//!     sof_bundle,
87//!     ContentType::CsvWithHeader
88//! )?;
89//!
90//! // Check the CSV output
91//! let csv_string = String::from_utf8(csv_output)?;
92//! assert!(csv_string.contains("id,name"));
93//! // CSV values are quoted
94//! assert!(csv_string.contains("example") && csv_string.contains("Doe"));
95//! # }
96//! # }
97//! # Ok::<(), Box<dyn std::error::Error>>(())
98//! ```
99//!
100//! ## Advanced Features
101//!
102//! ### forEach Iteration
103//!
104//! ViewDefinitions can iterate over collections using `forEach` and `forEachOrNull`:
105//!
106//! ```json
107//! {
108//!   "select": [{
109//!     "forEach": "name",
110//!     "column": [{
111//!       "name": "family_name",
112//!       "path": "family"
113//!     }]
114//!   }]
115//! }
116//! ```
117//!
118//! ### Constants and Variables
119//!
120//! Define reusable values in ViewDefinitions:
121//!
122//! ```json
123//! {
124//!   "constant": [{
125//!     "name": "system",
126//!     "valueString": "http://loinc.org"
127//!   }],
128//!   "select": [{
129//!     "where": [{
130//!       "path": "code.coding.system = %system"
131//!     }]
132//!   }]
133//! }
134//! ```
135//!
136//! ### Where Clauses
137//!
138//! Filter resources using FHIRPath expressions:
139//!
140//! ```json
141//! {
142//!   "where": [{
143//!     "path": "active = true"
144//!   }, {
145//!     "path": "birthDate.exists()"
146//!   }]
147//! }
148//! ```
149//!
150//! ## Error Handling
151//!
152//! The crate provides comprehensive error handling through [`SofError`]:
153//!
154//! ```rust,no_run
155//! use helios_sof::{SofError, SofViewDefinition, SofBundle, ContentType, run_view_definition};
156//!
157//! # let view = SofViewDefinition::R4(helios_fhir::r4::ViewDefinition::default());
158//! # let bundle = SofBundle::R4(helios_fhir::r4::Bundle::default());
159//! # let content_type = ContentType::Json;
160//! match run_view_definition(view, bundle, content_type) {
161//!     Ok(output) => {
162//!         // Process successful transformation
163//!     },
164//!     Err(SofError::InvalidViewDefinition(msg)) => {
165//!         eprintln!("ViewDefinition validation failed: {}", msg);
166//!     },
167//!     Err(SofError::FhirPathError(msg)) => {
168//!         eprintln!("FHIRPath evaluation failed: {}", msg);
169//!     },
170//!     Err(e) => {
171//!         eprintln!("Other error: {}", e);
172//!     }
173//! }
174//! ```
175//! ## Feature Flags
176//!
177//! Enable support for specific FHIR versions:
178//! - `R4`: FHIR 4.0.1 support
179//! - `R4B`: FHIR 4.3.0 support
180//! - `R5`: FHIR 5.0.0 support
181//! - `R6`: FHIR 6.0.0 support
182
183pub mod data_source;
184pub mod parquet_schema;
185pub mod traits;
186
187use chrono::{DateTime, Utc};
188use helios_fhirpath::{EvaluationContext, EvaluationResult, evaluate_expression};
189use rayon::prelude::*;
190use serde::{Deserialize, Serialize};
191use std::collections::HashMap;
192use std::io::{BufRead, Write};
193use thiserror::Error;
194use traits::*;
195
196// Re-export commonly used types and traits for easier access
197pub use helios_fhir::FhirVersion;
198pub use traits::{BundleTrait, ResourceTrait, ViewDefinitionTrait};
199
200/// Multi-version ViewDefinition container supporting version-agnostic operations.
201///
202/// This enum provides a unified interface for working with ViewDefinition resources
203/// across different FHIR specification versions. It enables applications to handle
204/// multiple FHIR versions simultaneously while maintaining type safety.
205///
206/// # Supported Versions
207///
208/// - **R4**: FHIR 4.0.1 ViewDefinition (normative)
209/// - **R4B**: FHIR 4.3.0 ViewDefinition (ballot)
210/// - **R5**: FHIR 5.0.0 ViewDefinition (ballot)
211/// - **R6**: FHIR 6.0.0 ViewDefinition (draft)
212///
213/// # Examples
214///
215/// ```rust
216/// use helios_sof::{SofViewDefinition, ContentType};
217/// # #[cfg(feature = "R4")]
218/// use helios_fhir::r4::ViewDefinition;
219///
220/// # #[cfg(feature = "R4")]
221/// # {
222/// // Parse from JSON
223/// let json = r#"{
224///     "resourceType": "ViewDefinition",
225///     "resource": "Patient",
226///     "select": [{
227///         "column": [{
228///             "name": "id",
229///             "path": "id"
230///         }]
231///     }]
232/// }"#;
233///
234/// let view_def: ViewDefinition = serde_json::from_str(json)?;
235/// let sof_view = SofViewDefinition::R4(view_def);
236///
237/// // Check version
238/// assert_eq!(sof_view.version(), helios_fhir::FhirVersion::R4);
239/// # }
240/// # Ok::<(), Box<dyn std::error::Error>>(())
241/// ```
242#[derive(Debug, Clone)]
243pub enum SofViewDefinition {
244    #[cfg(feature = "R4")]
245    R4(helios_fhir::r4::ViewDefinition),
246    #[cfg(feature = "R4B")]
247    R4B(helios_fhir::r4b::ViewDefinition),
248    #[cfg(feature = "R5")]
249    R5(helios_fhir::r5::ViewDefinition),
250    #[cfg(feature = "R6")]
251    R6(helios_fhir::r6::ViewDefinition),
252}
253
254impl SofViewDefinition {
255    /// Returns the FHIR specification version of this ViewDefinition.
256    ///
257    /// This method provides version detection for multi-version applications,
258    /// enabling version-specific processing logic and compatibility checks.
259    ///
260    /// # Returns
261    ///
262    /// The `FhirVersion` enum variant corresponding to this ViewDefinition's specification.
263    ///
264    /// # Examples
265    ///
266    /// ```rust
267    /// use helios_sof::SofViewDefinition;
268    /// use helios_fhir::FhirVersion;
269    ///
270    /// # #[cfg(feature = "R5")]
271    /// # {
272    /// # let view_def = helios_fhir::r5::ViewDefinition::default();
273    /// let sof_view = SofViewDefinition::R5(view_def);
274    /// assert_eq!(sof_view.version(), helios_fhir::FhirVersion::R5);
275    /// # }
276    /// ```
277    pub fn version(&self) -> helios_fhir::FhirVersion {
278        match self {
279            #[cfg(feature = "R4")]
280            SofViewDefinition::R4(_) => helios_fhir::FhirVersion::R4,
281            #[cfg(feature = "R4B")]
282            SofViewDefinition::R4B(_) => helios_fhir::FhirVersion::R4B,
283            #[cfg(feature = "R5")]
284            SofViewDefinition::R5(_) => helios_fhir::FhirVersion::R5,
285            #[cfg(feature = "R6")]
286            SofViewDefinition::R6(_) => helios_fhir::FhirVersion::R6,
287        }
288    }
289}
290
291/// Multi-version Bundle container supporting version-agnostic operations.
292///
293/// This enum provides a unified interface for working with FHIR Bundle resources
294/// across different FHIR specification versions. Bundles contain the actual FHIR
295/// resources that will be processed by ViewDefinitions.
296///
297/// # Supported Versions
298///
299/// - **R4**: FHIR 4.0.1 Bundle (normative)
300/// - **R4B**: FHIR 4.3.0 Bundle (ballot)
301/// - **R5**: FHIR 5.0.0 Bundle (ballot)
302/// - **R6**: FHIR 6.0.0 Bundle (draft)
303///
304/// # Examples
305///
306/// ```rust
307/// # #[cfg(not(target_os = "windows"))]
308/// # {
309/// use helios_sof::SofBundle;
310/// # #[cfg(feature = "R4")]
311/// use helios_fhir::r4::Bundle;
312///
313/// # #[cfg(feature = "R4")]
314/// # {
315/// // Parse from JSON
316/// let json = r#"{
317///     "resourceType": "Bundle",
318///     "type": "collection",
319///     "entry": [{
320///         "resource": {
321///             "resourceType": "Patient",
322///             "id": "example"
323///         }
324///     }]
325/// }"#;
326///
327/// let bundle: Bundle = serde_json::from_str(json)?;
328/// let sof_bundle = SofBundle::R4(bundle);
329///
330/// // Check version compatibility
331/// assert_eq!(sof_bundle.version(), helios_fhir::FhirVersion::R4);
332/// # }
333/// # }
334/// # Ok::<(), Box<dyn std::error::Error>>(())
335/// ```
336#[derive(Debug, Clone)]
337pub enum SofBundle {
338    #[cfg(feature = "R4")]
339    R4(helios_fhir::r4::Bundle),
340    #[cfg(feature = "R4B")]
341    R4B(helios_fhir::r4b::Bundle),
342    #[cfg(feature = "R5")]
343    R5(helios_fhir::r5::Bundle),
344    #[cfg(feature = "R6")]
345    R6(helios_fhir::r6::Bundle),
346}
347
348impl SofBundle {
349    /// Returns the FHIR specification version of this Bundle.
350    ///
351    /// This method provides version detection for multi-version applications,
352    /// ensuring that ViewDefinitions and Bundles use compatible FHIR versions.
353    ///
354    /// # Returns
355    ///
356    /// The `FhirVersion` enum variant corresponding to this Bundle's specification.
357    ///
358    /// # Examples
359    ///
360    /// ```rust
361    /// use helios_sof::SofBundle;
362    /// use helios_fhir::FhirVersion;
363    ///
364    /// # #[cfg(feature = "R4")]
365    /// # {
366    /// # let bundle = helios_fhir::r4::Bundle::default();
367    /// let sof_bundle = SofBundle::R4(bundle);
368    /// assert_eq!(sof_bundle.version(), helios_fhir::FhirVersion::R4);
369    /// # }
370    /// ```
371    pub fn version(&self) -> helios_fhir::FhirVersion {
372        match self {
373            #[cfg(feature = "R4")]
374            SofBundle::R4(_) => helios_fhir::FhirVersion::R4,
375            #[cfg(feature = "R4B")]
376            SofBundle::R4B(_) => helios_fhir::FhirVersion::R4B,
377            #[cfg(feature = "R5")]
378            SofBundle::R5(_) => helios_fhir::FhirVersion::R5,
379            #[cfg(feature = "R6")]
380            SofBundle::R6(_) => helios_fhir::FhirVersion::R6,
381        }
382    }
383}
384
385/// Multi-version CapabilityStatement container supporting version-agnostic operations.
386///
387/// This enum provides a unified interface for working with CapabilityStatement resources
388/// across different FHIR specification versions. It enables applications to handle
389/// multiple FHIR versions simultaneously while maintaining type safety.
390///
391/// # Supported Versions
392///
393/// - **R4**: FHIR 4.0.1 CapabilityStatement (normative)
394/// - **R4B**: FHIR 4.3.0 CapabilityStatement (ballot)
395/// - **R5**: FHIR 5.0.0 CapabilityStatement (ballot)
396/// - **R6**: FHIR 6.0.0 CapabilityStatement (draft)
397#[derive(Debug, Clone, Serialize, Deserialize)]
398#[serde(untagged)]
399pub enum SofCapabilityStatement {
400    #[cfg(feature = "R4")]
401    R4(helios_fhir::r4::CapabilityStatement),
402    #[cfg(feature = "R4B")]
403    R4B(helios_fhir::r4b::CapabilityStatement),
404    #[cfg(feature = "R5")]
405    R5(helios_fhir::r5::CapabilityStatement),
406    #[cfg(feature = "R6")]
407    R6(helios_fhir::r6::CapabilityStatement),
408}
409
410impl SofCapabilityStatement {
411    /// Returns the FHIR specification version of this CapabilityStatement.
412    pub fn version(&self) -> helios_fhir::FhirVersion {
413        match self {
414            #[cfg(feature = "R4")]
415            SofCapabilityStatement::R4(_) => helios_fhir::FhirVersion::R4,
416            #[cfg(feature = "R4B")]
417            SofCapabilityStatement::R4B(_) => helios_fhir::FhirVersion::R4B,
418            #[cfg(feature = "R5")]
419            SofCapabilityStatement::R5(_) => helios_fhir::FhirVersion::R5,
420            #[cfg(feature = "R6")]
421            SofCapabilityStatement::R6(_) => helios_fhir::FhirVersion::R6,
422        }
423    }
424}
425
426/// Type alias for the version-independent Parameters container.
427///
428/// This alias provides backward compatibility while using the unified
429/// VersionIndependentParameters from the helios_fhir crate.
430pub type SofParameters = helios_fhir::VersionIndependentParameters;
431
432/// Comprehensive error type for SQL-on-FHIR operations.
433///
434/// This enum covers all possible error conditions that can occur during
435/// ViewDefinition processing, from validation failures to output formatting issues.
436/// Each variant provides specific context about the error to aid in debugging.
437///
438/// # Error Categories
439///
440/// - **Validation**: ViewDefinition structure and logic validation
441/// - **Evaluation**: FHIRPath expression evaluation failures
442/// - **I/O**: File and serialization operations
443/// - **Format**: Output format conversion issues
444///
445/// # Examples
446///
447/// ```rust,no_run
448/// use helios_sof::{SofError, SofViewDefinition, SofBundle, ContentType, run_view_definition};
449///
450/// # let view = SofViewDefinition::R4(helios_fhir::r4::ViewDefinition::default());
451/// # let bundle = SofBundle::R4(helios_fhir::r4::Bundle::default());
452/// # let content_type = ContentType::Json;
453/// match run_view_definition(view, bundle, content_type) {
454///     Ok(output) => {
455///         println!("Transformation successful");
456///     },
457///     Err(SofError::InvalidViewDefinition(msg)) => {
458///         eprintln!("ViewDefinition validation failed: {}", msg);
459///     },
460///     Err(SofError::FhirPathError(msg)) => {
461///         eprintln!("FHIRPath evaluation error: {}", msg);
462///     },
463///     Err(SofError::UnsupportedContentType(format)) => {
464///         eprintln!("Unsupported output format: {}", format);
465///     },
466///     Err(e) => {
467///         eprintln!("Other error: {}", e);
468///     }
469/// }
470/// ```
471#[derive(Debug, Error)]
472pub enum SofError {
473    /// ViewDefinition structure or logic validation failed.
474    ///
475    /// This error occurs when a ViewDefinition contains invalid or inconsistent
476    /// configuration, such as missing required fields, invalid FHIRPath expressions,
477    /// or incompatible select/unionAll structures.
478    #[error("Invalid ViewDefinition: {0}")]
479    InvalidViewDefinition(String),
480
481    /// FHIRPath expression evaluation failed.
482    ///
483    /// This error occurs when a FHIRPath expression in a ViewDefinition cannot
484    /// be evaluated, either due to syntax errors or runtime evaluation issues.
485    #[error("FHIRPath evaluation error: {0}")]
486    FhirPathError(String),
487
488    /// JSON serialization/deserialization failed.
489    ///
490    /// This error occurs when parsing input JSON or serializing output data fails,
491    /// typically due to malformed JSON or incompatible data structures.
492    #[error("Serialization error: {0}")]
493    SerializationError(#[from] serde_json::Error),
494
495    /// CSV processing failed.
496    ///
497    /// This error occurs during CSV output generation, such as when writing
498    /// headers or data rows to the CSV format.
499    #[error("CSV error: {0}")]
500    CsvError(#[from] csv::Error),
501
502    /// File I/O operation failed.
503    ///
504    /// This error occurs when reading input files or writing output files fails,
505    /// typically due to permission issues or missing files.
506    #[error("IO error: {0}")]
507    IoError(#[from] std::io::Error),
508
509    /// Unsupported output content type requested.
510    ///
511    /// This error occurs when an invalid or unimplemented content type is
512    /// specified for output formatting.
513    #[error("Unsupported content type: {0}")]
514    UnsupportedContentType(String),
515
516    /// CSV writer internal error.
517    ///
518    /// This error occurs when the CSV writer encounters an internal issue
519    /// that prevents successful output generation.
520    #[error("CSV writer error: {0}")]
521    CsvWriterError(String),
522
523    /// Invalid source parameter value.
524    ///
525    /// This error occurs when the source parameter contains an invalid URL or path.
526    #[error("Invalid source: {0}")]
527    InvalidSource(String),
528
529    /// Source not found.
530    ///
531    /// This error occurs when the specified source file or URL cannot be found.
532    #[error("Source not found: {0}")]
533    SourceNotFound(String),
534
535    /// Failed to fetch data from source.
536    ///
537    /// This error occurs when fetching data from a remote source fails.
538    #[error("Failed to fetch source: {0}")]
539    SourceFetchError(String),
540
541    /// Failed to read source data.
542    ///
543    /// This error occurs when reading data from the source fails.
544    #[error("Failed to read source: {0}")]
545    SourceReadError(String),
546
547    /// Invalid content in source.
548    ///
549    /// This error occurs when the source content is not valid FHIR data.
550    #[error("Invalid source content: {0}")]
551    InvalidSourceContent(String),
552
553    /// Unsupported source protocol.
554    ///
555    /// This error occurs when the source URL uses an unsupported protocol.
556    #[error("Unsupported source protocol: {0}")]
557    UnsupportedSourceProtocol(String),
558
559    /// Parquet conversion error.
560    ///
561    /// This error occurs when converting data to Parquet format fails.
562    #[error("Parquet conversion error: {0}")]
563    ParquetConversionError(String),
564}
565
566/// Supported output content types for ViewDefinition transformations.
567///
568/// This enum defines the available output formats for transformed FHIR data.
569/// Each format has specific characteristics and use cases for different
570/// integration scenarios.
571///
572/// # Format Descriptions
573///
574/// - **CSV**: Comma-separated values without headers
575/// - **CSV with Headers**: Comma-separated values with column headers
576/// - **JSON**: Pretty-printed JSON array of objects
577/// - **NDJSON**: Newline-delimited JSON (one object per line)
578/// - **Parquet**: Apache Parquet columnar format (planned)
579///
580/// # Examples
581///
582/// ```rust
583/// use helios_sof::ContentType;
584///
585/// // Parse from string
586/// let csv_type = ContentType::from_string("text/csv")?;
587/// assert_eq!(csv_type, ContentType::CsvWithHeader);  // Default includes headers
588///
589/// let json_type = ContentType::from_string("application/json")?;
590/// assert_eq!(json_type, ContentType::Json);
591///
592/// // CSV without headers
593/// let csv_no_headers = ContentType::from_string("text/csv;header=false")?;
594/// assert_eq!(csv_no_headers, ContentType::Csv);
595/// # Ok::<(), helios_sof::SofError>(())
596/// ```
597#[derive(Debug, Clone, Copy, PartialEq, Eq)]
598pub enum ContentType {
599    /// Comma-separated values format without headers
600    Csv,
601    /// Comma-separated values format with column headers
602    CsvWithHeader,
603    /// Pretty-printed JSON array format
604    Json,
605    /// Newline-delimited JSON format (NDJSON)
606    NdJson,
607    /// Apache Parquet columnar format (not yet implemented)
608    Parquet,
609}
610
611impl ContentType {
612    /// Parse a content type from its MIME type string representation.
613    ///
614    /// This method converts standard MIME type strings to the corresponding
615    /// ContentType enum variants. It supports the SQL-on-FHIR specification's
616    /// recommended content types.
617    ///
618    /// # Supported MIME Types
619    ///
620    /// - `"text/csv"` → [`ContentType::Csv`]
621    /// - `"text/csv"` → [`ContentType::CsvWithHeader`] (default: headers included)
622    /// - `"text/csv;header=true"` → [`ContentType::CsvWithHeader`]
623    /// - `"text/csv;header=false"` → [`ContentType::Csv`]
624    /// - `"application/json"` → [`ContentType::Json`]
625    /// - `"application/ndjson"` → [`ContentType::NdJson`]
626    /// - `"application/x-ndjson"` → [`ContentType::NdJson`]
627    /// - `"application/parquet"` → [`ContentType::Parquet`]
628    ///
629    /// # Arguments
630    ///
631    /// * `s` - The MIME type string to parse
632    ///
633    /// # Returns
634    ///
635    /// * `Ok(ContentType)` - Successfully parsed content type
636    /// * `Err(SofError::UnsupportedContentType)` - Unknown or unsupported MIME type
637    ///
638    /// # Examples
639    ///
640    /// ```rust
641    /// use helios_sof::ContentType;
642    ///
643    /// // Shortened format names
644    /// let csv = ContentType::from_string("csv")?;
645    /// assert_eq!(csv, ContentType::CsvWithHeader);
646    ///
647    /// let json = ContentType::from_string("json")?;
648    /// assert_eq!(json, ContentType::Json);
649    ///
650    /// let ndjson = ContentType::from_string("ndjson")?;
651    /// assert_eq!(ndjson, ContentType::NdJson);
652    ///
653    /// // Full MIME types still supported
654    /// let csv_mime = ContentType::from_string("text/csv")?;
655    /// assert_eq!(csv_mime, ContentType::CsvWithHeader);
656    ///
657    /// // CSV with headers explicitly
658    /// let csv_headers = ContentType::from_string("text/csv;header=true")?;
659    /// assert_eq!(csv_headers, ContentType::CsvWithHeader);
660    ///
661    /// // CSV without headers
662    /// let csv_no_headers = ContentType::from_string("text/csv;header=false")?;
663    /// assert_eq!(csv_no_headers, ContentType::Csv);
664    ///
665    /// // JSON format
666    /// let json_mime = ContentType::from_string("application/json")?;
667    /// assert_eq!(json_mime, ContentType::Json);
668    ///
669    /// // Error for unsupported type
670    /// assert!(ContentType::from_string("text/plain").is_err());
671    /// # Ok::<(), helios_sof::SofError>(())
672    /// ```
673    pub fn from_string(s: &str) -> Result<Self, SofError> {
674        match s {
675            // Shortened format names
676            "csv" => Ok(ContentType::CsvWithHeader),
677            "json" => Ok(ContentType::Json),
678            "ndjson" => Ok(ContentType::NdJson),
679            "parquet" => Ok(ContentType::Parquet),
680            // Full MIME types (for Accept header compatibility)
681            "text/csv;header=false" => Ok(ContentType::Csv),
682            "text/csv" | "text/csv;header=true" => Ok(ContentType::CsvWithHeader),
683            "application/json" => Ok(ContentType::Json),
684            "application/ndjson" | "application/x-ndjson" => Ok(ContentType::NdJson),
685            "application/parquet" => Ok(ContentType::Parquet),
686            _ => Err(SofError::UnsupportedContentType(s.to_string())),
687        }
688    }
689}
690
691/// Returns the FHIR version string for the newest enabled version.
692///
693/// This function provides the version string that should be used in CapabilityStatements
694/// and other FHIR resources that need to specify their version.
695pub fn get_fhir_version_string() -> &'static str {
696    let newest_version = get_newest_enabled_fhir_version();
697
698    match newest_version {
699        #[cfg(feature = "R4")]
700        helios_fhir::FhirVersion::R4 => "4.0.1",
701        #[cfg(feature = "R4B")]
702        helios_fhir::FhirVersion::R4B => "4.3.0",
703        #[cfg(feature = "R5")]
704        helios_fhir::FhirVersion::R5 => "5.0.0",
705        #[cfg(feature = "R6")]
706        helios_fhir::FhirVersion::R6 => "6.0.0",
707    }
708}
709
710/// Returns the newest FHIR version that is enabled at compile time.
711///
712/// This function uses compile-time feature detection to determine which FHIR
713/// version should be used when multiple versions are enabled. The priority order
714/// is: R6 > R5 > R4B > R4, where newer versions take precedence.
715///
716/// # Examples
717///
718/// ```rust
719/// use helios_sof::{get_newest_enabled_fhir_version, FhirVersion};
720///
721/// # #[cfg(any(feature = "R4", feature = "R4B", feature = "R5", feature = "R6"))]
722/// # {
723/// let version = get_newest_enabled_fhir_version();
724/// // If R5 and R4 are both enabled, this returns R5
725/// # }
726/// ```
727///
728/// # Panics
729///
730/// This function will panic at compile time if no FHIR version features are enabled.
731pub fn get_newest_enabled_fhir_version() -> helios_fhir::FhirVersion {
732    #[cfg(feature = "R6")]
733    return helios_fhir::FhirVersion::R6;
734
735    #[cfg(all(feature = "R5", not(feature = "R6")))]
736    return helios_fhir::FhirVersion::R5;
737
738    #[cfg(all(feature = "R4B", not(feature = "R5"), not(feature = "R6")))]
739    return helios_fhir::FhirVersion::R4B;
740
741    #[cfg(all(
742        feature = "R4",
743        not(feature = "R4B"),
744        not(feature = "R5"),
745        not(feature = "R6")
746    ))]
747    return helios_fhir::FhirVersion::R4;
748
749    #[cfg(not(any(feature = "R4", feature = "R4B", feature = "R5", feature = "R6")))]
750    panic!("At least one FHIR version feature must be enabled");
751}
752
753/// A single row of processed tabular data from ViewDefinition transformation.
754///
755/// This struct represents one row in the output table, containing values for
756/// each column defined in the ViewDefinition. Values are stored as optional
757/// JSON values to handle nullable fields and diverse FHIR data types.
758///
759/// # Structure
760///
761/// Each `ProcessedRow` contains a vector of optional JSON values, where:
762/// - `Some(value)` represents a non-null column value
763/// - `None` represents a null/missing column value
764/// - The order matches the column order in [`ProcessedResult::columns`]
765///
766/// # Examples
767///
768/// ```rust
769/// use helios_sof::ProcessedRow;
770/// use serde_json::Value;
771///
772/// let row = ProcessedRow {
773///     values: vec![
774///         Some(Value::String("patient-123".to_string())),
775///         Some(Value::String("Doe".to_string())),
776///         None, // Missing birth date
777///         Some(Value::Bool(true)),
778///     ]
779/// };
780/// ```
781#[derive(Debug, Clone, Serialize, Deserialize)]
782pub struct ProcessedRow {
783    /// Column values for this row, ordered according to ProcessedResult::columns
784    pub values: Vec<Option<serde_json::Value>>,
785}
786
787/// Complete result of ViewDefinition transformation containing columns and data rows.
788///
789/// This struct represents the tabular output from processing a ViewDefinition
790/// against a Bundle of FHIR resources. It contains both the column definitions
791/// and the actual data rows in a format ready for serialization to various
792/// output formats.
793///
794/// # Structure
795///
796/// - [`columns`](Self::columns): Ordered list of column names from the ViewDefinition
797/// - [`rows`](Self::rows): Data rows where each row contains values in column order
798///
799/// # Examples
800///
801/// ```rust
802/// use helios_sof::{ProcessedResult, ProcessedRow};
803/// use serde_json::Value;
804///
805/// let result = ProcessedResult {
806///     columns: vec![
807///         "patient_id".to_string(),
808///         "family_name".to_string(),
809///         "given_name".to_string(),
810///     ],
811///     rows: vec![
812///         ProcessedRow {
813///             values: vec![
814///                 Some(Value::String("patient-1".to_string())),
815///                 Some(Value::String("Smith".to_string())),
816///                 Some(Value::String("John".to_string())),
817///             ]
818///         },
819///         ProcessedRow {
820///             values: vec![
821///                 Some(Value::String("patient-2".to_string())),
822///                 Some(Value::String("Doe".to_string())),
823///                 None, // Missing given name
824///             ]
825///         },
826///     ]
827/// };
828///
829/// assert_eq!(result.columns.len(), 3);
830/// assert_eq!(result.rows.len(), 2);
831/// ```
832#[derive(Debug, Clone, Serialize, Deserialize)]
833pub struct ProcessedResult {
834    /// Ordered list of column names as defined in the ViewDefinition
835    pub columns: Vec<String>,
836    /// Data rows containing values for each column
837    pub rows: Vec<ProcessedRow>,
838}
839
840/// Execute a SQL-on-FHIR ViewDefinition transformation on a FHIR Bundle.
841///
842/// This is the main entry point for SQL-on-FHIR transformations. It processes
843/// a ViewDefinition against a Bundle of FHIR resources and produces output in
844/// the specified format. The function handles version compatibility, validation,
845/// FHIRPath evaluation, and output formatting.
846///
847/// # Arguments
848///
849/// * `view_definition` - The ViewDefinition containing transformation logic
850/// * `bundle` - The Bundle containing FHIR resources to process
851/// * `content_type` - The desired output format
852///
853/// # Returns
854///
855/// * `Ok(Vec<u8>)` - Formatted output bytes ready for writing to file or stdout
856/// * `Err(SofError)` - Detailed error information about what went wrong
857///
858/// # Validation
859///
860/// The function performs comprehensive validation:
861/// - FHIR version compatibility between ViewDefinition and Bundle
862/// - ViewDefinition structure and logic validation
863/// - FHIRPath expression syntax and evaluation
864/// - Output format compatibility
865///
866/// # Examples
867///
868/// ```rust
869/// use helios_sof::{SofViewDefinition, SofBundle, ContentType, run_view_definition};
870///
871/// # #[cfg(feature = "R4")]
872/// # {
873/// // Create a simple ViewDefinition
874/// let view_json = serde_json::json!({
875///     "resourceType": "ViewDefinition",
876///     "status": "active",
877///     "resource": "Patient",
878///     "select": [{
879///         "column": [{
880///             "name": "id",
881///             "path": "id"
882///         }]
883///     }]
884/// });
885/// let view_def: helios_fhir::r4::ViewDefinition = serde_json::from_value(view_json)?;
886///
887/// // Create a simple Bundle
888/// let bundle_json = serde_json::json!({
889///     "resourceType": "Bundle",
890///     "type": "collection",
891///     "entry": []
892/// });
893/// let bundle: helios_fhir::r4::Bundle = serde_json::from_value(bundle_json)?;
894///
895/// let sof_view = SofViewDefinition::R4(view_def);
896/// let sof_bundle = SofBundle::R4(bundle);
897///
898/// // Generate CSV with headers
899/// let csv_output = run_view_definition(
900///     sof_view,
901///     sof_bundle,
902///     ContentType::CsvWithHeader
903/// )?;
904///
905/// // Write to file or stdout
906/// std::fs::write("output.csv", csv_output)?;
907/// # }
908/// # Ok::<(), Box<dyn std::error::Error>>(())
909/// ```
910///
911/// # Error Handling
912///
913/// Common error scenarios:
914///
915/// ```rust,no_run
916/// use helios_sof::{SofError, SofViewDefinition, SofBundle, ContentType, run_view_definition};
917///
918/// # let view = SofViewDefinition::R4(helios_fhir::r4::ViewDefinition::default());
919/// # let bundle = SofBundle::R4(helios_fhir::r4::Bundle::default());
920/// # let content_type = ContentType::Json;
921/// match run_view_definition(view, bundle, content_type) {
922///     Ok(output) => {
923///         println!("Success: {} bytes generated", output.len());
924///     },
925///     Err(SofError::InvalidViewDefinition(msg)) => {
926///         eprintln!("ViewDefinition error: {}", msg);
927///     },
928///     Err(SofError::FhirPathError(msg)) => {
929///         eprintln!("FHIRPath error: {}", msg);
930///     },
931///     Err(e) => {
932///         eprintln!("Other error: {}", e);
933///     }
934/// }
935/// ```
936pub fn run_view_definition(
937    view_definition: SofViewDefinition,
938    bundle: SofBundle,
939    content_type: ContentType,
940) -> Result<Vec<u8>, SofError> {
941    run_view_definition_with_options(view_definition, bundle, content_type, RunOptions::default())
942}
943
/// Configuration options for Parquet file generation.
///
/// Defaults are supplied by the [`Default`] implementation
/// (256 MB row groups, 1024 KB pages, snappy compression, no file-size cap).
#[derive(Debug, Clone)]
pub struct ParquetOptions {
    /// Target row group size in MB (64-1024)
    pub row_group_size_mb: u32,
    /// Target page size in KB (64-8192)
    pub page_size_kb: u32,
    /// Compression algorithm (none, snappy, gzip, lz4, brotli, zstd)
    pub compression: String,
    /// Maximum file size in MB (splits output when exceeded);
    /// `None` means a single unbounded output file
    pub max_file_size_mb: Option<u32>,
}
956
957impl Default for ParquetOptions {
958    fn default() -> Self {
959        Self {
960            row_group_size_mb: 256,
961            page_size_kb: 1024,
962            compression: "snappy".to_string(),
963            max_file_size_mb: None,
964        }
965    }
966}
967
/// Options for filtering and controlling ViewDefinition execution
#[derive(Debug, Clone, Default)]
pub struct RunOptions {
    /// Filter resources modified after this time
    /// (presumably compared against each resource's `meta.lastUpdated`
    /// by the caller — TODO confirm where the comparison happens)
    pub since: Option<DateTime<Utc>>,
    /// Limit the number of results
    pub limit: Option<usize>,
    /// Page number for pagination (1-based)
    pub page: Option<usize>,
    /// Parquet-specific configuration options
    /// (presumably only consulted when the output format is Parquet)
    pub parquet_options: Option<ParquetOptions>,
}
980
981// =============================================================================
982// Streaming/Chunked Processing Types
983// =============================================================================
984
/// Configuration for chunked NDJSON processing.
///
/// Controls how NDJSON files are read and processed in chunks to reduce
/// memory usage when handling large files.
///
/// # Examples
///
/// ```rust
/// use helios_sof::ChunkConfig;
///
/// // Default configuration (1000 resources per chunk)
/// let config = ChunkConfig::default();
///
/// // Custom configuration for memory-constrained environments
/// let config = ChunkConfig {
///     chunk_size: 100,
///     skip_invalid_lines: true,
/// };
/// ```
#[derive(Debug, Clone)]
pub struct ChunkConfig {
    /// Number of resources to process per chunk.
    /// Default: 1000 (approximately 10MB memory usage per chunk)
    pub chunk_size: usize,
    /// If true, skip lines that fail to parse as valid JSON.
    /// If false (default), return an error on the first invalid line.
    /// Skipped lines are counted (see `NdjsonChunkReader::skipped_lines`).
    pub skip_invalid_lines: bool,
}
1013
1014impl Default for ChunkConfig {
1015    fn default() -> Self {
1016        Self {
1017            chunk_size: 1000,
1018            skip_invalid_lines: false,
1019        }
1020    }
1021}
1022
/// A chunk of parsed FHIR resources from an NDJSON file.
///
/// Represents a batch of resources that have been read and parsed,
/// ready for processing through a ViewDefinition. Produced by
/// [`NdjsonChunkReader`].
#[derive(Debug)]
pub struct ResourceChunk {
    /// The parsed FHIR resources in this chunk (raw JSON values)
    pub resources: Vec<serde_json::Value>,
    /// Zero-based index of this chunk (0, 1, 2, ...)
    pub chunk_index: usize,
    /// True if this is the last chunk in the file
    pub is_last: bool,
}
1036
/// Result from processing a single chunk of resources.
///
/// Contains the output rows generated from processing one chunk,
/// along with metadata about the chunk position. Produced by
/// [`PreparedViewDefinition::process_chunk`].
#[derive(Debug, Clone)]
pub struct ChunkedResult {
    /// Column names (same for all chunks)
    pub columns: Vec<String>,
    /// Processed rows from this chunk; values align positionally with `columns`
    pub rows: Vec<ProcessedRow>,
    /// Zero-based index of this chunk
    pub chunk_index: usize,
    /// True if this is the last chunk
    pub is_last: bool,
    /// Number of resources that were in the input chunk
    /// (may exceed `rows.len()` when resources are filtered out)
    pub resources_in_chunk: usize,
}
1054
/// Statistics from chunked processing.
///
/// Provides summary information about a completed chunked processing run.
/// All counters start at zero via `Default` and are accumulated by the
/// processing loop.
#[derive(Debug, Clone, Default)]
pub struct ProcessingStats {
    /// Total number of lines read from the NDJSON file
    pub total_lines_read: usize,
    /// Number of FHIR resources successfully processed
    pub resources_processed: usize,
    /// Number of output rows generated
    pub output_rows: usize,
    /// Number of lines skipped due to parse errors (when skip_invalid_lines is true)
    pub skipped_lines: usize,
    /// Number of chunks processed
    pub chunks_processed: usize,
}
1071
/// Reads NDJSON files in chunks, yielding parsed resources.
///
/// This iterator reads an NDJSON file line by line, collecting resources
/// into chunks of the configured size. Each iteration yields a `ResourceChunk`
/// containing up to `chunk_size` parsed FHIR resources.
///
/// # Examples
///
/// ```rust,no_run
/// use helios_sof::{NdjsonChunkReader, ChunkConfig};
/// use std::io::BufReader;
/// use std::fs::File;
///
/// let file = File::open("patients.ndjson").unwrap();
/// let reader = BufReader::new(file);
/// let config = ChunkConfig::default();
///
/// let mut chunk_reader = NdjsonChunkReader::new(reader, config);
///
/// while let Some(result) = chunk_reader.next() {
///     match result {
///         Ok(chunk) => {
///             println!("Chunk {}: {} resources", chunk.chunk_index, chunk.resources.len());
///         }
///         Err(e) => {
///             eprintln!("Error reading chunk: {}", e);
///             break;
///         }
///     }
/// }
/// ```
pub struct NdjsonChunkReader<R: BufRead> {
    // Underlying buffered line source
    reader: R,
    // Chunk size and invalid-line policy
    config: ChunkConfig,
    // Index to assign to the next emitted chunk
    current_chunk: usize,
    // Set once EOF has been observed; iteration then yields None
    finished: bool,
    // Reusable buffer for read_line, avoiding a per-line allocation
    line_buffer: String,
    // Count of lines read so far (incremented after each successful read;
    // used as the 1-based line number in parse-error messages)
    line_number: usize,
    /// Resource type filter - only include resources of this type
    resource_type_filter: Option<String>,
    /// Number of lines skipped due to invalid JSON
    skipped_lines: usize,
}
1115
1116impl<R: BufRead> NdjsonChunkReader<R> {
1117    /// Create a new NDJSON chunk reader with the given configuration.
1118    pub fn new(reader: R, config: ChunkConfig) -> Self {
1119        Self {
1120            reader,
1121            config,
1122            current_chunk: 0,
1123            finished: false,
1124            line_buffer: String::new(),
1125            line_number: 0,
1126            resource_type_filter: None,
1127            skipped_lines: 0,
1128        }
1129    }
1130
1131    /// Set a resource type filter to only include resources of a specific type.
1132    ///
1133    /// This is useful when processing NDJSON files that contain multiple resource types.
1134    pub fn with_resource_type_filter(mut self, resource_type: Option<String>) -> Self {
1135        self.resource_type_filter = resource_type;
1136        self
1137    }
1138
1139    /// Get the total number of lines read so far.
1140    pub fn lines_read(&self) -> usize {
1141        self.line_number
1142    }
1143
1144    /// Get the number of lines skipped due to invalid JSON.
1145    pub fn skipped_lines(&self) -> usize {
1146        self.skipped_lines
1147    }
1148}
1149
1150impl<R: BufRead> Iterator for NdjsonChunkReader<R> {
1151    type Item = Result<ResourceChunk, SofError>;
1152
1153    fn next(&mut self) -> Option<Self::Item> {
1154        if self.finished {
1155            return None;
1156        }
1157
1158        let mut resources = Vec::with_capacity(self.config.chunk_size);
1159
1160        while resources.len() < self.config.chunk_size {
1161            self.line_buffer.clear();
1162            match self.reader.read_line(&mut self.line_buffer) {
1163                Ok(0) => {
1164                    // EOF reached
1165                    self.finished = true;
1166                    break;
1167                }
1168                Ok(_) => {
1169                    self.line_number += 1;
1170                    let line = self.line_buffer.trim();
1171
1172                    // Skip empty lines
1173                    if line.is_empty() {
1174                        continue;
1175                    }
1176
1177                    // Parse the JSON
1178                    match serde_json::from_str::<serde_json::Value>(line) {
1179                        Ok(value) => {
1180                            // Apply resource type filter if set
1181                            if let Some(ref filter) = self.resource_type_filter {
1182                                let resource_type =
1183                                    value.get("resourceType").and_then(|v| v.as_str());
1184                                if resource_type != Some(filter.as_str()) {
1185                                    continue;
1186                                }
1187                            }
1188                            resources.push(value);
1189                        }
1190                        Err(e) => {
1191                            if self.config.skip_invalid_lines {
1192                                // Skip this line and continue
1193                                self.skipped_lines += 1;
1194                                continue;
1195                            } else {
1196                                return Some(Err(SofError::InvalidSourceContent(format!(
1197                                    "Invalid JSON at line {}: {}",
1198                                    self.line_number, e
1199                                ))));
1200                            }
1201                        }
1202                    }
1203                }
1204                Err(e) => {
1205                    return Some(Err(SofError::IoError(e)));
1206                }
1207            }
1208        }
1209
1210        // If we have no resources and we're finished, don't return an empty chunk
1211        if resources.is_empty() && self.finished {
1212            return None;
1213        }
1214
1215        let chunk = ResourceChunk {
1216            resources,
1217            chunk_index: self.current_chunk,
1218            is_last: self.finished,
1219        };
1220        self.current_chunk += 1;
1221
1222        Some(Ok(chunk))
1223    }
1224}
1225
/// Pre-validated ViewDefinition for efficient reuse across multiple chunks.
///
/// This struct caches the validation and constant extraction from a ViewDefinition,
/// allowing efficient processing of multiple chunks without re-validating each time.
///
/// # Examples
///
/// ```rust,no_run
/// use helios_sof::{PreparedViewDefinition, SofViewDefinition, ResourceChunk};
///
/// # #[cfg(feature = "R4")]
/// # {
/// // Parse and prepare ViewDefinition once
/// let view_json: serde_json::Value = serde_json::from_str(r#"{
///     "resourceType": "ViewDefinition",
///     "resource": "Patient",
///     "select": [{"column": [{"name": "id", "path": "id"}]}]
/// }"#).unwrap();
/// let view_def: helios_fhir::r4::ViewDefinition = serde_json::from_value(view_json).unwrap();
/// let sof_view = SofViewDefinition::R4(view_def);
///
/// let prepared = PreparedViewDefinition::new(sof_view).unwrap();
///
/// // Process multiple chunks efficiently
/// // for chunk in chunk_iterator {
/// //     let result = prepared.process_chunk(chunk)?;
/// //     // ... handle result
/// // }
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct PreparedViewDefinition {
    // The validated, version-wrapped ViewDefinition
    view_definition: SofViewDefinition,
    // Resource type this view applies to (from ViewDefinition.resource)
    target_resource_type: String,
    // Constants declared on the ViewDefinition, evaluated once up front
    variables: HashMap<String, EvaluationResult>,
    // Flattened output column names, collected once at construction
    column_names: Vec<String>,
}
1263
impl PreparedViewDefinition {
    /// Create a new PreparedViewDefinition by validating and extracting metadata.
    ///
    /// This performs all validation upfront so that chunk processing is efficient.
    ///
    /// # Errors
    ///
    /// Returns `SofError::InvalidViewDefinition` when validation fails or the
    /// ViewDefinition has no `resource` element; constant extraction and
    /// column collection may also return errors.
    pub fn new(view_definition: SofViewDefinition) -> Result<Self, SofError> {
        // Extract target resource type and column names based on version.
        // The four arms are structurally identical: each FHIR version has its
        // own concrete ViewDefinition type, so the work is repeated per
        // variant rather than shared through a generic helper.
        let (target_resource_type, variables, column_names) = match &view_definition {
            #[cfg(feature = "R4")]
            SofViewDefinition::R4(vd) => {
                validate_view_definition(vd)?;
                let vars = extract_view_definition_constants(vd)?;
                let resource_type = vd
                    .resource()
                    .ok_or_else(|| {
                        SofError::InvalidViewDefinition("Resource type is required".to_string())
                    })?
                    .to_string();
                let mut columns = Vec::new();
                if let Some(selects) = vd.select() {
                    collect_all_columns(selects, &mut columns)?;
                }
                (resource_type, vars, columns)
            }
            #[cfg(feature = "R4B")]
            SofViewDefinition::R4B(vd) => {
                validate_view_definition(vd)?;
                let vars = extract_view_definition_constants(vd)?;
                let resource_type = vd
                    .resource()
                    .ok_or_else(|| {
                        SofError::InvalidViewDefinition("Resource type is required".to_string())
                    })?
                    .to_string();
                let mut columns = Vec::new();
                if let Some(selects) = vd.select() {
                    collect_all_columns(selects, &mut columns)?;
                }
                (resource_type, vars, columns)
            }
            #[cfg(feature = "R5")]
            SofViewDefinition::R5(vd) => {
                validate_view_definition(vd)?;
                let vars = extract_view_definition_constants(vd)?;
                let resource_type = vd
                    .resource()
                    .ok_or_else(|| {
                        SofError::InvalidViewDefinition("Resource type is required".to_string())
                    })?
                    .to_string();
                let mut columns = Vec::new();
                if let Some(selects) = vd.select() {
                    collect_all_columns(selects, &mut columns)?;
                }
                (resource_type, vars, columns)
            }
            #[cfg(feature = "R6")]
            SofViewDefinition::R6(vd) => {
                validate_view_definition(vd)?;
                let vars = extract_view_definition_constants(vd)?;
                let resource_type = vd
                    .resource()
                    .ok_or_else(|| {
                        SofError::InvalidViewDefinition("Resource type is required".to_string())
                    })?
                    .to_string();
                let mut columns = Vec::new();
                if let Some(selects) = vd.select() {
                    collect_all_columns(selects, &mut columns)?;
                }
                (resource_type, vars, columns)
            }
        };

        Ok(Self {
            view_definition,
            target_resource_type,
            variables,
            column_names,
        })
    }

    /// Get the column names that will be produced by this ViewDefinition.
    pub fn columns(&self) -> &[String] {
        &self.column_names
    }

    /// Get the target resource type for this ViewDefinition.
    pub fn target_resource_type(&self) -> &str {
        &self.target_resource_type
    }

    /// Process a chunk of resources through this ViewDefinition.
    ///
    /// Returns a `ChunkedResult` containing the rows generated from the chunk.
    /// Uses parallel processing via rayon for improved throughput.
    ///
    /// Resources whose `resourceType` does not match the target are silently
    /// dropped (they still count in `resources_in_chunk`).
    pub fn process_chunk(&self, chunk: ResourceChunk) -> Result<ChunkedResult, SofError> {
        // Process resources in parallel using rayon; collecting into
        // Result<Vec<_>, _> short-circuits on the first per-resource error.
        let results: Result<Vec<Vec<ProcessedRow>>, SofError> = chunk
            .resources
            .par_iter()
            .filter_map(|resource_json| {
                // Check resource type matches
                let resource_type = resource_json
                    .get("resourceType")
                    .and_then(|v| v.as_str())
                    .unwrap_or("");

                if resource_type != self.target_resource_type {
                    None
                } else {
                    // Process single resource based on version
                    Some(self.process_single_resource(resource_json))
                }
            })
            .collect();

        // Flatten results from all resources
        let all_rows: Vec<ProcessedRow> = results?.into_iter().flatten().collect();

        Ok(ChunkedResult {
            columns: self.column_names.clone(),
            rows: all_rows,
            chunk_index: chunk.chunk_index,
            is_last: chunk.is_last,
            resources_in_chunk: chunk.resources.len(),
        })
    }

    /// Process a single resource JSON value through the ViewDefinition.
    ///
    /// Dispatches to the generic implementation with the version-specific
    /// concrete ViewDefinition type.
    fn process_single_resource(
        &self,
        resource_json: &serde_json::Value,
    ) -> Result<Vec<ProcessedRow>, SofError> {
        match &self.view_definition {
            #[cfg(feature = "R4")]
            SofViewDefinition::R4(vd) => self.process_single_resource_generic(vd, resource_json),
            #[cfg(feature = "R4B")]
            SofViewDefinition::R4B(vd) => self.process_single_resource_generic(vd, resource_json),
            #[cfg(feature = "R5")]
            SofViewDefinition::R5(vd) => self.process_single_resource_generic(vd, resource_json),
            #[cfg(feature = "R6")]
            SofViewDefinition::R6(vd) => self.process_single_resource_generic(vd, resource_json),
        }
    }

    /// Version-generic core: evaluate where-clauses and generate output rows
    /// for one resource.
    ///
    /// Returns an empty Vec when the resource fails a where-clause; returns
    /// an error when a where-clause path is missing, evaluates to a
    /// non-boolean type, or fails to evaluate.
    fn process_single_resource_generic<VD>(
        &self,
        view_definition: &VD,
        resource_json: &serde_json::Value,
    ) -> Result<Vec<ProcessedRow>, SofError>
    where
        VD: ViewDefinitionTrait,
        VD::Select: ViewDefinitionSelectTrait,
    {
        // Create evaluation context from JSON by parsing into typed FhirResource
        let fhir_resource =
            parse_json_to_fhir_resource(resource_json.clone(), self.view_definition.version())?;
        let mut context = EvaluationContext::new(vec![fhir_resource]);

        // Add variables to the context
        for (name, value) in &self.variables {
            context.set_variable_result(name, value.clone());
        }

        // Apply where clauses
        if let Some(where_clauses) = view_definition.where_clauses() {
            for where_clause in where_clauses {
                let path = where_clause.path().ok_or_else(|| {
                    SofError::InvalidViewDefinition("Where clause path is required".to_string())
                })?;

                match evaluate_expression(path, &context) {
                    Ok(result) => {
                        if !can_be_coerced_to_boolean(&result) {
                            return Err(SofError::InvalidViewDefinition(format!(
                                "Where clause path '{}' returns type '{}' which cannot be used as a boolean condition.",
                                path,
                                result.type_name()
                            )));
                        }
                        if !is_truthy(&result) {
                            // Resource doesn't match where clause, return empty rows
                            return Ok(Vec::new());
                        }
                    }
                    Err(e) => {
                        return Err(SofError::FhirPathError(format!(
                            "Error evaluating where clause '{}': {}",
                            path, e
                        )));
                    }
                }
            }
        }

        // Generate rows
        let select_clauses = view_definition.select().ok_or_else(|| {
            SofError::InvalidViewDefinition("At least one select clause is required".to_string())
        })?;

        // NOTE(review): column_names is cloned per resource because
        // generate_row_combinations takes &mut — confirm whether it actually
        // mutates the list; if not, the clone could be avoided.
        let mut all_columns = self.column_names.clone();
        generate_row_combinations(&context, select_clauses, &mut all_columns, &self.variables)
    }
}
1468
/// Iterator that combines NDJSON reading with ViewDefinition processing.
///
/// This iterator reads chunks from an NDJSON file and processes them
/// through a ViewDefinition, yielding `ChunkedResult` for each chunk.
///
/// # Examples
///
/// ```rust,no_run
/// use helios_sof::{NdjsonChunkIterator, SofViewDefinition, ChunkConfig};
/// use std::io::BufReader;
/// use std::fs::File;
///
/// # #[cfg(feature = "R4")]
/// # {
/// // Set up ViewDefinition
/// let view_json: serde_json::Value = serde_json::from_str(r#"{
///     "resourceType": "ViewDefinition",
///     "resource": "Patient",
///     "select": [{"column": [{"name": "id", "path": "id"}]}]
/// }"#).unwrap();
/// let view_def: helios_fhir::r4::ViewDefinition = serde_json::from_value(view_json).unwrap();
/// let sof_view = SofViewDefinition::R4(view_def);
///
/// // Process file in chunks
/// let file = File::open("patients.ndjson").unwrap();
/// let reader = BufReader::new(file);
///
/// let iterator = NdjsonChunkIterator::new(sof_view, reader, ChunkConfig::default()).unwrap();
///
/// for result in iterator {
///     match result {
///         Ok(chunk_result) => {
///             println!("Chunk {}: {} rows", chunk_result.chunk_index, chunk_result.rows.len());
///         }
///         Err(e) => {
///             eprintln!("Error: {}", e);
///             break;
///         }
///     }
/// }
/// # }
/// ```
pub struct NdjsonChunkIterator<R: BufRead> {
    // Chunked NDJSON source, pre-filtered to the target resource type
    reader: NdjsonChunkReader<R>,
    // Pre-validated ViewDefinition reused across all chunks
    prepared_vd: PreparedViewDefinition,
}
1515
1516impl<R: BufRead> NdjsonChunkIterator<R> {
1517    /// Create a new chunk iterator from a ViewDefinition and NDJSON reader.
1518    pub fn new(
1519        view_definition: SofViewDefinition,
1520        reader: R,
1521        config: ChunkConfig,
1522    ) -> Result<Self, SofError> {
1523        let prepared_vd = PreparedViewDefinition::new(view_definition)?;
1524        let resource_type = prepared_vd.target_resource_type().to_string();
1525        let chunk_reader =
1526            NdjsonChunkReader::new(reader, config).with_resource_type_filter(Some(resource_type));
1527
1528        Ok(Self {
1529            reader: chunk_reader,
1530            prepared_vd,
1531        })
1532    }
1533
1534    /// Get the column names that will be produced by this iterator.
1535    pub fn columns(&self) -> &[String] {
1536        self.prepared_vd.columns()
1537    }
1538
1539    /// Get the total number of lines read so far.
1540    pub fn lines_read(&self) -> usize {
1541        self.reader.lines_read()
1542    }
1543
1544    /// Get the number of lines skipped due to invalid JSON.
1545    pub fn skipped_lines(&self) -> usize {
1546        self.reader.skipped_lines()
1547    }
1548}
1549
1550impl<R: BufRead> Iterator for NdjsonChunkIterator<R> {
1551    type Item = Result<ChunkedResult, SofError>;
1552
1553    fn next(&mut self) -> Option<Self::Item> {
1554        match self.reader.next()? {
1555            Ok(chunk) => Some(self.prepared_vd.process_chunk(chunk)),
1556            Err(e) => Some(Err(e)),
1557        }
1558    }
1559}
1560
1561// =============================================================================
1562// Streaming Output Functions
1563// =============================================================================
1564
1565/// Write CSV header row.
1566fn write_csv_header<W: Write>(columns: &[String], writer: &mut W) -> Result<(), SofError> {
1567    let mut wtr = csv::Writer::from_writer(writer);
1568    wtr.write_record(columns)?;
1569    wtr.flush()?;
1570    Ok(())
1571}
1572
1573/// Write CSV rows from a chunk (no header).
1574fn write_csv_chunk<W: Write>(result: &ChunkedResult, writer: &mut W) -> Result<(), SofError> {
1575    let mut wtr = csv::Writer::from_writer(writer);
1576
1577    for row in &result.rows {
1578        let record: Vec<String> = row
1579            .values
1580            .iter()
1581            .map(|v| match v {
1582                Some(val) => {
1583                    if let serde_json::Value::String(s) = val {
1584                        s.clone()
1585                    } else {
1586                        serde_json::to_string(val).unwrap_or_default()
1587                    }
1588                }
1589                None => String::new(),
1590            })
1591            .collect();
1592        wtr.write_record(&record)?;
1593    }
1594
1595    wtr.flush()?;
1596    Ok(())
1597}
1598
1599/// Write NDJSON rows from a chunk.
1600fn write_ndjson_chunk<W: Write>(result: &ChunkedResult, writer: &mut W) -> Result<(), SofError> {
1601    for row in &result.rows {
1602        let mut row_obj = serde_json::Map::new();
1603        for (i, column) in result.columns.iter().enumerate() {
1604            let value = row
1605                .values
1606                .get(i)
1607                .and_then(|v| v.as_ref())
1608                .cloned()
1609                .unwrap_or(serde_json::Value::Null);
1610            row_obj.insert(column.clone(), value);
1611        }
1612        let line = serde_json::to_string(&serde_json::Value::Object(row_obj))?;
1613        writer.write_all(line.as_bytes())?;
1614        writer.write_all(b"\n")?;
1615    }
1616
1617    Ok(())
1618}
1619
1620/// Process an NDJSON input stream and write output incrementally.
1621///
1622/// This is the main entry point for streaming/chunked NDJSON processing.
1623/// It reads the input in chunks, processes each chunk through the ViewDefinition,
1624/// and writes the output incrementally to the writer.
1625///
1626/// # Arguments
1627///
1628/// * `view_definition` - The ViewDefinition to execute
1629/// * `input` - A buffered reader for the NDJSON input
1630/// * `output` - A writer for the output (file, stdout, etc.)
1631/// * `content_type` - The desired output format (CSV, NDJSON, JSON)
1632/// * `config` - Configuration for chunk processing
1633///
1634/// # Returns
1635///
1636/// Statistics about the processing run, including row counts and chunk counts.
1637///
1638/// # Examples
1639///
1640/// ```rust,no_run
1641/// use helios_sof::{process_ndjson_chunked, SofViewDefinition, ContentType, ChunkConfig};
1642/// use std::io::{BufReader, BufWriter};
1643/// use std::fs::File;
1644///
1645/// # #[cfg(feature = "R4")]
1646/// # {
1647/// // Set up ViewDefinition
1648/// let view_json: serde_json::Value = serde_json::from_str(r#"{
1649///     "resourceType": "ViewDefinition",
1650///     "resource": "Patient",
1651///     "select": [{"column": [{"name": "id", "path": "id"}]}]
1652/// }"#).unwrap();
1653/// let view_def: helios_fhir::r4::ViewDefinition = serde_json::from_value(view_json).unwrap();
1654/// let sof_view = SofViewDefinition::R4(view_def);
1655///
1656/// // Process file
1657/// let input = BufReader::new(File::open("patients.ndjson").unwrap());
1658/// let mut output = BufWriter::new(File::create("output.csv").unwrap());
1659///
1660/// let stats = process_ndjson_chunked(
1661///     sof_view,
1662///     input,
1663///     &mut output,
1664///     ContentType::CsvWithHeader,
1665///     ChunkConfig::default(),
1666/// ).unwrap();
1667///
1668/// println!("Processed {} resources, {} output rows",
1669///     stats.resources_processed, stats.output_rows);
1670/// # }
1671/// ```
1672///
1673/// # Errors
1674///
1675/// Returns an error if:
1676/// - The ViewDefinition is invalid
1677/// - The input contains invalid JSON (when `skip_invalid_lines` is false)
1678/// - Writing to the output fails
1679/// - Parquet format is requested (not supported for streaming)
1680pub fn process_ndjson_chunked<R: BufRead, W: Write>(
1681    view_definition: SofViewDefinition,
1682    input: R,
1683    mut output: W,
1684    content_type: ContentType,
1685    config: ChunkConfig,
1686) -> Result<ProcessingStats, SofError> {
1687    // Validate content type supports streaming
1688    if content_type == ContentType::Parquet {
1689        return Err(SofError::UnsupportedContentType(
1690            "Parquet output is not supported for streaming. Use batch processing instead."
1691                .to_string(),
1692        ));
1693    }
1694
1695    let mut iterator = NdjsonChunkIterator::new(view_definition, input, config)?;
1696    let columns = iterator.columns().to_vec();
1697
1698    let mut stats = ProcessingStats::default();
1699    let mut is_first_chunk = true;
1700
1701    // Write header if needed
1702    if content_type == ContentType::CsvWithHeader {
1703        write_csv_header(&columns, &mut output)?;
1704    }
1705
1706    // For JSON output, we need special handling to create a valid array
1707    if content_type == ContentType::Json {
1708        output.write_all(b"[\n")?;
1709    }
1710
1711    for result in iterator.by_ref() {
1712        let chunk_result = result?;
1713
1714        stats.resources_processed += chunk_result.resources_in_chunk;
1715        stats.output_rows += chunk_result.rows.len();
1716        stats.chunks_processed += 1;
1717
1718        // Write chunk output
1719        match content_type {
1720            ContentType::Csv | ContentType::CsvWithHeader => {
1721                write_csv_chunk(&chunk_result, &mut output)?;
1722            }
1723            ContentType::NdJson => {
1724                write_ndjson_chunk(&chunk_result, &mut output)?;
1725            }
1726            ContentType::Json => {
1727                // Write JSON rows with proper comma handling
1728                for (i, row) in chunk_result.rows.iter().enumerate() {
1729                    if !is_first_chunk || i > 0 {
1730                        output.write_all(b",\n")?;
1731                    }
1732
1733                    let mut row_obj = serde_json::Map::new();
1734                    for (j, column) in chunk_result.columns.iter().enumerate() {
1735                        let value = row
1736                            .values
1737                            .get(j)
1738                            .and_then(|v| v.as_ref())
1739                            .cloned()
1740                            .unwrap_or(serde_json::Value::Null);
1741                        row_obj.insert(column.clone(), value);
1742                    }
1743                    let json = serde_json::to_string_pretty(&serde_json::Value::Object(row_obj))?;
1744                    output.write_all(json.as_bytes())?;
1745                }
1746            }
1747            ContentType::Parquet => unreachable!(), // Already checked above
1748        }
1749
1750        output.flush()?;
1751        is_first_chunk = false;
1752    }
1753
1754    // Close JSON array if needed
1755    if content_type == ContentType::Json {
1756        output.write_all(b"\n]")?;
1757    }
1758
1759    output.flush()?;
1760
1761    // Update stats with line/skip counts from the iterator
1762    stats.total_lines_read = iterator.lines_read();
1763    stats.skipped_lines = iterator.skipped_lines();
1764
1765    Ok(stats)
1766}
1767
1768/// Create an iterator for chunked NDJSON processing.
1769///
1770/// This is a convenience function that creates an `NdjsonChunkIterator`.
1771/// Use this when you want more control over how chunks are processed.
1772///
1773/// # Arguments
1774///
1775/// * `view_definition` - The ViewDefinition to execute
1776/// * `reader` - A buffered reader for the NDJSON input
1777/// * `config` - Configuration for chunk processing
1778///
1779/// # Returns
1780///
1781/// An iterator that yields `ChunkedResult` for each processed chunk.
1782pub fn iter_ndjson_chunks<R: BufRead>(
1783    view_definition: SofViewDefinition,
1784    reader: R,
1785    config: ChunkConfig,
1786) -> Result<NdjsonChunkIterator<R>, SofError> {
1787    NdjsonChunkIterator::new(view_definition, reader, config)
1788}
1789
1790// =============================================================================
1791// End Streaming/Chunked Processing Types
1792// =============================================================================
1793
1794/// Parse a JSON value into a FhirResource for the given FHIR version.
1795///
1796/// This is used internally for streaming/chunked processing where we have
1797/// raw JSON that needs to be converted to typed resources for FHIRPath evaluation.
1798fn parse_json_to_fhir_resource(
1799    json: serde_json::Value,
1800    version: FhirVersion,
1801) -> Result<helios_fhir::FhirResource, SofError> {
1802    match version {
1803        #[cfg(feature = "R4")]
1804        FhirVersion::R4 => {
1805            let resource: helios_fhir::r4::Resource =
1806                serde_json::from_value(json).map_err(|e| {
1807                    SofError::InvalidSourceContent(format!("Invalid R4 resource: {}", e))
1808                })?;
1809            Ok(helios_fhir::FhirResource::R4(Box::new(resource)))
1810        }
1811        #[cfg(feature = "R4B")]
1812        FhirVersion::R4B => {
1813            let resource: helios_fhir::r4b::Resource =
1814                serde_json::from_value(json).map_err(|e| {
1815                    SofError::InvalidSourceContent(format!("Invalid R4B resource: {}", e))
1816                })?;
1817            Ok(helios_fhir::FhirResource::R4B(Box::new(resource)))
1818        }
1819        #[cfg(feature = "R5")]
1820        FhirVersion::R5 => {
1821            let resource: helios_fhir::r5::Resource =
1822                serde_json::from_value(json).map_err(|e| {
1823                    SofError::InvalidSourceContent(format!("Invalid R5 resource: {}", e))
1824                })?;
1825            Ok(helios_fhir::FhirResource::R5(Box::new(resource)))
1826        }
1827        #[cfg(feature = "R6")]
1828        FhirVersion::R6 => {
1829            let resource: helios_fhir::r6::Resource =
1830                serde_json::from_value(json).map_err(|e| {
1831                    SofError::InvalidSourceContent(format!("Invalid R6 resource: {}", e))
1832                })?;
1833            Ok(helios_fhir::FhirResource::R6(Box::new(resource)))
1834        }
1835    }
1836}
1837
1838/// Execute a ViewDefinition transformation with additional filtering options.
1839///
1840/// This function extends the basic `run_view_definition` with support for:
1841/// - Filtering resources by modification time (`since`)
1842/// - Limiting results (`limit`)
1843/// - Pagination (`page`)
1844///
1845/// # Arguments
1846///
1847/// * `view_definition` - The ViewDefinition to execute
1848/// * `bundle` - The Bundle containing resources to transform
1849/// * `content_type` - Desired output format
1850/// * `options` - Additional filtering and control options
1851///
1852/// # Returns
1853///
1854/// The transformed data in the requested format, with filtering applied.
1855pub fn run_view_definition_with_options(
1856    view_definition: SofViewDefinition,
1857    bundle: SofBundle,
1858    content_type: ContentType,
1859    options: RunOptions,
1860) -> Result<Vec<u8>, SofError> {
1861    // Filter bundle resources by since parameter before processing
1862    let filtered_bundle = if let Some(since) = options.since {
1863        filter_bundle_by_since(bundle, since)?
1864    } else {
1865        bundle
1866    };
1867
1868    // Process the ViewDefinition to generate tabular data
1869    let processed_result = process_view_definition(view_definition, filtered_bundle)?;
1870
1871    // Apply pagination if needed
1872    let processed_result = if options.limit.is_some() || options.page.is_some() {
1873        apply_pagination_to_result(processed_result, options.limit, options.page)?
1874    } else {
1875        processed_result
1876    };
1877
1878    // Format the result according to the requested content type
1879    format_output(
1880        processed_result,
1881        content_type,
1882        options.parquet_options.as_ref(),
1883    )
1884}
1885
/// Execute a ViewDefinition against a Bundle, producing tabular data.
///
/// Checks that both inputs carry the same FHIR version, then dispatches the
/// matching version pair to the generic processing pipeline.
///
/// # Errors
///
/// Returns `SofError::InvalidViewDefinition` when the ViewDefinition and
/// Bundle versions differ, plus any error raised during processing.
pub fn process_view_definition(
    view_definition: SofViewDefinition,
    bundle: SofBundle,
) -> Result<ProcessedResult, SofError> {
    // Ensure both resources use the same FHIR version
    if view_definition.version() != bundle.version() {
        return Err(SofError::InvalidViewDefinition(
            "ViewDefinition and Bundle must use the same FHIR version".to_string(),
        ));
    }

    match (view_definition, bundle) {
        #[cfg(feature = "R4")]
        (SofViewDefinition::R4(vd), SofBundle::R4(bundle)) => {
            process_view_definition_generic(vd, bundle)
        }
        #[cfg(feature = "R4B")]
        (SofViewDefinition::R4B(vd), SofBundle::R4B(bundle)) => {
            process_view_definition_generic(vd, bundle)
        }
        #[cfg(feature = "R5")]
        (SofViewDefinition::R5(vd), SofBundle::R5(bundle)) => {
            process_view_definition_generic(vd, bundle)
        }
        #[cfg(feature = "R6")]
        (SofViewDefinition::R6(vd), SofBundle::R6(bundle)) => {
            process_view_definition_generic(vd, bundle)
        }
        // This case should never happen due to the version check above,
        // but is needed for exhaustive pattern matching when multiple features are enabled
        #[cfg(any(
            all(feature = "R4", any(feature = "R4B", feature = "R5", feature = "R6")),
            all(feature = "R4B", any(feature = "R5", feature = "R6")),
            all(feature = "R5", feature = "R6")
        ))]
        _ => {
            unreachable!("Version mismatch should have been caught by the version check above")
        }
    }
}
1926
1927// Generic version-agnostic constant extraction
1928fn extract_view_definition_constants<VD: ViewDefinitionTrait>(
1929    view_definition: &VD,
1930) -> Result<HashMap<String, EvaluationResult>, SofError> {
1931    let mut variables = HashMap::new();
1932
1933    if let Some(constants) = view_definition.constants() {
1934        for constant in constants {
1935            let name = constant
1936                .name()
1937                .ok_or_else(|| {
1938                    SofError::InvalidViewDefinition("Constant name is required".to_string())
1939                })?
1940                .to_string();
1941
1942            let eval_result = constant.to_evaluation_result()?;
1943            // Constants are referenced with % prefix in FHIRPath expressions
1944            variables.insert(format!("%{}", name), eval_result);
1945        }
1946    }
1947
1948    Ok(variables)
1949}
1950
1951// Generic version-agnostic ViewDefinition processing
1952fn process_view_definition_generic<VD, B>(
1953    view_definition: VD,
1954    bundle: B,
1955) -> Result<ProcessedResult, SofError>
1956where
1957    VD: ViewDefinitionTrait,
1958    B: BundleTrait,
1959    B::Resource: ResourceTrait + Sync,
1960    VD::Select: Sync,
1961{
1962    validate_view_definition(&view_definition)?;
1963
1964    // Step 1: Extract constants/variables from ViewDefinition
1965    let variables = extract_view_definition_constants(&view_definition)?;
1966
1967    // Step 2: Filter resources by type and profile
1968    let target_resource_type = view_definition
1969        .resource()
1970        .ok_or_else(|| SofError::InvalidViewDefinition("Resource type is required".to_string()))?;
1971
1972    let filtered_resources = filter_resources(&bundle, target_resource_type)?;
1973
1974    // Step 3: Apply where clauses to filter resources
1975    let filtered_resources = apply_where_clauses(
1976        filtered_resources,
1977        view_definition.where_clauses(),
1978        &variables,
1979    )?;
1980
1981    // Step 4: Process all select clauses to generate rows with forEach support
1982    let select_clauses = view_definition.select().ok_or_else(|| {
1983        SofError::InvalidViewDefinition("At least one select clause is required".to_string())
1984    })?;
1985
1986    // Generate rows for each resource using the forEach-aware approach
1987    let (all_columns, rows) =
1988        generate_rows_from_selects(&filtered_resources, select_clauses, &variables)?;
1989
1990    Ok(ProcessedResult {
1991        columns: all_columns,
1992        rows,
1993    })
1994}
1995
1996// Generic version-agnostic validation
1997fn validate_view_definition<VD: ViewDefinitionTrait>(view_def: &VD) -> Result<(), SofError> {
1998    // Basic validation
1999    if view_def.resource().is_none_or(|s| s.is_empty()) {
2000        return Err(SofError::InvalidViewDefinition(
2001            "ViewDefinition must specify a resource type".to_string(),
2002        ));
2003    }
2004
2005    if view_def.select().is_none_or(|s| s.is_empty()) {
2006        return Err(SofError::InvalidViewDefinition(
2007            "ViewDefinition must have at least one select".to_string(),
2008        ));
2009    }
2010
2011    // Validate where clauses
2012    if let Some(where_clauses) = view_def.where_clauses() {
2013        validate_where_clauses(where_clauses)?;
2014    }
2015
2016    // Validate selects
2017    if let Some(selects) = view_def.select() {
2018        for select in selects {
2019            validate_select(select)?;
2020        }
2021    }
2022
2023    Ok(())
2024}
2025
2026// Generic where clause validation
2027fn validate_where_clauses<W: ViewDefinitionWhereTrait>(
2028    where_clauses: &[W],
2029) -> Result<(), SofError> {
2030    // Basic validation - just ensure paths are provided
2031    // Type checking will be done during actual evaluation
2032    for where_clause in where_clauses {
2033        if where_clause.path().is_none() {
2034            return Err(SofError::InvalidViewDefinition(
2035                "Where clause must have a path specified".to_string(),
2036            ));
2037        }
2038    }
2039    Ok(())
2040}
2041
2042// Generic helper - no longer needs to be version-specific
2043fn can_be_coerced_to_boolean(result: &EvaluationResult) -> bool {
2044    // Check if the result can be meaningfully used as a boolean in a where clause
2045    match result {
2046        // Boolean values are obviously OK
2047        EvaluationResult::Boolean(_, _) => true,
2048
2049        // Empty is OK (evaluates to false)
2050        EvaluationResult::Empty => true,
2051
2052        // Collections are OK - they evaluate based on whether they're empty or not
2053        EvaluationResult::Collection { .. } => true,
2054
2055        // Other types cannot be meaningfully coerced to boolean for where clauses
2056        // This includes: String, Integer, Decimal, Date, DateTime, Time, Quantity, Object
2057        _ => false,
2058    }
2059}
2060
2061// Generic select validation
2062fn validate_select<S: ViewDefinitionSelectTrait>(select: &S) -> Result<(), SofError> {
2063    validate_select_with_context(select, false)
2064}
2065
2066fn validate_select_with_context<S: ViewDefinitionSelectTrait>(
2067    select: &S,
2068    in_foreach_context: bool,
2069) -> Result<(), SofError>
2070where
2071    S::Select: ViewDefinitionSelectTrait,
2072{
2073    // Determine if we're entering a forEach context at this level
2074    let entering_foreach = select.for_each().is_some() || select.for_each_or_null().is_some();
2075    let current_foreach_context = in_foreach_context || entering_foreach;
2076
2077    // Validate collection attribute with the current forEach context
2078    if let Some(columns) = select.column() {
2079        for column in columns {
2080            if let Some(collection_value) = column.collection() {
2081                if !collection_value && !current_foreach_context {
2082                    return Err(SofError::InvalidViewDefinition(
2083                        "Column 'collection' attribute must be true when specified".to_string(),
2084                    ));
2085                }
2086            }
2087        }
2088    }
2089
2090    // Validate unionAll column consistency
2091    if let Some(union_selects) = select.union_all() {
2092        validate_union_all_columns(union_selects)?;
2093    }
2094
2095    // Recursively validate nested selects
2096    if let Some(nested_selects) = select.select() {
2097        for nested_select in nested_selects {
2098            validate_select_with_context(nested_select, current_foreach_context)?;
2099        }
2100    }
2101
2102    // Validate unionAll selects with forEach context
2103    if let Some(union_selects) = select.union_all() {
2104        for union_select in union_selects {
2105            validate_select_with_context(union_select, current_foreach_context)?;
2106        }
2107    }
2108
2109    Ok(())
2110}
2111
2112// Generic union validation
2113fn validate_union_all_columns<S: ViewDefinitionSelectTrait>(
2114    union_selects: &[S],
2115) -> Result<(), SofError> {
2116    if union_selects.len() < 2 {
2117        return Ok(());
2118    }
2119
2120    // Get column names and order from first select
2121    let first_select = &union_selects[0];
2122    let first_columns = get_column_names(first_select)?;
2123
2124    // Validate all other selects have the same column names in the same order
2125    for (index, union_select) in union_selects.iter().enumerate().skip(1) {
2126        let current_columns = get_column_names(union_select)?;
2127
2128        if current_columns != first_columns {
2129            if current_columns.len() != first_columns.len()
2130                || !current_columns
2131                    .iter()
2132                    .all(|name| first_columns.contains(name))
2133            {
2134                return Err(SofError::InvalidViewDefinition(format!(
2135                    "UnionAll branch {} has different column names than first branch",
2136                    index
2137                )));
2138            } else {
2139                return Err(SofError::InvalidViewDefinition(format!(
2140                    "UnionAll branch {} has columns in different order than first branch",
2141                    index
2142                )));
2143            }
2144        }
2145    }
2146
2147    Ok(())
2148}
2149
2150// Generic column name extraction
2151fn get_column_names<S: ViewDefinitionSelectTrait>(select: &S) -> Result<Vec<String>, SofError> {
2152    let mut column_names = Vec::new();
2153
2154    // Collect direct column names
2155    if let Some(columns) = select.column() {
2156        for column in columns {
2157            if let Some(name) = column.name() {
2158                column_names.push(name.to_string());
2159            }
2160        }
2161    }
2162
2163    // If this select has unionAll but no direct columns, get columns from first unionAll branch
2164    if column_names.is_empty() {
2165        if let Some(union_selects) = select.union_all() {
2166            if !union_selects.is_empty() {
2167                return get_column_names(&union_selects[0]);
2168            }
2169        }
2170    }
2171
2172    Ok(column_names)
2173}
2174
2175// Generic resource filtering
2176fn filter_resources<'a, B: BundleTrait>(
2177    bundle: &'a B,
2178    resource_type: &str,
2179) -> Result<Vec<&'a B::Resource>, SofError> {
2180    Ok(bundle
2181        .entries()
2182        .into_iter()
2183        .filter(|resource| resource.resource_name() == resource_type)
2184        .collect())
2185}
2186
2187// Generic where clause application
2188fn apply_where_clauses<'a, R, W>(
2189    resources: Vec<&'a R>,
2190    where_clauses: Option<&[W]>,
2191    variables: &HashMap<String, EvaluationResult>,
2192) -> Result<Vec<&'a R>, SofError>
2193where
2194    R: ResourceTrait,
2195    W: ViewDefinitionWhereTrait,
2196{
2197    if let Some(wheres) = where_clauses {
2198        let mut filtered = Vec::new();
2199
2200        for resource in resources {
2201            let mut include_resource = true;
2202
2203            // All where clauses must evaluate to true for the resource to be included
2204            for where_clause in wheres {
2205                let fhir_resource = resource.to_fhir_resource();
2206                let mut context = EvaluationContext::new(vec![fhir_resource]);
2207
2208                // Add variables to the context
2209                for (name, value) in variables {
2210                    context.set_variable_result(name, value.clone());
2211                }
2212
2213                let path = where_clause.path().ok_or_else(|| {
2214                    SofError::InvalidViewDefinition("Where clause path is required".to_string())
2215                })?;
2216
2217                match evaluate_expression(path, &context) {
2218                    Ok(result) => {
2219                        // Check if the result can be meaningfully used as a boolean
2220                        if !can_be_coerced_to_boolean(&result) {
2221                            return Err(SofError::InvalidViewDefinition(format!(
2222                                "Where clause path '{}' returns type '{}' which cannot be used as a boolean condition. \
2223                                 Where clauses must return boolean values, collections, or empty results.",
2224                                path,
2225                                result.type_name()
2226                            )));
2227                        }
2228
2229                        // Check if result is truthy (non-empty and not false)
2230                        if !is_truthy(&result) {
2231                            include_resource = false;
2232                            break;
2233                        }
2234                    }
2235                    Err(e) => {
2236                        return Err(SofError::FhirPathError(format!(
2237                            "Error evaluating where clause '{}': {}",
2238                            path, e
2239                        )));
2240                    }
2241                }
2242            }
2243
2244            if include_resource {
2245                filtered.push(resource);
2246            }
2247        }
2248
2249        Ok(filtered)
2250    } else {
2251        Ok(resources)
2252    }
2253}
2254
2255// Removed generate_rows_per_resource_r4 - replaced with new forEach-aware implementation
2256
2257// Removed generate_rows_with_for_each_r4 - replaced with new forEach-aware implementation
2258
2259// Helper functions for FHIRPath result processing
2260fn is_truthy(result: &EvaluationResult) -> bool {
2261    match result {
2262        EvaluationResult::Empty => false,
2263        EvaluationResult::Boolean(b, _) => *b,
2264        EvaluationResult::Collection { items, .. } => !items.is_empty(),
2265        _ => true, // Non-empty, non-false values are truthy
2266    }
2267}
2268
2269fn fhirpath_result_to_json_value_collection(result: EvaluationResult) -> Option<serde_json::Value> {
2270    match result {
2271        EvaluationResult::Empty => Some(serde_json::Value::Array(vec![])),
2272        EvaluationResult::Collection { items, .. } => {
2273            // Always return array for collection columns, even if empty
2274            let values: Vec<serde_json::Value> = items
2275                .into_iter()
2276                .filter_map(fhirpath_result_to_json_value)
2277                .collect();
2278            Some(serde_json::Value::Array(values))
2279        }
2280        // For non-collection results in collection columns, wrap in array
2281        single_result => {
2282            if let Some(json_val) = fhirpath_result_to_json_value(single_result) {
2283                Some(serde_json::Value::Array(vec![json_val]))
2284            } else {
2285                Some(serde_json::Value::Array(vec![]))
2286            }
2287        }
2288    }
2289}
2290
2291fn fhirpath_result_to_json_value(result: EvaluationResult) -> Option<serde_json::Value> {
2292    match result {
2293        EvaluationResult::Empty => None,
2294        EvaluationResult::Boolean(b, _) => Some(serde_json::Value::Bool(b)),
2295        EvaluationResult::Integer(i, _) => {
2296            Some(serde_json::Value::Number(serde_json::Number::from(i)))
2297        }
2298        EvaluationResult::Decimal(d, _) => {
2299            // Check if this Decimal represents a whole number
2300            if d.fract().is_zero() {
2301                // Convert to integer if no fractional part
2302                if let Ok(i) = d.to_string().parse::<i64>() {
2303                    Some(serde_json::Value::Number(serde_json::Number::from(i)))
2304                } else {
2305                    // Handle very large numbers as strings
2306                    Some(serde_json::Value::String(d.to_string()))
2307                }
2308            } else {
2309                // Convert Decimal to a float for fractional numbers
2310                if let Ok(f) = d.to_string().parse::<f64>() {
2311                    if let Some(num) = serde_json::Number::from_f64(f) {
2312                        Some(serde_json::Value::Number(num))
2313                    } else {
2314                        Some(serde_json::Value::String(d.to_string()))
2315                    }
2316                } else {
2317                    Some(serde_json::Value::String(d.to_string()))
2318                }
2319            }
2320        }
2321        EvaluationResult::String(s, _) => Some(serde_json::Value::String(s)),
2322        EvaluationResult::Date(s, _) => Some(serde_json::Value::String(s)),
2323        EvaluationResult::DateTime(s, _) => {
2324            // Remove "@" prefix from datetime strings if present
2325            let cleaned = s.strip_prefix("@").unwrap_or(&s);
2326            Some(serde_json::Value::String(cleaned.to_string()))
2327        }
2328        EvaluationResult::Time(s, _) => {
2329            // Remove "@T" prefix from time strings if present
2330            let cleaned = s.strip_prefix("@T").unwrap_or(&s);
2331            Some(serde_json::Value::String(cleaned.to_string()))
2332        }
2333        EvaluationResult::Collection { items, .. } => {
2334            if items.len() == 1 {
2335                // Single item collection - unwrap to the item itself
2336                fhirpath_result_to_json_value(items.into_iter().next().unwrap())
2337            } else if items.is_empty() {
2338                None
2339            } else {
2340                // Multiple items - convert to array
2341                let values: Vec<serde_json::Value> = items
2342                    .into_iter()
2343                    .filter_map(fhirpath_result_to_json_value)
2344                    .collect();
2345                Some(serde_json::Value::Array(values))
2346            }
2347        }
2348        EvaluationResult::Object { map, .. } => {
2349            let mut json_map = serde_json::Map::new();
2350            for (k, v) in map {
2351                if let Some(json_val) = fhirpath_result_to_json_value(v) {
2352                    json_map.insert(k, json_val);
2353                }
2354            }
2355            Some(serde_json::Value::Object(json_map))
2356        }
2357        // Handle other result types as strings
2358        _ => Some(serde_json::Value::String(format!("{:?}", result))),
2359    }
2360}
2361
2362fn extract_iteration_items(result: EvaluationResult) -> Vec<EvaluationResult> {
2363    match result {
2364        EvaluationResult::Collection { items, .. } => items,
2365        EvaluationResult::Empty => Vec::new(),
2366        single_item => vec![single_item],
2367    }
2368}
2369
2370// Generic row generation functions
2371
2372fn generate_rows_from_selects<R, S>(
2373    resources: &[&R],
2374    selects: &[S],
2375    variables: &HashMap<String, EvaluationResult>,
2376) -> Result<(Vec<String>, Vec<ProcessedRow>), SofError>
2377where
2378    R: ResourceTrait + Sync,
2379    S: ViewDefinitionSelectTrait + Sync,
2380    S::Select: ViewDefinitionSelectTrait,
2381{
2382    // Process resources in parallel
2383    let resource_results: Result<Vec<_>, _> = resources
2384        .par_iter()
2385        .map(|resource| {
2386            // Each thread gets its own local column vector
2387            let mut local_columns = Vec::new();
2388            let resource_rows =
2389                generate_rows_for_resource(*resource, selects, &mut local_columns, variables)?;
2390            Ok::<(Vec<String>, Vec<ProcessedRow>), SofError>((local_columns, resource_rows))
2391        })
2392        .collect();
2393
2394    // Handle errors from parallel processing
2395    let resource_results = resource_results?;
2396
2397    // Merge columns from all threads (maintaining order is important)
2398    let mut final_columns = Vec::new();
2399    let mut all_rows = Vec::new();
2400
2401    for (local_columns, resource_rows) in resource_results {
2402        // Merge columns, avoiding duplicates
2403        for col in local_columns {
2404            if !final_columns.contains(&col) {
2405                final_columns.push(col);
2406            }
2407        }
2408        all_rows.extend(resource_rows);
2409    }
2410
2411    Ok((final_columns, all_rows))
2412}
2413
2414fn generate_rows_for_resource<R, S>(
2415    resource: &R,
2416    selects: &[S],
2417    all_columns: &mut Vec<String>,
2418    variables: &HashMap<String, EvaluationResult>,
2419) -> Result<Vec<ProcessedRow>, SofError>
2420where
2421    R: ResourceTrait,
2422    S: ViewDefinitionSelectTrait,
2423    S::Select: ViewDefinitionSelectTrait,
2424{
2425    let fhir_resource = resource.to_fhir_resource();
2426    let mut context = EvaluationContext::new(vec![fhir_resource]);
2427
2428    // Add variables to the context
2429    for (name, value) in variables {
2430        context.set_variable_result(name, value.clone());
2431    }
2432
2433    // Generate all possible row combinations for this resource
2434    let row_combinations = generate_row_combinations(&context, selects, all_columns, variables)?;
2435
2436    Ok(row_combinations)
2437}
2438
/// One (possibly partial) output row produced while the select tree is being
/// expanded; `values` is indexed parallel to the collected column list.
#[derive(Debug, Clone)]
struct RowCombination {
    // `None` means no value has been produced for that column (yet).
    values: Vec<Option<serde_json::Value>>,
}
2443
2444fn generate_row_combinations<S>(
2445    context: &EvaluationContext,
2446    selects: &[S],
2447    all_columns: &mut Vec<String>,
2448    variables: &HashMap<String, EvaluationResult>,
2449) -> Result<Vec<ProcessedRow>, SofError>
2450where
2451    S: ViewDefinitionSelectTrait,
2452    S::Select: ViewDefinitionSelectTrait,
2453{
2454    // First pass: collect all column names to ensure consistent ordering
2455    collect_all_columns(selects, all_columns)?;
2456
2457    // Second pass: generate all row combinations
2458    let mut row_combinations = vec![RowCombination {
2459        values: vec![None; all_columns.len()],
2460    }];
2461
2462    for select in selects {
2463        row_combinations =
2464            expand_select_combinations(context, select, &row_combinations, all_columns, variables)?;
2465    }
2466
2467    // Convert to ProcessedRow format
2468    Ok(row_combinations
2469        .into_iter()
2470        .map(|combo| ProcessedRow {
2471            values: combo.values,
2472        })
2473        .collect())
2474}
2475
2476fn collect_all_columns<S>(selects: &[S], all_columns: &mut Vec<String>) -> Result<(), SofError>
2477where
2478    S: ViewDefinitionSelectTrait,
2479{
2480    for select in selects {
2481        // Add columns from this select
2482        if let Some(columns) = select.column() {
2483            for col in columns {
2484                if let Some(name) = col.name() {
2485                    if !all_columns.contains(&name.to_string()) {
2486                        all_columns.push(name.to_string());
2487                    }
2488                }
2489            }
2490        }
2491
2492        // Recursively collect from nested selects
2493        if let Some(nested_selects) = select.select() {
2494            collect_all_columns(nested_selects, all_columns)?;
2495        }
2496
2497        // Collect from unionAll
2498        if let Some(union_selects) = select.union_all() {
2499            collect_all_columns(union_selects, all_columns)?;
2500        }
2501    }
2502    Ok(())
2503}
2504
/// Expands a single `select` into row combinations, cross-multiplying against
/// `existing_combinations`.
///
/// Dispatches in priority order:
/// 1. `forEach` — delegated to [`expand_for_each_combinations`] with
///    `allow_null = false` (an empty collection produces no rows);
/// 2. `forEachOrNull` — same delegation with `allow_null = true`
///    (an empty collection produces null-filled rows);
/// 3. `repeat` — delegated to [`expand_repeat_combinations`] for recursive
///    traversal.
///
/// Otherwise this select's columns are evaluated once per existing
/// combination (values written by index into the `all_columns` layout),
/// then nested `select`s are folded in sequentially, and finally any
/// `unionAll` branches REPLACE the accumulated combinations with the union
/// of their results — an empty union filters this resource out entirely.
///
/// * `context` - evaluation context for FHIRPath expressions at this level
/// * `all_columns` - flattened, ordered column names shared by all rows
/// * `variables` - FHIRPath variables carried through nested evaluation
///
/// # Errors
///
/// Returns `InvalidViewDefinition` when a column lacks a path, or
/// `FhirPathError` when a column expression fails to evaluate.
fn expand_select_combinations<S>(
    context: &EvaluationContext,
    select: &S,
    existing_combinations: &[RowCombination],
    all_columns: &[String],
    variables: &HashMap<String, EvaluationResult>,
) -> Result<Vec<RowCombination>, SofError>
where
    S: ViewDefinitionSelectTrait,
    S::Select: ViewDefinitionSelectTrait,
{
    // Handle forEach and forEachOrNull
    if let Some(for_each_path) = select.for_each() {
        return expand_for_each_combinations(
            context,
            select,
            existing_combinations,
            all_columns,
            for_each_path,
            false,
            variables,
        );
    }

    if let Some(for_each_or_null_path) = select.for_each_or_null() {
        return expand_for_each_combinations(
            context,
            select,
            existing_combinations,
            all_columns,
            for_each_or_null_path,
            true,
            variables,
        );
    }

    // Handle repeat directive for recursive traversal
    if let Some(repeat_paths) = select.repeat() {
        return expand_repeat_combinations(
            context,
            select,
            existing_combinations,
            all_columns,
            &repeat_paths,
            variables,
        );
    }

    // Handle regular columns (no forEach)
    let mut new_combinations = Vec::new();

    for existing_combo in existing_combinations {
        let mut new_combo = existing_combo.clone();

        // Add values from this select's columns
        if let Some(columns) = select.column() {
            for col in columns {
                if let Some(col_name) = col.name() {
                    // Columns not present in all_columns are silently skipped;
                    // collect_all_columns should have registered every name.
                    if let Some(col_index) = all_columns.iter().position(|name| name == col_name) {
                        let path = col.path().ok_or_else(|| {
                            SofError::InvalidViewDefinition("Column path is required".to_string())
                        })?;

                        match evaluate_expression(path, context) {
                            Ok(result) => {
                                // Check if this column is marked as a collection
                                let is_collection = col.collection().unwrap_or(false);

                                new_combo.values[col_index] = if is_collection {
                                    fhirpath_result_to_json_value_collection(result)
                                } else {
                                    fhirpath_result_to_json_value(result)
                                };
                            }
                            Err(e) => {
                                return Err(SofError::FhirPathError(format!(
                                    "Error evaluating column '{}' with path '{}': {}",
                                    col_name, path, e
                                )));
                            }
                        }
                    }
                }
            }
        }

        new_combinations.push(new_combo);
    }

    // Handle nested selects
    if let Some(nested_selects) = select.select() {
        for nested_select in nested_selects {
            new_combinations = expand_select_combinations(
                context,
                nested_select,
                &new_combinations,
                all_columns,
                variables,
            )?;
        }
    }

    // Handle unionAll
    if let Some(union_selects) = select.union_all() {
        let mut union_combinations = Vec::new();

        // Process each unionAll select independently, using the combinations that already have
        // values from this select's columns and nested selects
        for union_select in union_selects {
            let select_combinations = expand_select_combinations(
                context,
                union_select,
                &new_combinations,
                all_columns,
                variables,
            )?;
            union_combinations.extend(select_combinations);
        }

        // unionAll replaces new_combinations with the union results
        // If no union results, this resource should be filtered out (no rows for this resource)
        new_combinations = union_combinations;
    }

    Ok(new_combinations)
}
2631
2632fn expand_for_each_combinations<S>(
2633    context: &EvaluationContext,
2634    select: &S,
2635    existing_combinations: &[RowCombination],
2636    all_columns: &[String],
2637    for_each_path: &str,
2638    allow_null: bool,
2639    variables: &HashMap<String, EvaluationResult>,
2640) -> Result<Vec<RowCombination>, SofError>
2641where
2642    S: ViewDefinitionSelectTrait,
2643    S::Select: ViewDefinitionSelectTrait,
2644{
2645    // Evaluate the forEach expression to get iteration items
2646    let for_each_result = evaluate_expression(for_each_path, context).map_err(|e| {
2647        SofError::FhirPathError(format!(
2648            "Error evaluating forEach expression '{}': {}",
2649            for_each_path, e
2650        ))
2651    })?;
2652
2653    let iteration_items = extract_iteration_items(for_each_result);
2654
2655    if iteration_items.is_empty() {
2656        if allow_null {
2657            // forEachOrNull: generate null rows
2658            let mut new_combinations = Vec::new();
2659            for existing_combo in existing_combinations {
2660                let mut new_combo = existing_combo.clone();
2661
2662                // Set column values to null for this forEach scope
2663                if let Some(columns) = select.column() {
2664                    for col in columns {
2665                        if let Some(col_name) = col.name() {
2666                            if let Some(col_index) =
2667                                all_columns.iter().position(|name| name == col_name)
2668                            {
2669                                new_combo.values[col_index] = None;
2670                            }
2671                        }
2672                    }
2673                }
2674
2675                new_combinations.push(new_combo);
2676            }
2677            return Ok(new_combinations);
2678        } else {
2679            // forEach with empty collection: no rows
2680            return Ok(Vec::new());
2681        }
2682    }
2683
2684    let mut new_combinations = Vec::new();
2685
2686    // For each iteration item, create new combinations
2687    for item in &iteration_items {
2688        // Create a new context with the iteration item
2689        let _item_context = create_iteration_context(item, variables);
2690
2691        for existing_combo in existing_combinations {
2692            let mut new_combo = existing_combo.clone();
2693
2694            // Evaluate columns in the context of the iteration item
2695            if let Some(columns) = select.column() {
2696                for col in columns {
2697                    if let Some(col_name) = col.name() {
2698                        if let Some(col_index) =
2699                            all_columns.iter().position(|name| name == col_name)
2700                        {
2701                            let path = col.path().ok_or_else(|| {
2702                                SofError::InvalidViewDefinition(
2703                                    "Column path is required".to_string(),
2704                                )
2705                            })?;
2706
2707                            // Use the iteration item directly for path evaluation
2708                            let result = if path == "$this" {
2709                                // Special case: $this refers to the current iteration item
2710                                item.clone()
2711                            } else {
2712                                // Evaluate the path on the iteration item
2713                                evaluate_path_on_item(path, item, variables)?
2714                            };
2715
2716                            // Check if this column is marked as a collection
2717                            let is_collection = col.collection().unwrap_or(false);
2718
2719                            new_combo.values[col_index] = if is_collection {
2720                                fhirpath_result_to_json_value_collection(result)
2721                            } else {
2722                                fhirpath_result_to_json_value(result)
2723                            };
2724                        }
2725                    }
2726                }
2727            }
2728
2729            new_combinations.push(new_combo);
2730        }
2731    }
2732
2733    // Handle nested selects with the forEach context
2734    if let Some(nested_selects) = select.select() {
2735        let mut final_combinations = Vec::new();
2736
2737        for item in &iteration_items {
2738            let item_context = create_iteration_context(item, variables);
2739
2740            // For each iteration item, we need to start with the combinations that have
2741            // the correct column values for this forEach scope
2742            for existing_combo in existing_combinations {
2743                // Find the combination that corresponds to this iteration item
2744                // by looking at the values we set for columns in this forEach scope
2745                let mut base_combo = existing_combo.clone();
2746
2747                // Update the base combination with column values for this iteration item
2748                if let Some(columns) = select.column() {
2749                    for col in columns {
2750                        if let Some(col_name) = col.name() {
2751                            if let Some(col_index) =
2752                                all_columns.iter().position(|name| name == col_name)
2753                            {
2754                                let path = col.path().ok_or_else(|| {
2755                                    SofError::InvalidViewDefinition(
2756                                        "Column path is required".to_string(),
2757                                    )
2758                                })?;
2759
2760                                let result = if path == "$this" {
2761                                    item.clone()
2762                                } else {
2763                                    evaluate_path_on_item(path, item, variables)?
2764                                };
2765
2766                                // Check if this column is marked as a collection
2767                                let is_collection = col.collection().unwrap_or(false);
2768
2769                                base_combo.values[col_index] = if is_collection {
2770                                    fhirpath_result_to_json_value_collection(result)
2771                                } else {
2772                                    fhirpath_result_to_json_value(result)
2773                                };
2774                            }
2775                        }
2776                    }
2777                }
2778
2779                // Start with this base combination for nested processing
2780                let mut item_combinations = vec![base_combo];
2781
2782                // Process nested selects
2783                for nested_select in nested_selects {
2784                    item_combinations = expand_select_combinations(
2785                        &item_context,
2786                        nested_select,
2787                        &item_combinations,
2788                        all_columns,
2789                        variables,
2790                    )?;
2791                }
2792
2793                final_combinations.extend(item_combinations);
2794            }
2795        }
2796
2797        new_combinations = final_combinations;
2798    }
2799
2800    // Handle unionAll within forEach context
2801    if let Some(union_selects) = select.union_all() {
2802        let mut union_combinations = Vec::new();
2803
2804        for item in &iteration_items {
2805            let item_context = create_iteration_context(item, variables);
2806
2807            // For each iteration item, process all unionAll selects
2808            for existing_combo in existing_combinations {
2809                let mut base_combo = existing_combo.clone();
2810
2811                // Update the base combination with column values for this iteration item
2812                if let Some(columns) = select.column() {
2813                    for col in columns {
2814                        if let Some(col_name) = col.name() {
2815                            if let Some(col_index) =
2816                                all_columns.iter().position(|name| name == col_name)
2817                            {
2818                                let path = col.path().ok_or_else(|| {
2819                                    SofError::InvalidViewDefinition(
2820                                        "Column path is required".to_string(),
2821                                    )
2822                                })?;
2823
2824                                let result = if path == "$this" {
2825                                    item.clone()
2826                                } else {
2827                                    evaluate_path_on_item(path, item, variables)?
2828                                };
2829
2830                                // Check if this column is marked as a collection
2831                                let is_collection = col.collection().unwrap_or(false);
2832
2833                                base_combo.values[col_index] = if is_collection {
2834                                    fhirpath_result_to_json_value_collection(result)
2835                                } else {
2836                                    fhirpath_result_to_json_value(result)
2837                                };
2838                            }
2839                        }
2840                    }
2841                }
2842
2843                // Also evaluate columns from nested selects and add them to base_combo
2844                if let Some(nested_selects) = select.select() {
2845                    for nested_select in nested_selects {
2846                        if let Some(nested_columns) = nested_select.column() {
2847                            for col in nested_columns {
2848                                if let Some(col_name) = col.name() {
2849                                    if let Some(col_index) =
2850                                        all_columns.iter().position(|name| name == col_name)
2851                                    {
2852                                        let path = col.path().ok_or_else(|| {
2853                                            SofError::InvalidViewDefinition(
2854                                                "Column path is required".to_string(),
2855                                            )
2856                                        })?;
2857
2858                                        let result = if path == "$this" {
2859                                            item.clone()
2860                                        } else {
2861                                            evaluate_path_on_item(path, item, variables)?
2862                                        };
2863
2864                                        // Check if this column is marked as a collection
2865                                        let is_collection = col.collection().unwrap_or(false);
2866
2867                                        base_combo.values[col_index] = if is_collection {
2868                                            fhirpath_result_to_json_value_collection(result)
2869                                        } else {
2870                                            fhirpath_result_to_json_value(result)
2871                                        };
2872                                    }
2873                                }
2874                            }
2875                        }
2876                    }
2877                }
2878
2879                // Process each unionAll select independently for this iteration item
2880                for union_select in union_selects {
2881                    let mut select_combinations = vec![base_combo.clone()];
2882                    select_combinations = expand_select_combinations(
2883                        &item_context,
2884                        union_select,
2885                        &select_combinations,
2886                        all_columns,
2887                        variables,
2888                    )?;
2889                    union_combinations.extend(select_combinations);
2890                }
2891            }
2892        }
2893
2894        // unionAll replaces new_combinations with the union results
2895        // If no union results, filter out this resource (no rows for this resource)
2896        new_combinations = union_combinations;
2897    }
2898
2899    Ok(new_combinations)
2900}
2901
/// Expands a `repeat` select by recursively traversing `repeat_paths` from
/// the current context, emitting combinations for every visited child.
///
/// Unlike `forEach`, the current level's own context does NOT produce a row
/// here — columns are evaluated only against each child element found via a
/// repeat path, after which the traversal recurses into that child with the
/// same paths. Results from every depth are unioned into the return value.
///
/// NOTE(review): recursion terminates only when the repeat paths eventually
/// yield no children — assumes the traversed data is acyclic; cyclic
/// structures would recurse unboundedly. TODO confirm upstream guarantees.
///
/// # Errors
///
/// Returns `FhirPathError` when a repeat path fails to evaluate, or
/// `InvalidViewDefinition` when a column lacks a path.
fn expand_repeat_combinations<S>(
    context: &EvaluationContext,
    select: &S,
    existing_combinations: &[RowCombination],
    all_columns: &[String],
    repeat_paths: &[&str],
    variables: &HashMap<String, EvaluationResult>,
) -> Result<Vec<RowCombination>, SofError>
where
    S: ViewDefinitionSelectTrait,
    S::Select: ViewDefinitionSelectTrait,
{
    // The repeat directive performs recursive traversal:
    // 1. For each repeat path, find child elements from the current context
    // 2. For each child element:
    //    a. Evaluate columns in the child's context
    //    b. Recursively process the child with the same repeat paths
    // 3. Union all results together
    //
    // Note: Unlike forEach, repeat does NOT process the current level's columns
    // - it ONLY processes elements found via the repeat paths

    let mut all_combinations = Vec::new();

    // Process each existing combination
    for existing_combo in existing_combinations {
        // Process each repeat path to find children to traverse
        for repeat_path in repeat_paths {
            // Evaluate the repeat path to get child elements
            let repeat_result = evaluate_expression(repeat_path, context).map_err(|e| {
                SofError::FhirPathError(format!(
                    "Error evaluating repeat expression '{}': {}",
                    repeat_path, e
                ))
            })?;

            let child_items = extract_iteration_items(repeat_result);

            // For each child item found via this repeat path
            for child_item in &child_items {
                // Create a combination for this child with current level's columns
                let mut child_combo = existing_combo.clone();

                // Evaluate columns in the context of this child item
                if let Some(columns) = select.column() {
                    for col in columns {
                        if let Some(col_name) = col.name() {
                            if let Some(col_index) =
                                all_columns.iter().position(|name| name == col_name)
                            {
                                let path = col.path().ok_or_else(|| {
                                    SofError::InvalidViewDefinition(
                                        "Column path is required".to_string(),
                                    )
                                })?;

                                // Evaluate the path on the child item
                                // ($this resolves to the child element itself)
                                let result = if path == "$this" {
                                    child_item.clone()
                                } else {
                                    evaluate_path_on_item(path, child_item, variables)?
                                };

                                let is_collection = col.collection().unwrap_or(false);
                                child_combo.values[col_index] = if is_collection {
                                    fhirpath_result_to_json_value_collection(result)
                                } else {
                                    fhirpath_result_to_json_value(result)
                                };
                            }
                        }
                    }
                }

                // Create context for this child item
                let child_context = create_iteration_context(child_item, variables);

                // Start with the child combination we just created
                let mut child_combinations = vec![child_combo.clone()];

                // Process nested selects (like forEach/forEachOrNull) in the child's context
                if let Some(nested_selects) = select.select() {
                    for nested_select in nested_selects {
                        child_combinations = expand_select_combinations(
                            &child_context,
                            nested_select,
                            &child_combinations,
                            all_columns,
                            variables,
                        )?;
                    }
                }

                // Add the processed combinations to our results
                // (these may have been filtered by forEach, which is correct)
                all_combinations.extend(child_combinations);

                // Now recursively process this child with the same repeat paths
                // IMPORTANT: Use the original child_combo, not the forEach-filtered results
                let recursive_combinations = expand_repeat_combinations(
                    &child_context,
                    select,
                    &[child_combo],
                    all_columns,
                    repeat_paths,
                    variables,
                )?;

                all_combinations.extend(recursive_combinations);
            }
        }
    }

    Ok(all_combinations)
}
3017
3018// Generic helper functions
3019fn evaluate_path_on_item(
3020    path: &str,
3021    item: &EvaluationResult,
3022    variables: &HashMap<String, EvaluationResult>,
3023) -> Result<EvaluationResult, SofError> {
3024    // Create a temporary context with the iteration item as the root resource
3025    let mut temp_context = match item {
3026        EvaluationResult::Object { .. } => {
3027            // Convert the iteration item to a resource-like structure for FHIRPath evaluation
3028            // For simplicity, we'll create a basic context where the item is available for evaluation
3029            let mut context = EvaluationContext::new(vec![]);
3030            context.this = Some(item.clone());
3031            context
3032        }
3033        _ => EvaluationContext::new(vec![]),
3034    };
3035
3036    // Add variables to the temporary context
3037    for (name, value) in variables {
3038        temp_context.set_variable_result(name, value.clone());
3039    }
3040
3041    // Evaluate the FHIRPath expression in the context of the iteration item
3042    match evaluate_expression(path, &temp_context) {
3043        Ok(result) => Ok(result),
3044        Err(_e) => {
3045            // If FHIRPath evaluation fails, try simple property access as fallback
3046            match item {
3047                EvaluationResult::Object { map, .. } => {
3048                    if let Some(value) = map.get(path) {
3049                        Ok(value.clone())
3050                    } else {
3051                        Ok(EvaluationResult::Empty)
3052                    }
3053                }
3054                _ => Ok(EvaluationResult::Empty),
3055            }
3056        }
3057    }
3058}
3059
3060fn create_iteration_context(
3061    item: &EvaluationResult,
3062    variables: &HashMap<String, EvaluationResult>,
3063) -> EvaluationContext {
3064    // Create a new context with the iteration item as the root
3065    let mut context = EvaluationContext::new(vec![]);
3066    context.this = Some(item.clone());
3067
3068    // Preserve variables from the parent context
3069    for (name, value) in variables {
3070        context.set_variable_result(name, value.clone());
3071    }
3072
3073    context
3074}
3075
3076/// Filter a bundle's resources by their lastUpdated metadata
3077fn filter_bundle_by_since(bundle: SofBundle, since: DateTime<Utc>) -> Result<SofBundle, SofError> {
3078    match bundle {
3079        #[cfg(feature = "R4")]
3080        SofBundle::R4(mut b) => {
3081            if let Some(entries) = b.entry.as_mut() {
3082                entries.retain(|entry| {
3083                    entry
3084                        .resource
3085                        .as_ref()
3086                        .and_then(|r| r.get_last_updated())
3087                        .map(|last_updated| last_updated > since)
3088                        .unwrap_or(false)
3089                });
3090            }
3091            Ok(SofBundle::R4(b))
3092        }
3093        #[cfg(feature = "R4B")]
3094        SofBundle::R4B(mut b) => {
3095            if let Some(entries) = b.entry.as_mut() {
3096                entries.retain(|entry| {
3097                    entry
3098                        .resource
3099                        .as_ref()
3100                        .and_then(|r| r.get_last_updated())
3101                        .map(|last_updated| last_updated > since)
3102                        .unwrap_or(false)
3103                });
3104            }
3105            Ok(SofBundle::R4B(b))
3106        }
3107        #[cfg(feature = "R5")]
3108        SofBundle::R5(mut b) => {
3109            if let Some(entries) = b.entry.as_mut() {
3110                entries.retain(|entry| {
3111                    entry
3112                        .resource
3113                        .as_ref()
3114                        .and_then(|r| r.get_last_updated())
3115                        .map(|last_updated| last_updated > since)
3116                        .unwrap_or(false)
3117                });
3118            }
3119            Ok(SofBundle::R5(b))
3120        }
3121        #[cfg(feature = "R6")]
3122        SofBundle::R6(mut b) => {
3123            if let Some(entries) = b.entry.as_mut() {
3124                entries.retain(|entry| {
3125                    entry
3126                        .resource
3127                        .as_ref()
3128                        .and_then(|r| r.get_last_updated())
3129                        .map(|last_updated| last_updated > since)
3130                        .unwrap_or(false)
3131                });
3132            }
3133            Ok(SofBundle::R6(b))
3134        }
3135    }
3136}
3137
3138/// Apply pagination to processed results
3139fn apply_pagination_to_result(
3140    mut result: ProcessedResult,
3141    limit: Option<usize>,
3142    page: Option<usize>,
3143) -> Result<ProcessedResult, SofError> {
3144    if let Some(limit) = limit {
3145        let page_num = page.unwrap_or(1);
3146        if page_num == 0 {
3147            return Err(SofError::InvalidViewDefinition(
3148                "Page number must be greater than 0".to_string(),
3149            ));
3150        }
3151
3152        let start_index = (page_num - 1) * limit;
3153        if start_index >= result.rows.len() {
3154            // Return empty result if page is beyond data
3155            result.rows.clear();
3156        } else {
3157            let end_index = std::cmp::min(start_index + limit, result.rows.len());
3158            result.rows = result.rows[start_index..end_index].to_vec();
3159        }
3160    }
3161
3162    Ok(result)
3163}
3164
3165fn format_output(
3166    result: ProcessedResult,
3167    content_type: ContentType,
3168    parquet_options: Option<&ParquetOptions>,
3169) -> Result<Vec<u8>, SofError> {
3170    match content_type {
3171        ContentType::Csv | ContentType::CsvWithHeader => {
3172            format_csv(result, content_type == ContentType::CsvWithHeader)
3173        }
3174        ContentType::Json => format_json(result),
3175        ContentType::NdJson => format_ndjson(result),
3176        ContentType::Parquet => format_parquet(result, parquet_options),
3177    }
3178}
3179
3180fn format_csv(result: ProcessedResult, include_header: bool) -> Result<Vec<u8>, SofError> {
3181    let mut wtr = csv::Writer::from_writer(vec![]);
3182
3183    if include_header {
3184        wtr.write_record(&result.columns)?;
3185    }
3186
3187    for row in result.rows {
3188        let record: Vec<String> = row
3189            .values
3190            .iter()
3191            .map(|v| match v {
3192                Some(val) => {
3193                    // For string values, extract the raw string instead of JSON serializing
3194                    if let serde_json::Value::String(s) = val {
3195                        s.clone()
3196                    } else {
3197                        // For non-string values, use JSON serialization
3198                        serde_json::to_string(val).unwrap_or_default()
3199                    }
3200                }
3201                None => String::new(),
3202            })
3203            .collect();
3204        wtr.write_record(&record)?;
3205    }
3206
3207    wtr.into_inner()
3208        .map_err(|e| SofError::CsvWriterError(e.to_string()))
3209}
3210
3211fn format_json(result: ProcessedResult) -> Result<Vec<u8>, SofError> {
3212    let mut output = Vec::new();
3213
3214    for row in result.rows {
3215        let mut row_obj = serde_json::Map::new();
3216        for (i, column) in result.columns.iter().enumerate() {
3217            let value = row
3218                .values
3219                .get(i)
3220                .and_then(|v| v.as_ref())
3221                .cloned()
3222                .unwrap_or(serde_json::Value::Null);
3223            row_obj.insert(column.clone(), value);
3224        }
3225        output.push(serde_json::Value::Object(row_obj));
3226    }
3227
3228    Ok(serde_json::to_vec_pretty(&output)?)
3229}
3230
3231fn format_ndjson(result: ProcessedResult) -> Result<Vec<u8>, SofError> {
3232    let mut output = Vec::new();
3233
3234    for row in result.rows {
3235        let mut row_obj = serde_json::Map::new();
3236        for (i, column) in result.columns.iter().enumerate() {
3237            let value = row
3238                .values
3239                .get(i)
3240                .and_then(|v| v.as_ref())
3241                .cloned()
3242                .unwrap_or(serde_json::Value::Null);
3243            row_obj.insert(column.clone(), value);
3244        }
3245        let line = serde_json::to_string(&serde_json::Value::Object(row_obj))?;
3246        output.extend_from_slice(line.as_bytes());
3247        output.push(b'\n');
3248    }
3249
3250    Ok(output)
3251}
3252
3253fn format_parquet(
3254    result: ProcessedResult,
3255    options: Option<&ParquetOptions>,
3256) -> Result<Vec<u8>, SofError> {
3257    use arrow::record_batch::RecordBatch;
3258    use parquet::arrow::ArrowWriter;
3259    use parquet::basic::Compression;
3260    use parquet::file::properties::WriterProperties;
3261    use std::io::Cursor;
3262
3263    // Create Arrow schema from columns and sample data
3264    let schema = parquet_schema::create_arrow_schema(&result.columns, &result.rows)?;
3265    let schema_ref = std::sync::Arc::new(schema.clone());
3266
3267    // Get configuration from options or use defaults
3268    let parquet_opts = options.cloned().unwrap_or_default();
3269
3270    // Calculate optimal batch size based on row count and estimated row size
3271    let target_row_group_size_bytes = (parquet_opts.row_group_size_mb as usize) * 1024 * 1024;
3272    let target_page_size_bytes = (parquet_opts.page_size_kb as usize) * 1024;
3273    const TARGET_ROWS_PER_BATCH: usize = 100_000; // Default batch size
3274    const MAX_ROWS_PER_BATCH: usize = 500_000; // Maximum to prevent memory issues
3275
3276    // Estimate average row size from first 100 rows
3277    let sample_size = std::cmp::min(100, result.rows.len());
3278    let mut estimated_row_size = 100; // Default estimate in bytes
3279
3280    if sample_size > 0 {
3281        let sample_json_size: usize = result.rows[..sample_size]
3282            .iter()
3283            .map(|row| {
3284                row.values
3285                    .iter()
3286                    .filter_map(|v| v.as_ref())
3287                    .map(|v| v.to_string().len())
3288                    .sum::<usize>()
3289            })
3290            .sum();
3291        estimated_row_size = (sample_json_size / sample_size).max(50);
3292    }
3293
3294    // Calculate optimal batch size
3295    let optimal_batch_size = (target_row_group_size_bytes / estimated_row_size)
3296        .clamp(TARGET_ROWS_PER_BATCH, MAX_ROWS_PER_BATCH);
3297
3298    // Parse compression algorithm
3299    use parquet::basic::BrotliLevel;
3300    use parquet::basic::GzipLevel;
3301    use parquet::basic::ZstdLevel;
3302
3303    let compression = match parquet_opts.compression.as_str() {
3304        "none" => Compression::UNCOMPRESSED,
3305        "gzip" => Compression::GZIP(GzipLevel::default()),
3306        "lz4" => Compression::LZ4,
3307        "brotli" => Compression::BROTLI(BrotliLevel::default()),
3308        "zstd" => Compression::ZSTD(ZstdLevel::default()),
3309        _ => Compression::SNAPPY, // Default to snappy
3310    };
3311
3312    // Set up writer properties with optimized settings
3313    let props = WriterProperties::builder()
3314        .set_compression(compression)
3315        .set_max_row_group_size(target_row_group_size_bytes)
3316        .set_data_page_row_count_limit(20_000) // Optimal for predicate pushdown
3317        .set_data_page_size_limit(target_page_size_bytes)
3318        .set_write_batch_size(8192) // Control write granularity
3319        .build();
3320
3321    // Write to memory buffer
3322    let mut buffer = Vec::new();
3323    let mut cursor = Cursor::new(&mut buffer);
3324    let mut writer =
3325        ArrowWriter::try_new(&mut cursor, schema_ref.clone(), Some(props)).map_err(|e| {
3326            SofError::ParquetConversionError(format!("Failed to create Parquet writer: {}", e))
3327        })?;
3328
3329    // Process data in batches to handle large datasets efficiently
3330    let mut row_offset = 0;
3331    while row_offset < result.rows.len() {
3332        let batch_end = (row_offset + optimal_batch_size).min(result.rows.len());
3333        let batch_rows = &result.rows[row_offset..batch_end];
3334
3335        // Convert batch to Arrow arrays
3336        let batch_arrays =
3337            parquet_schema::process_to_arrow_arrays(&schema, &result.columns, batch_rows)?;
3338
3339        // Create RecordBatch for this chunk
3340        let batch = RecordBatch::try_new(schema_ref.clone(), batch_arrays).map_err(|e| {
3341            SofError::ParquetConversionError(format!(
3342                "Failed to create RecordBatch for rows {}-{}: {}",
3343                row_offset, batch_end, e
3344            ))
3345        })?;
3346
3347        // Write batch
3348        writer.write(&batch).map_err(|e| {
3349            SofError::ParquetConversionError(format!(
3350                "Failed to write RecordBatch for rows {}-{}: {}",
3351                row_offset, batch_end, e
3352            ))
3353        })?;
3354
3355        row_offset = batch_end;
3356    }
3357
3358    writer.close().map_err(|e| {
3359        SofError::ParquetConversionError(format!("Failed to close Parquet writer: {}", e))
3360    })?;
3361
3362    Ok(buffer)
3363}
3364
3365/// Format Parquet data with automatic file splitting when size exceeds limit
3366pub fn format_parquet_multi_file(
3367    result: ProcessedResult,
3368    options: Option<&ParquetOptions>,
3369    max_file_size_bytes: usize,
3370) -> Result<Vec<Vec<u8>>, SofError> {
3371    use arrow::record_batch::RecordBatch;
3372    use parquet::arrow::ArrowWriter;
3373    use parquet::basic::Compression;
3374    use parquet::file::properties::WriterProperties;
3375    use std::io::Cursor;
3376
3377    // Create Arrow schema from columns and sample data
3378    let schema = parquet_schema::create_arrow_schema(&result.columns, &result.rows)?;
3379    let schema_ref = std::sync::Arc::new(schema.clone());
3380
3381    // Get configuration from options or use defaults
3382    let parquet_opts = options.cloned().unwrap_or_default();
3383
3384    // Calculate optimal batch size
3385    let target_row_group_size_bytes = (parquet_opts.row_group_size_mb as usize) * 1024 * 1024;
3386    let target_page_size_bytes = (parquet_opts.page_size_kb as usize) * 1024;
3387    const TARGET_ROWS_PER_BATCH: usize = 100_000;
3388    const MAX_ROWS_PER_BATCH: usize = 500_000;
3389
3390    // Estimate average row size
3391    let sample_size = std::cmp::min(100, result.rows.len());
3392    let mut estimated_row_size = 100;
3393
3394    if sample_size > 0 {
3395        let sample_json_size: usize = result.rows[..sample_size]
3396            .iter()
3397            .map(|row| {
3398                row.values
3399                    .iter()
3400                    .filter_map(|v| v.as_ref())
3401                    .map(|v| v.to_string().len())
3402                    .sum::<usize>()
3403            })
3404            .sum();
3405        estimated_row_size = (sample_json_size / sample_size).max(50);
3406    }
3407
3408    let optimal_batch_size = (target_row_group_size_bytes / estimated_row_size)
3409        .clamp(TARGET_ROWS_PER_BATCH, MAX_ROWS_PER_BATCH);
3410
3411    // Parse compression algorithm
3412    use parquet::basic::BrotliLevel;
3413    use parquet::basic::GzipLevel;
3414    use parquet::basic::ZstdLevel;
3415
3416    let compression = match parquet_opts.compression.as_str() {
3417        "none" => Compression::UNCOMPRESSED,
3418        "gzip" => Compression::GZIP(GzipLevel::default()),
3419        "lz4" => Compression::LZ4,
3420        "brotli" => Compression::BROTLI(BrotliLevel::default()),
3421        "zstd" => Compression::ZSTD(ZstdLevel::default()),
3422        _ => Compression::SNAPPY,
3423    };
3424
3425    // Set up writer properties
3426    let props = WriterProperties::builder()
3427        .set_compression(compression)
3428        .set_max_row_group_size(target_row_group_size_bytes)
3429        .set_data_page_row_count_limit(20_000)
3430        .set_data_page_size_limit(target_page_size_bytes)
3431        .set_write_batch_size(8192)
3432        .build();
3433
3434    let mut file_buffers = Vec::new();
3435    let mut current_buffer = Vec::new();
3436    let mut current_cursor = Cursor::new(&mut current_buffer);
3437    let mut current_writer =
3438        ArrowWriter::try_new(&mut current_cursor, schema_ref.clone(), Some(props.clone()))
3439            .map_err(|e| {
3440                SofError::ParquetConversionError(format!("Failed to create Parquet writer: {}", e))
3441            })?;
3442
3443    let mut row_offset = 0;
3444    let mut _current_file_rows = 0;
3445
3446    while row_offset < result.rows.len() {
3447        let batch_end = (row_offset + optimal_batch_size).min(result.rows.len());
3448        let batch_rows = &result.rows[row_offset..batch_end];
3449
3450        // Convert batch to Arrow arrays
3451        let batch_arrays =
3452            parquet_schema::process_to_arrow_arrays(&schema, &result.columns, batch_rows)?;
3453
3454        // Create RecordBatch
3455        let batch = RecordBatch::try_new(schema_ref.clone(), batch_arrays).map_err(|e| {
3456            SofError::ParquetConversionError(format!(
3457                "Failed to create RecordBatch for rows {}-{}: {}",
3458                row_offset, batch_end, e
3459            ))
3460        })?;
3461
3462        // Write batch
3463        current_writer.write(&batch).map_err(|e| {
3464            SofError::ParquetConversionError(format!(
3465                "Failed to write RecordBatch for rows {}-{}: {}",
3466                row_offset, batch_end, e
3467            ))
3468        })?;
3469
3470        _current_file_rows += batch_end - row_offset;
3471        row_offset = batch_end;
3472
3473        // Check if we should start a new file
3474        // Get actual size of current buffer by flushing the writer
3475        let current_size = current_writer.bytes_written();
3476
3477        if current_size >= max_file_size_bytes && row_offset < result.rows.len() {
3478            // Close current file
3479            current_writer.close().map_err(|e| {
3480                SofError::ParquetConversionError(format!("Failed to close Parquet writer: {}", e))
3481            })?;
3482
3483            // Save the buffer
3484            file_buffers.push(current_buffer);
3485
3486            // Start new file
3487            current_buffer = Vec::new();
3488            current_cursor = Cursor::new(&mut current_buffer);
3489            current_writer =
3490                ArrowWriter::try_new(&mut current_cursor, schema_ref.clone(), Some(props.clone()))
3491                    .map_err(|e| {
3492                        SofError::ParquetConversionError(format!(
3493                            "Failed to create new Parquet writer: {}",
3494                            e
3495                        ))
3496                    })?;
3497            _current_file_rows = 0;
3498        }
3499    }
3500
3501    // Close the final writer
3502    current_writer.close().map_err(|e| {
3503        SofError::ParquetConversionError(format!("Failed to close final Parquet writer: {}", e))
3504    })?;
3505
3506    file_buffers.push(current_buffer);
3507
3508    Ok(file_buffers)
3509}