grafeo-core 0.5.41

Core graph models, indexes, and execution primitives for Grafeo
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
//! Collecting and storing graph statistics.
//!
//! Use [`StatisticsCollector`] to stream values through and build statistics,
//! or construct [`ColumnStatistics`] directly if you already know the numbers.
//! The [`Statistics`] struct holds everything the optimizer needs.

use super::histogram::Histogram;
use grafeo_common::types::Value;
use std::collections::HashMap;

/// A property key identifier.
pub type PropertyKey = String;

/// Everything the optimizer knows about the data - cardinalities, distributions, degrees.
///
/// This is the main struct the query planner consults when choosing between
/// different execution strategies.
#[derive(Debug, Clone, Default)]
pub struct Statistics {
    /// Per-label statistics.
    pub labels: HashMap<String, LabelStatistics>,
    /// Per-edge-type statistics.
    pub edge_types: HashMap<String, EdgeTypeStatistics>,
    /// Per-property statistics (across all labels).
    pub properties: HashMap<PropertyKey, ColumnStatistics>,
    /// Total node count.
    pub total_nodes: u64,
    /// Total edge count.
    pub total_edges: u64,
}

impl Statistics {
    /// Creates a new empty statistics object.
    pub fn new() -> Self {
        Self::default()
    }

    /// Updates label statistics.
    pub fn update_label(&mut self, label: &str, stats: LabelStatistics) {
        self.labels.insert(label.to_string(), stats);
    }

    /// Updates edge type statistics.
    pub fn update_edge_type(&mut self, edge_type: &str, stats: EdgeTypeStatistics) {
        self.edge_types.insert(edge_type.to_string(), stats);
    }

    /// Updates property statistics.
    pub fn update_property(&mut self, property: &str, stats: ColumnStatistics) {
        self.properties.insert(property.to_string(), stats);
    }

    /// Gets label statistics.
    pub fn get_label(&self, label: &str) -> Option<&LabelStatistics> {
        self.labels.get(label)
    }

    /// Gets edge type statistics.
    pub fn get_edge_type(&self, edge_type: &str) -> Option<&EdgeTypeStatistics> {
        self.edge_types.get(edge_type)
    }

    /// Gets property statistics.
    pub fn get_property(&self, property: &str) -> Option<&ColumnStatistics> {
        self.properties.get(property)
    }

    /// Estimates the cardinality of a label scan.
    pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
        self.labels
            .get(label)
            .map_or(1000.0, |s| s.node_count as f64) // Default estimate if no statistics
    }

    /// Estimates the average degree for an edge type.
    pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
        self.edge_types.get(edge_type).map_or(10.0, |s| {
            if outgoing {
                s.avg_out_degree
            } else {
                s.avg_in_degree
            }
        }) // Default estimate
    }

    /// Estimates selectivity of an equality predicate.
    pub fn estimate_equality_selectivity(&self, property: &str, _value: &Value) -> f64 {
        self.properties.get(property).map_or(0.5, |s| {
            if s.distinct_count > 0 {
                1.0 / s.distinct_count as f64
            } else {
                0.5
            }
        })
    }

    /// Estimates selectivity of a range predicate.
    pub fn estimate_range_selectivity(
        &self,
        property: &str,
        lower: Option<&Value>,
        upper: Option<&Value>,
    ) -> f64 {
        self.properties
            .get(property)
            .and_then(|s| s.histogram.as_ref())
            .map_or(0.33, |h| {
                h.estimate_range_selectivity(lower, upper, true, true)
            }) // Default for range predicates
    }
}

/// Statistics for nodes with a particular label (like "Person" or "Company").
#[derive(Debug, Clone)]
pub struct LabelStatistics {
    /// Number of nodes with this label.
    pub node_count: u64,
    /// Average outgoing degree.
    pub avg_out_degree: f64,
    /// Average incoming degree.
    pub avg_in_degree: f64,
    /// Per-property statistics for nodes with this label.
    pub properties: HashMap<PropertyKey, ColumnStatistics>,
}

impl LabelStatistics {
    /// Creates new label statistics.
    pub fn new(node_count: u64) -> Self {
        Self {
            node_count,
            avg_out_degree: 0.0,
            avg_in_degree: 0.0,
            properties: HashMap::new(),
        }
    }

    /// Sets the average degrees.
    pub fn with_degrees(mut self, out_degree: f64, in_degree: f64) -> Self {
        self.avg_out_degree = out_degree;
        self.avg_in_degree = in_degree;
        self
    }

    /// Adds property statistics.
    pub fn with_property(mut self, property: &str, stats: ColumnStatistics) -> Self {
        self.properties.insert(property.to_string(), stats);
        self
    }
}

/// Alias for table statistics (used in relational contexts).
pub type TableStatistics = LabelStatistics;

/// Statistics for edges of a particular type (like "KNOWS" or "WORKS_AT").
#[derive(Debug, Clone)]
pub struct EdgeTypeStatistics {
    /// Number of edges of this type.
    pub edge_count: u64,
    /// Average outgoing degree (edges per source node).
    pub avg_out_degree: f64,
    /// Average incoming degree (edges per target node).
    pub avg_in_degree: f64,
    /// Per-property statistics for edges of this type.
    pub properties: HashMap<PropertyKey, ColumnStatistics>,
}

impl EdgeTypeStatistics {
    /// Creates new edge type statistics.
    pub fn new(edge_count: u64, avg_out_degree: f64, avg_in_degree: f64) -> Self {
        Self {
            edge_count,
            avg_out_degree,
            avg_in_degree,
            properties: HashMap::new(),
        }
    }

    /// Adds property statistics.
    pub fn with_property(mut self, property: &str, stats: ColumnStatistics) -> Self {
        self.properties.insert(property.to_string(), stats);
        self
    }
}

/// Detailed statistics about a property's values - min, max, histogram, null ratio.
#[derive(Debug, Clone)]
pub struct ColumnStatistics {
    /// Number of distinct values.
    pub distinct_count: u64,
    /// Total number of values (including nulls).
    pub total_count: u64,
    /// Number of null values.
    pub null_count: u64,
    /// Minimum value (if applicable).
    pub min_value: Option<Value>,
    /// Maximum value (if applicable).
    pub max_value: Option<Value>,
    /// Average value (for numeric types).
    pub avg_value: Option<f64>,
    /// Equi-depth histogram (for selectivity estimation).
    pub histogram: Option<Histogram>,
    /// Most common values with their frequencies.
    pub most_common: Vec<(Value, f64)>,
}

impl ColumnStatistics {
    /// Creates new column statistics with basic info.
    pub fn new(distinct_count: u64, total_count: u64, null_count: u64) -> Self {
        Self {
            distinct_count,
            total_count,
            null_count,
            min_value: None,
            max_value: None,
            avg_value: None,
            histogram: None,
            most_common: Vec::new(),
        }
    }

    /// Sets min/max values.
    pub fn with_min_max(mut self, min: Value, max: Value) -> Self {
        self.min_value = Some(min);
        self.max_value = Some(max);
        self
    }

    /// Sets the average value.
    pub fn with_avg(mut self, avg: f64) -> Self {
        self.avg_value = Some(avg);
        self
    }

    /// Sets the histogram.
    pub fn with_histogram(mut self, histogram: Histogram) -> Self {
        self.histogram = Some(histogram);
        self
    }

    /// Sets the most common values.
    pub fn with_most_common(mut self, values: Vec<(Value, f64)>) -> Self {
        self.most_common = values;
        self
    }

    /// Returns the null fraction.
    pub fn null_fraction(&self) -> f64 {
        if self.total_count == 0 {
            0.0
        } else {
            self.null_count as f64 / self.total_count as f64
        }
    }

    /// Estimates selectivity for an equality predicate.
    pub fn estimate_equality_selectivity(&self, value: &Value) -> f64 {
        // Check most common values first
        for (mcv, freq) in &self.most_common {
            if mcv == value {
                return *freq;
            }
        }

        // Use histogram if available
        if let Some(ref hist) = self.histogram {
            return hist.estimate_equality_selectivity(value);
        }

        // Fall back to uniform distribution assumption
        if self.distinct_count > 0 {
            1.0 / self.distinct_count as f64
        } else {
            0.0
        }
    }

    /// Estimates selectivity for a range predicate.
    pub fn estimate_range_selectivity(&self, lower: Option<&Value>, upper: Option<&Value>) -> f64 {
        if let Some(ref hist) = self.histogram {
            return hist.estimate_range_selectivity(lower, upper, true, true);
        }

        // Without histogram, use min/max if available
        match (&self.min_value, &self.max_value, lower, upper) {
            (Some(min), Some(max), Some(l), Some(u)) => {
                // Linear interpolation
                estimate_linear_range(min, max, l, u)
            }
            (Some(_), Some(_), Some(_), None) => 0.5, // Greater than
            (Some(_), Some(_), None, Some(_)) => 0.5, // Less than
            _ => 0.33,                                // Default
        }
    }
}

/// Estimates range selectivity using linear interpolation.
fn estimate_linear_range(min: &Value, max: &Value, lower: &Value, upper: &Value) -> f64 {
    match (min, max, lower, upper) {
        (
            Value::Int64(min_v),
            Value::Int64(max_v),
            Value::Int64(lower_v),
            Value::Int64(upper_v),
        ) => {
            let total_range = (max_v - min_v) as f64;
            if total_range <= 0.0 {
                return 1.0;
            }

            let effective_lower = (*lower_v).max(*min_v);
            let effective_upper = (*upper_v).min(*max_v);

            if effective_upper < effective_lower {
                return 0.0;
            }

            (effective_upper - effective_lower) as f64 / total_range
        }
        (
            Value::Float64(min_v),
            Value::Float64(max_v),
            Value::Float64(lower_v),
            Value::Float64(upper_v),
        ) => {
            let total_range = max_v - min_v;
            if total_range <= 0.0 {
                return 1.0;
            }

            let effective_lower = lower_v.max(*min_v);
            let effective_upper = upper_v.min(*max_v);

            if effective_upper < effective_lower {
                return 0.0;
            }

            (effective_upper - effective_lower) / total_range
        }
        _ => 0.33,
    }
}

/// Streams values through to build statistics automatically.
///
/// Call [`add()`](Self::add) for each value, then [`build()`](Self::build)
/// to get the final [`ColumnStatistics`] with histogram and most common values.
#[cfg(test)]
pub(crate) struct StatisticsCollector {
    /// Values collected for histogram building.
    values: Vec<Value>,
    /// Distinct value tracker.
    distinct: std::collections::HashSet<String>,
    /// Running min.
    min: Option<Value>,
    /// Running max.
    max: Option<Value>,
    /// Running sum (for numeric).
    sum: f64,
    /// Null count.
    null_count: u64,
    /// Value frequency counter.
    frequencies: HashMap<String, u64>,
}

#[cfg(test)]
impl StatisticsCollector {
    /// Creates a new statistics collector.
    pub fn new() -> Self {
        Self {
            values: Vec::new(),
            distinct: std::collections::HashSet::new(),
            min: None,
            max: None,
            sum: 0.0,
            null_count: 0,
            frequencies: HashMap::new(),
        }
    }

    /// Adds a value to the collector.
    pub fn add(&mut self, value: Value) {
        if matches!(value, Value::Null) {
            self.null_count += 1;
            return;
        }

        // Track distinct values
        let key = format!("{value:?}");
        self.distinct.insert(key.clone());

        // Track frequencies
        *self.frequencies.entry(key).or_insert(0) += 1;

        // Track min/max
        self.update_min_max(&value);

        // Track sum for numeric
        if let Some(v) = value_to_f64(&value) {
            self.sum += v;
        }

        self.values.push(value);
    }

    fn update_min_max(&mut self, value: &Value) {
        // Update min
        match &self.min {
            None => self.min = Some(value.clone()),
            Some(current) => {
                if compare_values(value, current) == Some(std::cmp::Ordering::Less) {
                    self.min = Some(value.clone());
                }
            }
        }

        // Update max
        match &self.max {
            None => self.max = Some(value.clone()),
            Some(current) => {
                if compare_values(value, current) == Some(std::cmp::Ordering::Greater) {
                    self.max = Some(value.clone());
                }
            }
        }
    }

    /// Builds column statistics from collected data.
    pub fn build(mut self, num_histogram_buckets: usize, num_mcv: usize) -> ColumnStatistics {
        let total_count = self.values.len() as u64 + self.null_count;
        let distinct_count = self.distinct.len() as u64;

        let avg = if !self.values.is_empty() {
            Some(self.sum / self.values.len() as f64)
        } else {
            None
        };

        // Build histogram
        self.values
            .sort_by(|a, b| compare_values(a, b).unwrap_or(std::cmp::Ordering::Equal));
        let histogram = if self.values.len() >= num_histogram_buckets {
            Some(Histogram::build(&self.values, num_histogram_buckets))
        } else {
            None
        };

        // Find most common values
        let total_non_null = self.values.len() as f64;
        let mut freq_vec: Vec<_> = self.frequencies.into_iter().collect();
        freq_vec.sort_by_key(|b| std::cmp::Reverse(b.1));

        let most_common: Vec<(Value, f64)> = freq_vec
            .into_iter()
            .take(num_mcv)
            .filter_map(|(key, count)| {
                // Try to parse the key back to a value (simplified)
                let freq = count as f64 / total_non_null;
                // This is a simplification - we'd need to store actual values
                if key.starts_with("Int64(") {
                    let num_str = key.trim_start_matches("Int64(").trim_end_matches(')');
                    num_str.parse::<i64>().ok().map(|n| (Value::Int64(n), freq))
                } else if key.starts_with("String(") {
                    let s = key
                        .trim_start_matches("String(Arc(\"")
                        .trim_end_matches("\"))");
                    Some((Value::String(s.to_string().into()), freq))
                } else {
                    None
                }
            })
            .collect();

        let mut stats = ColumnStatistics::new(distinct_count, total_count, self.null_count);

        if let Some(min) = self.min
            && let Some(max) = self.max
        {
            stats = stats.with_min_max(min, max);
        }

        if let Some(avg) = avg {
            stats = stats.with_avg(avg);
        }

        if let Some(hist) = histogram {
            stats = stats.with_histogram(hist);
        }

        if !most_common.is_empty() {
            stats = stats.with_most_common(most_common);
        }

        stats
    }
}

#[cfg(test)]
impl Default for StatisticsCollector {
    fn default() -> Self {
        Self::new()
    }
}

/// Converts a value to f64.
#[cfg(test)]
fn value_to_f64(value: &Value) -> Option<f64> {
    match value {
        Value::Int64(i) => Some(*i as f64),
        Value::Float64(f) => Some(*f),
        _ => None,
    }
}

/// Compares two values.
#[cfg(test)]
fn compare_values(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
    match (a, b) {
        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
        (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
        (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
        (Value::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
        (Value::Date(a), Value::Date(b)) => Some(a.cmp(b)),
        (Value::Time(a), Value::Time(b)) => Some(a.cmp(b)),
        _ => None,
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_statistics_collector() {
        let mut collector = StatisticsCollector::new();

        for i in 0..100 {
            collector.add(Value::Int64(i % 10)); // Values 0-9, each appearing 10 times
        }

        let stats = collector.build(10, 5);

        assert_eq!(stats.distinct_count, 10);
        assert_eq!(stats.total_count, 100);
        assert_eq!(stats.null_count, 0);
        assert_eq!(stats.min_value, Some(Value::Int64(0)));
        assert_eq!(stats.max_value, Some(Value::Int64(9)));
    }

    #[test]
    fn test_statistics_with_nulls() {
        let mut collector = StatisticsCollector::new();

        collector.add(Value::Int64(1));
        collector.add(Value::Null);
        collector.add(Value::Int64(2));
        collector.add(Value::Null);
        collector.add(Value::Int64(3));

        let stats = collector.build(5, 3);

        assert_eq!(stats.total_count, 5);
        assert_eq!(stats.null_count, 2);
        assert_eq!(stats.distinct_count, 3);
        assert!((stats.null_fraction() - 0.4).abs() < 0.01);
    }

    #[test]
    fn test_label_statistics() {
        let stats = LabelStatistics::new(1000)
            .with_degrees(5.0, 3.0)
            .with_property(
                "age",
                ColumnStatistics::new(50, 1000, 10)
                    .with_min_max(Value::Int64(0), Value::Int64(100)),
            );

        assert_eq!(stats.node_count, 1000);
        assert_eq!(stats.avg_out_degree, 5.0);
        assert!(stats.properties.contains_key("age"));
    }

    #[test]
    fn test_statistics_min_max_updates() {
        // Values in decreasing then increasing order to exercise both min and max updates
        let mut collector = StatisticsCollector::new();

        collector.add(Value::Int64(50));
        collector.add(Value::Int64(10)); // new min
        collector.add(Value::Int64(90)); // new max
        collector.add(Value::Int64(5)); // new min again
        collector.add(Value::Int64(95)); // new max again

        let stats = collector.build(2, 3);

        assert_eq!(stats.min_value, Some(Value::Int64(5)));
        assert_eq!(stats.max_value, Some(Value::Int64(95)));
    }

    #[test]
    fn test_statistics_most_common_values() {
        let mut collector = StatisticsCollector::new();

        // Add values with known frequencies so MCVs are populated
        for _ in 0..50 {
            collector.add(Value::Int64(42));
        }
        for _ in 0..30 {
            collector.add(Value::Int64(7));
        }
        for _ in 0..20 {
            collector.add(Value::String("hello".into()));
        }

        let stats = collector.build(5, 3);

        // Should have most_common populated with parsed Int64 and String values
        assert!(
            !stats.most_common.is_empty(),
            "MCV list should be populated"
        );

        // The most frequent value should be Int64(42) at freq 0.5
        let (top_val, top_freq) = &stats.most_common[0];
        assert_eq!(*top_val, Value::Int64(42));
        assert!((top_freq - 0.5).abs() < 0.01, "42 appears 50/100 = 0.5");

        // Check that String values were also parsed back
        let has_string = stats
            .most_common
            .iter()
            .any(|(v, _)| matches!(v, Value::String(_)));
        assert!(has_string, "String MCVs should be parsed back");
    }

    #[test]
    fn test_database_statistics() {
        let mut db_stats = Statistics::new();

        db_stats.update_label(
            "Person",
            LabelStatistics::new(10000).with_degrees(10.0, 10.0),
        );

        db_stats.update_edge_type("KNOWS", EdgeTypeStatistics::new(50000, 5.0, 5.0));

        assert_eq!(db_stats.estimate_label_cardinality("Person"), 10000.0);
        assert_eq!(db_stats.estimate_label_cardinality("Unknown"), 1000.0); // Default

        assert_eq!(db_stats.estimate_avg_degree("KNOWS", true), 5.0);
    }
}