Skip to main content

cqlite_core/query/
result.rs

1//! Query result types for CQLite
2//!
3//! This module provides result types and utilities for query execution results.
4//! It includes result set management, row iteration, and result metadata.
5
6use crate::{schema::CqlType, RowKey, Value};
7use base64::Engine;
8use serde::{Deserialize, Serialize};
9use serde_json::json;
10use std::collections::HashMap;
11use std::fmt;
12use tokio::sync::mpsc;
13
14/// Encode bytes as standard base64 (used across JSON serializers below).
15fn b64(bytes: &[u8]) -> String {
16    base64::engine::general_purpose::STANDARD.encode(bytes)
17}
18
19/// True if `RowMetadata` carries anything worth serializing.
20fn row_metadata_is_populated(meta: &RowMetadata) -> bool {
21    meta.version.is_some() || meta.ttl.is_some() || !meta.tags.is_empty()
22}
23
24/// Query result containing rows and metadata
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct QueryResult {
27    /// Result rows
28    pub rows: Vec<QueryRow>,
29    /// Number of rows affected (for INSERT/UPDATE/DELETE)
30    pub rows_affected: u64,
31    /// Query execution time in milliseconds
32    pub execution_time_ms: u64,
33    /// Query metadata
34    pub metadata: QueryMetadata,
35}
36
37/// Individual row in query result
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct QueryRow {
40    /// Column values mapped by column name
41    pub values: HashMap<String, Value>,
42    /// Original row key
43    pub key: RowKey,
44    /// Row metadata
45    pub metadata: RowMetadata,
46}
47
48/// Metadata for query results
49#[derive(Debug, Clone, Default, Serialize, Deserialize)]
50pub struct QueryMetadata {
51    /// Column information
52    pub columns: Vec<ColumnInfo>,
53    /// Total row count (may be different from returned rows due to LIMIT)
54    pub total_rows: Option<u64>,
55    /// Query execution plan information
56    pub plan_info: Option<PlanInfo>,
57    /// Performance metrics
58    pub performance: PerformanceMetrics,
59    /// Warnings generated during execution
60    pub warnings: Vec<String>,
61}
62
63/// Information about a column in the result set
64#[derive(Debug, Clone, Serialize, Deserialize)]
65pub struct ColumnInfo {
66    /// Column name
67    pub name: String,
68    /// Column data type (flat, kept for backward compatibility)
69    pub data_type: crate::types::DataType,
70    /// Whether column can be null
71    pub nullable: bool,
72    /// Column position in result set
73    pub position: usize,
74    /// Original table name (for joined queries)
75    pub table_name: Option<String>,
76    /// Full schema-sourced CQL type (populated when a schema is available).
77    ///
78    /// This field expresses element types for collections (`list<int>`,
79    /// `map<text, bigint>`), and carries variants absent from the flat
80    /// `DataType` enum (`date`, `time`, `decimal`, `varint`, `counter`,
81    /// `duration`, `inet`). Downstream writers MUST use this over
82    /// `data_type` when it is `Some` — the no-heuristics mandate (Issue #28)
83    /// requires authoritative-schema metadata rather than runtime inference.
84    #[serde(skip_serializing_if = "Option::is_none")]
85    pub cql_type: Option<CqlType>,
86}
87
88/// Row metadata
89#[derive(Debug, Clone, Default, Serialize, Deserialize)]
90pub struct RowMetadata {
91    /// Row version/timestamp
92    pub version: Option<u64>,
93    /// Row TTL (time to live)
94    pub ttl: Option<u64>,
95    /// Row tags or labels
96    pub tags: HashMap<String, String>,
97}
98
99/// Query execution plan information
100#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct PlanInfo {
102    /// Plan type used
103    pub plan_type: String,
104    /// Estimated cost
105    pub estimated_cost: f64,
106    /// Actual cost
107    pub actual_cost: f64,
108    /// Indexes used
109    pub indexes_used: Vec<String>,
110    /// Steps executed
111    pub steps: Vec<String>,
112    /// Parallelization information
113    pub parallelization: Option<ParallelizationInfo>,
114}
115
116/// Parallelization information for query execution
117#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct ParallelizationInfo {
119    /// Number of threads used
120    pub threads_used: usize,
121    /// Whether parallelization was effective
122    pub effective: bool,
123    /// Partition information
124    pub partitions: Vec<PartitionInfo>,
125}
126
127/// Information about a partition processed in parallel
128#[derive(Debug, Clone, Serialize, Deserialize)]
129pub struct PartitionInfo {
130    /// Partition ID
131    pub id: usize,
132    /// Rows processed by this partition
133    pub rows_processed: u64,
134    /// Processing time for this partition
135    pub processing_time_ms: u64,
136}
137
138/// Performance metrics for query execution
139#[derive(Debug, Clone, Default, Serialize, Deserialize)]
140pub struct PerformanceMetrics {
141    /// Parse time in microseconds
142    pub parse_time_us: u64,
143    /// Planning time in microseconds
144    pub planning_time_us: u64,
145    /// Execution time in microseconds
146    pub execution_time_us: u64,
147    /// Total time in microseconds
148    pub total_time_us: u64,
149    /// Memory usage in bytes
150    pub memory_usage_bytes: u64,
151    /// I/O operations performed
152    pub io_operations: u64,
153    /// Cache hits
154    pub cache_hits: u64,
155    /// Cache misses
156    pub cache_misses: u64,
157}
158
159// ============================================================================
160// Streaming Query Results (Issue #280)
161// ============================================================================
162
/// Tuning knobs for streaming query results.
///
/// The two sizes bound how much data is held at once, so large result sets
/// can be processed without materializing them in memory.
#[derive(Clone, Debug)]
pub struct StreamingConfig {
    /// Capacity of the row channel, which is what applies backpressure.
    /// Default: 1024 rows in flight.
    pub buffer_size: usize,
    /// Suggested number of rows per chunk handed to writers.
    /// Default: 10,000 rows (matches the Parquet row group size).
    pub chunk_size: usize,
}
176
177impl Default for StreamingConfig {
178    fn default() -> Self {
179        Self {
180            buffer_size: 1024,  // 1K rows in flight
181            chunk_size: 10_000, // 10K rows per chunk (matches Parquet row group)
182        }
183    }
184}
185
186impl StreamingConfig {
187    /// Create a new streaming config with custom settings
188    pub fn new(buffer_size: usize, chunk_size: usize) -> Self {
189        Self {
190            buffer_size,
191            chunk_size,
192        }
193    }
194
195    /// Create a config optimized for Parquet output
196    pub fn for_parquet() -> Self {
197        Self {
198            buffer_size: 1024,
199            chunk_size: 10_000, // Row group size
200        }
201    }
202
203    /// Create a config optimized for CSV/JSON output
204    pub fn for_text_formats() -> Self {
205        Self {
206            buffer_size: 512,
207            chunk_size: 5_000, // Smaller chunks for text formats
208        }
209    }
210}
211
212/// Streaming query result iterator for memory-efficient processing
213///
214/// Instead of materializing all rows into a `Vec`, this iterator yields rows
215/// lazily via a channel, allowing processing of arbitrarily large result sets
216/// within the 128MB memory budget.
217///
218/// # Memory Budget
219///
220/// To stay within the 128MB target, callers MUST create a bounded channel
221/// with capacity from `StreamingConfig::buffer_size`. Assuming average row
222/// size of 1KB:
223/// - `buffer_size: 1024` = ~1MB in flight
224/// - `chunk_size: 10_000` = ~10MB per chunk
225/// - Total peak usage: ~11MB (well within 128MB budget)
226///
227/// For rows with large blobs/text, reduce buffer sizes proportionally.
228///
229/// # Contract
230///
231/// 1. The caller MUST create a bounded channel with `mpsc::channel(config.buffer_size)`
232/// 2. The iterator does NOT own the sender; the caller must spawn a task to send rows
233/// 3. The iterator is consumed once; create a new one for subsequent queries
234///
235/// # Example
236///
237/// ```ignore
238/// let config = StreamingConfig::default();
239/// let (tx, rx) = tokio::sync::mpsc::channel(config.buffer_size);
240///
241/// // Spawn producer
242/// tokio::spawn(async move {
243///     for row in rows {
244///         if tx.send(Ok(row)).await.is_err() {
245///             break; // Consumer dropped
246///         }
247///     }
248/// });
249///
250/// // Create iterator from receiver
251/// let mut iterator = QueryResultIterator::new(rx, metadata);
252///
253/// while let Some(row_result) = iterator.next_async().await {
254///     let row = row_result?;
255///     writer.write_row(&row)?;
256/// }
257/// ```
258pub struct QueryResultIterator {
259    /// Channel receiver for rows
260    receiver: mpsc::Receiver<Result<QueryRow, crate::Error>>,
261    /// Query metadata (columns, etc.)
262    pub metadata: QueryMetadata,
263    /// Total rows hint (if known from query planning)
264    pub total_rows_hint: Option<u64>,
265    /// Count of rows received so far
266    rows_received: u64,
267}
268
269impl QueryResultIterator {
270    /// Create a new streaming result iterator
271    pub fn new(
272        receiver: mpsc::Receiver<Result<QueryRow, crate::Error>>,
273        metadata: QueryMetadata,
274    ) -> Self {
275        Self {
276            receiver,
277            metadata,
278            total_rows_hint: None,
279            rows_received: 0,
280        }
281    }
282
283    /// Create with a known total row count hint
284    pub fn with_total_hint(mut self, total: u64) -> Self {
285        self.total_rows_hint = Some(total);
286        self
287    }
288
289    /// Receive next row (async)
290    ///
291    /// Returns `None` when all rows have been received.
292    pub async fn next_async(&mut self) -> Option<Result<QueryRow, crate::Error>> {
293        let result = self.receiver.recv().await?;
294        if result.is_ok() {
295            self.rows_received += 1;
296        }
297        Some(result)
298    }
299
300    /// Maximum allowed chunk size to prevent OOM
301    const MAX_CHUNK_SIZE: usize = 100_000;
302
303    /// Collect into chunks of specified size
304    ///
305    /// Returns a chunk of rows up to `size`. May return fewer rows if the
306    /// stream ends or an error occurs.
307    ///
308    /// # Arguments
309    ///
310    /// * `size` - Maximum number of rows to collect. Limited to MAX_CHUNK_SIZE
311    ///   (100,000) to prevent unbounded memory allocation.
312    ///
313    /// # Returns
314    ///
315    /// A vector of rows, which may be smaller than `size` if the stream ends
316    /// or an error occurs.
317    pub async fn collect_chunk(&mut self, size: usize) -> Result<Vec<QueryRow>, crate::Error> {
318        let safe_size = size.min(Self::MAX_CHUNK_SIZE);
319        // Grow the Vec lazily; a requested `safe_size` is only an upper bound.
320        let mut chunk = Vec::new();
321        while chunk.len() < safe_size {
322            match self.receiver.recv().await {
323                Some(Ok(row)) => {
324                    self.rows_received += 1;
325                    chunk.push(row);
326                }
327                Some(Err(e)) => return Err(e),
328                None => break,
329            }
330        }
331        Ok(chunk)
332    }
333
334    /// Get count of rows received so far
335    pub fn rows_received(&self) -> u64 {
336        self.rows_received
337    }
338
339    /// Get progress as a percentage (if total is known)
340    pub fn progress_percent(&self) -> Option<f64> {
341        self.total_rows_hint.map(|total| {
342            if total == 0 {
343                100.0
344            } else {
345                (self.rows_received as f64 / total as f64) * 100.0
346            }
347        })
348    }
349}
350
351impl QueryResult {
352    /// Create a new empty query result
353    pub fn new() -> Self {
354        Self {
355            rows: Vec::new(),
356            rows_affected: 0,
357            execution_time_ms: 0,
358            metadata: QueryMetadata::default(),
359        }
360    }
361
362    /// Create a result with rows
363    pub fn with_rows(rows: Vec<QueryRow>) -> Self {
364        Self {
365            rows,
366            ..Self::new()
367        }
368    }
369
370    /// Create a result for DML operations (INSERT/UPDATE/DELETE)
371    pub fn with_affected_rows(rows_affected: u64) -> Self {
372        Self {
373            rows_affected,
374            ..Self::new()
375        }
376    }
377
378    /// Get the number of rows in the result
379    pub fn row_count(&self) -> usize {
380        self.rows.len()
381    }
382
383    /// Check if the result is empty
384    pub fn is_empty(&self) -> bool {
385        self.rows.is_empty()
386    }
387
388    /// Get a specific row by index
389    pub fn get_row(&self, index: usize) -> Option<&QueryRow> {
390        self.rows.get(index)
391    }
392
393    /// Get column information
394    pub fn columns(&self) -> &[ColumnInfo] {
395        &self.metadata.columns
396    }
397
398    /// Get column names
399    pub fn column_names(&self) -> Vec<String> {
400        self.metadata
401            .columns
402            .iter()
403            .map(|c| c.name.clone())
404            .collect()
405    }
406
407    /// Get execution time in milliseconds
408    pub fn execution_time(&self) -> u64 {
409        self.execution_time_ms
410    }
411
412    /// Get performance metrics
413    pub fn performance(&self) -> &PerformanceMetrics {
414        &self.metadata.performance
415    }
416
417    /// Get warnings
418    pub fn warnings(&self) -> &[String] {
419        &self.metadata.warnings
420    }
421
422    /// Add a warning
423    pub fn add_warning(&mut self, warning: String) {
424        self.metadata.warnings.push(warning);
425    }
426
427    /// Convert to JSON representation
428    ///
429    /// Note: `execution_time_ms` is intentionally excluded to keep snapshot
430    /// output deterministic; use `execution_time()` to read it separately.
431    pub fn to_json(&self) -> serde_json::Value {
432        let rows: Vec<_> = self
433            .rows
434            .iter()
435            .map(|row| self.row_to_json_deterministic(row))
436            .collect();
437        let columns: Vec<_> = self
438            .metadata
439            .columns
440            .iter()
441            .map(ColumnInfo::to_json)
442            .collect();
443        let warnings: Vec<_> = self
444            .metadata
445            .warnings
446            .iter()
447            .cloned()
448            .map(serde_json::Value::String)
449            .collect();
450
451        json!({
452            "rows": rows,
453            "rows_affected": self.rows_affected,
454            "row_count": self.rows.len(),
455            "columns": columns,
456            "performance": self.metadata.performance.to_json(),
457            "warnings": warnings,
458        })
459    }
460
461    /// Create result iterator
462    pub fn iter(&self) -> std::slice::Iter<'_, QueryRow> {
463        self.rows.iter()
464    }
465
466    /// Convert a single row to JSON with deterministic field ordering.
467    ///
468    /// When `metadata.columns` is populated, fields appear in that order; otherwise
469    /// HashMap keys are emitted in sorted order so snapshots stay stable.
470    fn row_to_json_deterministic(&self, row: &QueryRow) -> serde_json::Value {
471        let mut result = serde_json::Map::new();
472
473        if !self.metadata.columns.is_empty() {
474            for col in &self.metadata.columns {
475                let value_json = row
476                    .values
477                    .get(&col.name)
478                    .map_or(serde_json::Value::Null, ToJson::to_json);
479                result.insert(col.name.clone(), value_json);
480            }
481        } else {
482            let mut sorted_keys: Vec<&String> = row.values.keys().collect();
483            sorted_keys.sort();
484            for key in sorted_keys {
485                if let Some(value) = row.values.get(key) {
486                    result.insert(key.clone(), value.to_json());
487                }
488            }
489        }
490
491        result.insert(
492            "_key".to_string(),
493            serde_json::Value::String(format!("{:?}", row.key)),
494        );
495
496        if row_metadata_is_populated(&row.metadata) {
497            result.insert("_metadata".to_string(), row.metadata.to_json());
498        }
499
500        serde_json::Value::Object(result)
501    }
502}
503
504impl QueryRow {
505    /// Create a new query row
506    pub fn new(key: RowKey) -> Self {
507        Self {
508            values: HashMap::new(),
509            key,
510            metadata: RowMetadata::default(),
511        }
512    }
513
514    /// Create a row with values
515    pub fn with_values(key: RowKey, values: HashMap<String, Value>) -> Self {
516        Self {
517            values,
518            key,
519            metadata: RowMetadata::default(),
520        }
521    }
522
523    /// Get a value by column name
524    pub fn get(&self, column: &str) -> Option<&Value> {
525        self.values.get(column)
526    }
527
528    /// Set a value for a column
529    pub fn set(&mut self, column: String, value: Value) {
530        self.values.insert(column, value);
531    }
532
533    /// Get all column names
534    pub fn column_names(&self) -> Vec<String> {
535        self.values.keys().cloned().collect()
536    }
537
538    /// Get the row key
539    pub fn key(&self) -> &RowKey {
540        &self.key
541    }
542
543    /// Get row metadata
544    pub fn metadata(&self) -> &RowMetadata {
545        &self.metadata
546    }
547
548    /// Set row metadata
549    pub fn set_metadata(&mut self, metadata: RowMetadata) {
550        self.metadata = metadata;
551    }
552
553    /// Convert to JSON representation
554    pub fn to_json(&self) -> serde_json::Value {
555        let mut result = serde_json::Map::new();
556
557        for (column, value) in &self.values {
558            result.insert(column.clone(), value.to_json());
559        }
560
561        result.insert(
562            "_key".to_string(),
563            serde_json::Value::String(format!("{:?}", self.key)),
564        );
565
566        if row_metadata_is_populated(&self.metadata) {
567            result.insert("_metadata".to_string(), self.metadata.to_json());
568        }
569
570        serde_json::Value::Object(result)
571    }
572}
573
574impl ColumnInfo {
575    /// Create new column info
576    pub fn new(
577        name: String,
578        data_type: crate::types::DataType,
579        nullable: bool,
580        position: usize,
581    ) -> Self {
582        Self {
583            name,
584            data_type,
585            nullable,
586            position,
587            table_name: None,
588            cql_type: None,
589        }
590    }
591
592    /// Set table name
593    pub fn with_table_name(mut self, table_name: String) -> Self {
594        self.table_name = Some(table_name);
595        self
596    }
597
598    /// Attach a schema-sourced [`CqlType`] to this column.
599    ///
600    /// The `data_type` field is left unchanged so existing consumers remain
601    /// unaffected; downstream writers that need full type fidelity (e.g. the
602    /// Parquet/Arrow writer) should prefer `cql_type` when it is `Some`.
603    pub fn with_cql_type(mut self, cql_type: CqlType) -> Self {
604        self.cql_type = Some(cql_type);
605        self
606    }
607
608    /// Convert to JSON representation
609    ///
610    /// The `data_type` and all pre-existing keys are preserved unchanged for
611    /// backward compatibility. The new `cql_type` key is only emitted when
612    /// the field is `Some` (it is marked `skip_serializing_if` in the struct
613    /// derive, but we also reflect it here for the hand-rolled JSON path).
614    pub fn to_json(&self) -> serde_json::Value {
615        let mut map = serde_json::Map::new();
616        map.insert("name".to_string(), json!(self.name));
617        map.insert(
618            "data_type".to_string(),
619            json!(format!("{:?}", self.data_type)),
620        );
621        map.insert("nullable".to_string(), json!(self.nullable));
622        map.insert("position".to_string(), json!(self.position));
623        if let Some(table_name) = &self.table_name {
624            map.insert("table_name".to_string(), json!(table_name));
625        }
626        // New additive key: only present when schema CqlType is known.
627        if let Some(cql_type) = &self.cql_type {
628            map.insert("cql_type".to_string(), json!(format_cql_type(cql_type)));
629        }
630        serde_json::Value::Object(map)
631    }
632}
633
634/// Map a schema-level [`CqlType`] to the flat [`crate::types::DataType`] enum.
635///
636/// This is used in the `SELECT *` path and explicit projection paths to derive
637/// a backward-compatible `data_type` from authoritative schema metadata rather
638/// than hard-coding `DataType::Text` (Issue #674 / no-heuristics mandate #28).
639pub fn cql_type_to_data_type(cql_type: &CqlType) -> crate::types::DataType {
640    use crate::types::DataType;
641    match cql_type {
642        CqlType::Boolean => DataType::Boolean,
643        CqlType::TinyInt => DataType::TinyInt,
644        CqlType::SmallInt => DataType::SmallInt,
645        CqlType::Int => DataType::Integer,
646        CqlType::BigInt | CqlType::Varint | CqlType::Counter => DataType::BigInt,
647        CqlType::Float => DataType::Float32,
648        CqlType::Double | CqlType::Decimal => DataType::Float,
649        CqlType::Text | CqlType::Varchar | CqlType::Ascii => DataType::Text,
650        CqlType::Blob => DataType::Blob,
651        CqlType::Timestamp => DataType::Timestamp,
652        CqlType::Date | CqlType::Time | CqlType::Duration | CqlType::Inet => DataType::BigInt,
653        CqlType::Uuid | CqlType::TimeUuid => DataType::Uuid,
654        CqlType::List(_) => DataType::List,
655        CqlType::Set(_) => DataType::Set,
656        CqlType::Map(_, _) => DataType::Map,
657        CqlType::Tuple(_) => DataType::Tuple,
658        CqlType::Udt(_, _) => DataType::Udt,
659        CqlType::Frozen(inner) => cql_type_to_data_type(inner),
660        CqlType::Custom(_) => DataType::Blob,
661    }
662}
663
664/// Format a [`CqlType`] as a human-readable CQL type string.
665///
666/// Used for the `cql_type` field in `ColumnInfo::to_json`.
667fn format_cql_type(cql_type: &CqlType) -> String {
668    match cql_type {
669        CqlType::Boolean => "boolean".to_string(),
670        CqlType::TinyInt => "tinyint".to_string(),
671        CqlType::SmallInt => "smallint".to_string(),
672        CqlType::Int => "int".to_string(),
673        CqlType::BigInt => "bigint".to_string(),
674        CqlType::Counter => "counter".to_string(),
675        CqlType::Float => "float".to_string(),
676        CqlType::Double => "double".to_string(),
677        CqlType::Decimal => "decimal".to_string(),
678        CqlType::Text => "text".to_string(),
679        CqlType::Varchar => "varchar".to_string(),
680        CqlType::Ascii => "ascii".to_string(),
681        CqlType::Blob => "blob".to_string(),
682        CqlType::Timestamp => "timestamp".to_string(),
683        CqlType::Date => "date".to_string(),
684        CqlType::Time => "time".to_string(),
685        CqlType::Uuid => "uuid".to_string(),
686        CqlType::TimeUuid => "timeuuid".to_string(),
687        CqlType::Inet => "inet".to_string(),
688        CqlType::Duration => "duration".to_string(),
689        CqlType::Varint => "varint".to_string(),
690        CqlType::List(inner) => format!("list<{}>", format_cql_type(inner)),
691        CqlType::Set(inner) => format!("set<{}>", format_cql_type(inner)),
692        CqlType::Map(k, v) => format!("map<{}, {}>", format_cql_type(k), format_cql_type(v)),
693        CqlType::Tuple(types) => {
694            let inner: Vec<_> = types.iter().map(format_cql_type).collect();
695            format!("tuple<{}>", inner.join(", "))
696        }
697        CqlType::Udt(name, _) => name.clone(),
698        CqlType::Frozen(inner) => format!("frozen<{}>", format_cql_type(inner)),
699        CqlType::Custom(name) => name.clone(),
700    }
701}
702
703impl RowMetadata {
704    /// Create new row metadata
705    pub fn new() -> Self {
706        Self::default()
707    }
708
709    /// Set version
710    pub fn with_version(mut self, version: u64) -> Self {
711        self.version = Some(version);
712        self
713    }
714
715    /// Set TTL
716    pub fn with_ttl(mut self, ttl: u64) -> Self {
717        self.ttl = Some(ttl);
718        self
719    }
720
721    /// Add tag
722    pub fn with_tag(mut self, key: String, value: String) -> Self {
723        self.tags.insert(key, value);
724        self
725    }
726
727    /// Convert to JSON representation
728    pub fn to_json(&self) -> serde_json::Value {
729        let mut map = serde_json::Map::new();
730        if let Some(version) = self.version {
731            map.insert("version".to_string(), json!(version));
732        }
733        if let Some(ttl) = self.ttl {
734            map.insert("ttl".to_string(), json!(ttl));
735        }
736        if !self.tags.is_empty() {
737            map.insert("tags".to_string(), json!(self.tags));
738        }
739        serde_json::Value::Object(map)
740    }
741}
742
743impl PerformanceMetrics {
744    /// Create new performance metrics
745    pub fn new() -> Self {
746        Self::default()
747    }
748
749    /// Get total time in milliseconds
750    pub fn total_time_ms(&self) -> u64 {
751        self.total_time_us / 1000
752    }
753
754    /// Get cache hit ratio
755    pub fn cache_hit_ratio(&self) -> f64 {
756        let total = self.cache_hits + self.cache_misses;
757        if total == 0 {
758            0.0
759        } else {
760            self.cache_hits as f64 / total as f64
761        }
762    }
763
764    /// Convert to JSON representation
765    pub fn to_json(&self) -> serde_json::Value {
766        let cache_hit_ratio = serde_json::Number::from_f64(self.cache_hit_ratio())
767            .map(serde_json::Value::Number)
768            .unwrap_or(json!(0));
769        json!({
770            "parse_time_us": self.parse_time_us,
771            "planning_time_us": self.planning_time_us,
772            "execution_time_us": self.execution_time_us,
773            "total_time_us": self.total_time_us,
774            "memory_usage_bytes": self.memory_usage_bytes,
775            "io_operations": self.io_operations,
776            "cache_hits": self.cache_hits,
777            "cache_misses": self.cache_misses,
778            "cache_hit_ratio": cache_hit_ratio,
779        })
780    }
781}
782
/// Emit a single horizontal table border followed by a newline.
///
/// Each entry of `widths` becomes a run of `─` two characters wider than the
/// column (one padding space per side). Runs are joined with `sep` and the
/// line is framed by `left` and `right`.
fn write_border(
    f: &mut fmt::Formatter<'_>,
    widths: &[usize],
    left: char,
    sep: char,
    right: char,
) -> fmt::Result {
    let segments: Vec<String> = widths
        .iter()
        .map(|column_width| "─".repeat(column_width + 2))
        .collect();
    let joined = segments.join(sep.to_string().as_str());
    writeln!(f, "{}{}{}", left, joined, right)
}
800
801impl fmt::Display for QueryResult {
802    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
803        if self.rows.is_empty() {
804            return write!(f, "Empty result set ({} rows affected)", self.rows_affected);
805        }
806
807        let column_names = self.column_names();
808        if column_names.is_empty() {
809            return write!(f, "No columns in result set");
810        }
811
812        // Column widths = max(header, longest value) for each column.
813        let col_widths: Vec<usize> = column_names
814            .iter()
815            .map(|col_name| {
816                self.rows
817                    .iter()
818                    .filter_map(|row| row.values.get(col_name))
819                    .map(|v| format!("{}", v).len())
820                    .max()
821                    .unwrap_or(0)
822                    .max(col_name.len())
823            })
824            .collect();
825
826        write_border(f, &col_widths, '┌', '┬', '┐')?;
827
828        write!(f, "│")?;
829        for (i, (col_name, width)) in column_names.iter().zip(col_widths.iter()).enumerate() {
830            write!(f, " {:width$} ", col_name, width = width)?;
831            if i < column_names.len() - 1 {
832                write!(f, "│")?;
833            }
834        }
835        writeln!(f, "│")?;
836
837        write_border(f, &col_widths, '├', '┼', '┤')?;
838
839        for row in &self.rows {
840            write!(f, "│")?;
841            for (i, (col_name, width)) in column_names.iter().zip(col_widths.iter()).enumerate() {
842                let value = row
843                    .values
844                    .get(col_name)
845                    .map(|v| format!("{}", v))
846                    .unwrap_or_else(|| "NULL".to_string());
847                write!(f, " {:width$} ", value, width = width)?;
848                if i < column_names.len() - 1 {
849                    write!(f, "│")?;
850                }
851            }
852            writeln!(f, "│")?;
853        }
854
855        write_border(f, &col_widths, '└', '┴', '┘')?;
856
857        writeln!(
858            f,
859            "{} rows returned in {}ms",
860            self.rows.len(),
861            self.execution_time_ms
862        )?;
863
864        if !self.metadata.warnings.is_empty() {
865            writeln!(f, "\nWarnings:")?;
866            for warning in &self.metadata.warnings {
867                writeln!(f, "  - {}", warning)?;
868            }
869        }
870
871        Ok(())
872    }
873}
874
875impl Default for QueryResult {
876    fn default() -> Self {
877        Self::new()
878    }
879}
880
881impl IntoIterator for QueryResult {
882    type Item = QueryRow;
883    type IntoIter = std::vec::IntoIter<QueryRow>;
884
885    fn into_iter(self) -> Self::IntoIter {
886        self.rows.into_iter()
887    }
888}
889
890impl<'a> IntoIterator for &'a QueryResult {
891    type Item = &'a QueryRow;
892    type IntoIter = std::slice::Iter<'a, QueryRow>;
893
894    fn into_iter(self) -> Self::IntoIter {
895        self.rows.iter()
896    }
897}
898
// Helper trait for converting values to JSON.
//
// Private to this module; it gives the row serializers a uniform way to turn
// a value into a `serde_json::Value`.
trait ToJson {
    // Build the JSON representation of `self`.
    fn to_json(&self) -> serde_json::Value;
}
903
904impl ToJson for Value {
905    fn to_json(&self) -> serde_json::Value {
906        // Non-finite floats have no JSON representation; we emit null for those.
907        fn float_to_json(x: f64) -> serde_json::Value {
908            serde_json::Number::from_f64(x)
909                .map(serde_json::Value::Number)
910                .unwrap_or(serde_json::Value::Null)
911        }
912
913        match self {
914            Value::Null => serde_json::Value::Null,
915            Value::Boolean(b) => json!(*b),
916            Value::Integer(i) => json!(*i),
917            Value::BigInt(i) => json!(*i),
918            Value::Counter(c) => json!(*c),
919            Value::TinyInt(i) => json!(*i as i64),
920            Value::SmallInt(i) => json!(*i as i64),
921            Value::Date(d) => json!(*d),
922            Value::Time(t) => json!(*t),
923            Value::Timestamp(ts) => json!(*ts),
924            Value::Float(f) => float_to_json(*f),
925            Value::Float32(f) => float_to_json(*f as f64),
926            Value::Text(s) => json!(s),
927            Value::Json(value) => value.clone(),
928            Value::Blob(bytes) | Value::Varint(bytes) | Value::Inet(bytes) => json!(b64(bytes)),
929            Value::Uuid(uuid) => json!(b64(uuid)),
930            Value::List(items) | Value::Set(items) | Value::Tuple(items) => {
931                let json_list: Vec<_> = items.iter().map(ToJson::to_json).collect();
932                serde_json::Value::Array(json_list)
933            }
934            Value::Map(entries) => {
935                let json_map: serde_json::Map<String, serde_json::Value> = entries
936                    .iter()
937                    .map(|(k, v)| (format!("{}", k), v.to_json()))
938                    .collect();
939                serde_json::Value::Object(json_map)
940            }
941            Value::Udt(udt) => {
942                let mut json_obj = serde_json::Map::new();
943                json_obj.insert("_type".to_string(), json!(udt.type_name));
944                for field in &udt.fields {
945                    let field_json = field
946                        .value
947                        .as_ref()
948                        .map_or(serde_json::Value::Null, ToJson::to_json);
949                    json_obj.insert(field.name.clone(), field_json);
950                }
951                serde_json::Value::Object(json_obj)
952            }
953            Value::Frozen(boxed) => boxed.to_json(),
954            Value::Decimal { scale, unscaled } => json!({
955                "scale": *scale,
956                "unscaled": b64(unscaled),
957            }),
958            Value::Duration {
959                months,
960                days,
961                nanos,
962            } => json!({
963                "months": *months,
964                "days": *days,
965                "nanos": *nanos,
966            }),
967            Value::Tombstone(info) => {
968                let mut json_obj = serde_json::Map::new();
969                json_obj.insert("type".to_string(), json!("tombstone"));
970                json_obj.insert("deletion_time".to_string(), json!(info.deletion_time));
971                json_obj.insert(
972                    "tombstone_type".to_string(),
973                    json!(format!("{:?}", info.tombstone_type)),
974                );
975                if let Some(ttl) = info.ttl {
976                    json_obj.insert("ttl".to_string(), json!(ttl));
977                }
978                serde_json::Value::Object(json_obj)
979            }
980        }
981    }
982}
983
#[cfg(test)]
mod tests {
    use super::cql_type_to_data_type;
    use super::*;
    use crate::schema::CqlType;
    use crate::types::DataType;
    use crate::Value;

    /// Builds a row with the given key and `(column, value)` pairs applied in order.
    fn row_of(key: RowKey, columns: Vec<(&str, Value)>) -> QueryRow {
        let mut row = QueryRow::new(key);
        for (name, value) in columns {
            row.set(name.to_string(), value);
        }
        row
    }

    #[test]
    fn test_query_result_creation() {
        let empty = QueryResult::new();

        assert!(empty.is_empty());
        assert_eq!(empty.row_count(), 0);
        assert_eq!(empty.execution_time(), 0);
    }

    #[test]
    fn test_query_result_with_rows() {
        let alice = row_of(
            RowKey::new(vec![1]),
            vec![
                ("id", Value::Integer(1)),
                ("name", Value::Text("Alice".to_string())),
            ],
        );
        let bob = row_of(
            RowKey::new(vec![2]),
            vec![
                ("id", Value::Integer(2)),
                ("name", Value::Text("Bob".to_string())),
            ],
        );

        let result = QueryResult::with_rows(vec![alice, bob]);
        assert_eq!(result.row_count(), 2);
        assert!(!result.is_empty());

        let first = result.get_row(0).unwrap();
        assert_eq!(first.get("id"), Some(&Value::Integer(1)));
        assert_eq!(first.get("name"), Some(&Value::Text("Alice".to_string())));
    }

    #[test]
    fn test_query_row_operations() {
        let row = row_of(
            RowKey::new(vec![1]),
            vec![("id", Value::Integer(42)), ("active", Value::Boolean(true))],
        );

        assert_eq!(row.get("id"), Some(&Value::Integer(42)));
        assert_eq!(row.get("active"), Some(&Value::Boolean(true)));
        assert!(row.get("nonexistent").is_none());

        let names = row.column_names();
        assert_eq!(names.len(), 2);
        for expected in &["id", "active"] {
            assert!(names.contains(&expected.to_string()));
        }
    }

    #[test]
    fn test_column_info() {
        let column = ColumnInfo::new("user_id".to_string(), DataType::Integer, false, 0)
            .with_table_name("users".to_string());

        assert_eq!(column.name, "user_id");
        assert_eq!(column.data_type, DataType::Integer);
        assert!(!column.nullable);
        assert_eq!(column.position, 0);
        assert_eq!(column.table_name.as_deref(), Some("users"));
        assert!(column.cql_type.is_none());
    }

    #[test]
    fn test_column_info_with_cql_type_scalar() {
        let column = ColumnInfo::new("ts".to_string(), DataType::Timestamp, true, 1)
            .with_cql_type(CqlType::Timestamp);

        assert_eq!(column.name, "ts");
        assert_eq!(column.cql_type, Some(CqlType::Timestamp));
        // data_type is unaffected
        assert_eq!(column.data_type, DataType::Timestamp);
    }

    #[test]
    fn test_column_info_with_cql_type_list() {
        let expected = CqlType::List(Box::new(CqlType::Int));
        let column = ColumnInfo::new("items".to_string(), DataType::List, true, 2)
            .with_cql_type(expected.clone());

        assert_eq!(column.cql_type, Some(expected));
    }

    #[test]
    fn test_column_info_with_cql_type_map() {
        let expected = CqlType::Map(Box::new(CqlType::Text), Box::new(CqlType::BigInt));
        let column = ColumnInfo::new("props".to_string(), DataType::Map, true, 3)
            .with_cql_type(expected.clone());

        assert_eq!(column.cql_type, Some(expected));
    }

    #[test]
    fn test_column_info_with_cql_type_udt() {
        let expected = CqlType::Udt("address".to_string(), vec![]);
        let column = ColumnInfo::new("addr".to_string(), DataType::Udt, true, 4)
            .with_cql_type(expected.clone());

        assert_eq!(column.cql_type, Some(expected));
    }

    #[test]
    fn test_cql_type_to_data_type_scalars() {
        let cases = [
            (CqlType::Boolean, DataType::Boolean),
            (CqlType::Int, DataType::Integer),
            (CqlType::BigInt, DataType::BigInt),
            (CqlType::Text, DataType::Text),
            (CqlType::Blob, DataType::Blob),
            (CqlType::Uuid, DataType::Uuid),
            (CqlType::Timestamp, DataType::Timestamp),
        ];

        for (cql, expected) in &cases {
            assert_eq!(cql_type_to_data_type(cql), *expected);
        }
    }

    #[test]
    fn test_cql_type_to_data_type_collections() {
        let cases = [
            (CqlType::List(Box::new(CqlType::Int)), DataType::List),
            (CqlType::Set(Box::new(CqlType::Text)), DataType::Set),
            (
                CqlType::Map(Box::new(CqlType::Text), Box::new(CqlType::BigInt)),
                DataType::Map,
            ),
        ];

        for (cql, expected) in &cases {
            assert_eq!(cql_type_to_data_type(cql), *expected);
        }
    }

    #[test]
    fn test_cql_type_to_data_type_frozen() {
        // frozen<list<int>> should unwrap to DataType::List
        let frozen_list = CqlType::Frozen(Box::new(CqlType::List(Box::new(CqlType::Int))));

        assert_eq!(cql_type_to_data_type(&frozen_list), DataType::List);
    }

    #[test]
    fn test_column_info_to_json_includes_cql_type() {
        let column = ColumnInfo::new("items".to_string(), DataType::List, true, 0)
            .with_cql_type(CqlType::List(Box::new(CqlType::Int)));

        let json = column.to_json();
        let obj = json.as_object().unwrap();

        // Existing keys unchanged
        assert_eq!(obj["name"], "items");
        for key in &["data_type", "nullable", "position"] {
            assert!(obj.contains_key(*key));
        }

        // New key present
        assert!(obj.contains_key("cql_type"));
        assert_eq!(obj["cql_type"], "list<int>");
    }

    #[test]
    fn test_column_info_to_json_no_cql_type() {
        // Without cql_type, the JSON must not include the key
        let column = ColumnInfo::new("id".to_string(), DataType::Integer, false, 0);

        let json = column.to_json();
        let obj = json.as_object().unwrap();

        assert!(!obj.contains_key("cql_type"));
    }

    #[test]
    fn test_row_metadata() {
        let metadata = RowMetadata::new()
            .with_version(123)
            .with_ttl(3600)
            .with_tag("source".to_string(), "import".to_string());

        assert_eq!(metadata.version, Some(123));
        assert_eq!(metadata.ttl, Some(3600));
        assert_eq!(
            metadata.tags.get("source").map(String::as_str),
            Some("import")
        );
    }

    #[test]
    fn test_performance_metrics() {
        let mut perf = PerformanceMetrics::new();
        perf.cache_hits = 8;
        perf.cache_misses = 2;
        perf.total_time_us = 5000;

        assert_eq!(perf.cache_hit_ratio(), 0.8);
        assert_eq!(perf.total_time_ms(), 5);
    }

    #[test]
    fn test_json_serialization() {
        let row = row_of(
            RowKey::new(vec![1]),
            vec![
                ("id", Value::Integer(1)),
                ("name", Value::Text("test".to_string())),
            ],
        );

        let json = row.to_json();
        assert!(json.is_object());

        let obj = json.as_object().unwrap();
        assert_eq!(obj.get("id"), Some(&serde_json::Value::from(1)));
        assert_eq!(
            obj.get("name"),
            Some(&serde_json::Value::String("test".to_string()))
        );
    }

    #[test]
    fn test_result_iteration() {
        let rows = vec![
            QueryRow::new(RowKey::new(vec![1])),
            QueryRow::new(RowKey::new(vec![2])),
        ];
        let result = QueryResult::with_rows(rows);

        // Borrowing iteration leaves `result` usable afterwards.
        let borrowed = (&result).into_iter().count();
        assert_eq!(borrowed, 2);

        // Consuming iteration takes the rows by value.
        let owned = result.into_iter().count();
        assert_eq!(owned, 2);
    }
}