heliosdb-nano 3.22.2

PostgreSQL-compatible embedded database with TDE + ZKE encryption, HNSW vector search, Product Quantization, git-like branching, time-travel queries, materialized views, row-level security, and 50+ enterprise features
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
//! Materialized View storage and catalog management
//!
//! This module implements metadata storage and catalog management for materialized views.
//! It provides functionality for storing view definitions, tracking staleness, and managing
//! the lifecycle of materialized views.

use crate::{Result, Error, Schema, Tuple};
use crate::sql::LogicalPlan;
use super::StorageEngine;
use chrono::{DateTime, Utc};
use serde::{Serialize, Deserialize};
use std::collections::HashMap;

/// Metadata for a materialized view
///
/// Persisted in the key-value store under `meta:mv:<view_name>` as a bincode
/// blob (written by `MaterializedViewCatalog::create_view` / `update_view`).
/// bincode encodes struct fields positionally, so adding, removing, or
/// reordering fields changes the stored format and breaks decoding of metadata
/// written by earlier builds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MaterializedViewMetadata {
    /// Unique view name
    pub view_name: String,
    /// SQL query definition that generates the view (for display/debugging)
    pub query_text: String,
    /// Serialized logical plan for re-execution during REFRESH
    /// This stores the bincode-serialized LogicalPlan
    /// (decoded by `get_query_plan`)
    pub query_plan_bytes: Vec<u8>,
    /// List of base tables this view depends on
    pub base_tables: Vec<String>,
    /// Schema of the materialized view result
    pub schema: Schema,
    /// Timestamp when the view was created
    pub created_at: DateTime<Utc>,
    /// Timestamp of last refresh (None if never refreshed)
    /// Updated by both full and incremental refreshes.
    pub last_refresh: Option<DateTime<Utc>>,
    /// Refresh strategy: "manual", "auto", "incremental"
    /// (`new` sets "manual"; `enable_incremental` sets "incremental")
    pub refresh_strategy: String,
    /// Number of rows in the materialized view
    /// (None until a refresh records it)
    pub row_count: Option<u64>,
    /// Additional metadata (options from SQL)
    pub metadata: HashMap<String, String>,
    /// Timestamp of last full refresh (for incremental strategy)
    pub last_full_refresh: Option<DateTime<Utc>>,
    /// Number of deltas applied since last full refresh
    /// (reset by `mark_full_refreshed`; compared against `row_count` by
    /// `needs_full_refresh`)
    pub delta_count_since_full: u64,
    /// Whether incremental refresh is enabled
    pub incremental_enabled: bool,
}

impl MaterializedViewMetadata {
    /// Create metadata for a newly defined, never-refreshed materialized view.
    ///
    /// The view starts with the `"manual"` refresh strategy, no recorded row
    /// count, no refresh history, an empty options map, and incremental
    /// refresh disabled. `created_at` is set to the current time.
    pub fn new(
        view_name: String,
        query_text: String,
        query_plan_bytes: Vec<u8>,
        base_tables: Vec<String>,
        schema: Schema,
    ) -> Self {
        Self {
            view_name,
            query_text,
            query_plan_bytes,
            base_tables,
            schema,
            created_at: Utc::now(),
            last_refresh: None,
            refresh_strategy: "manual".to_string(),
            row_count: None,
            metadata: HashMap::new(),
            last_full_refresh: None,
            delta_count_since_full: 0,
            incremental_enabled: false,
        }
    }

    /// Switch this view to incremental refresh.
    ///
    /// Sets `incremental_enabled` and changes `refresh_strategy` to `"incremental"`.
    pub fn enable_incremental(&mut self) {
        self.incremental_enabled = true;
        self.refresh_strategy = "incremental".to_string();
    }

    /// Record that a full refresh produced `row_count` rows.
    ///
    /// `last_refresh` and `last_full_refresh` are set to the same instant and
    /// the delta counter is reset, so [`needs_full_refresh`](Self::needs_full_refresh)
    /// measures deltas from this point on.
    pub fn mark_full_refreshed(&mut self, row_count: u64) {
        // Read the clock once so both timestamps agree exactly.
        let now = Utc::now();
        self.last_refresh = Some(now);
        self.last_full_refresh = Some(now);
        self.row_count = Some(row_count);
        self.delta_count_since_full = 0;
    }

    /// Record that an incremental refresh applied `delta_count` deltas.
    ///
    /// Updates `last_refresh` and accumulates into `delta_count_since_full`;
    /// `row_count` and `last_full_refresh` are left unchanged.
    pub fn mark_incremental_refreshed(&mut self, delta_count: u64) {
        self.last_refresh = Some(Utc::now());
        // Saturate instead of overflowing (a plain `+=` panics in debug builds).
        // A saturated counter still exceeds any recorded row count threshold.
        self.delta_count_since_full = self.delta_count_since_full.saturating_add(delta_count);
    }

    /// Decide whether the next refresh must be a full refresh rather than an
    /// incremental one.
    ///
    /// Returns `true` when:
    /// 1. no full refresh has ever been recorded, or
    /// 2. the deltas applied since the last full refresh exceed 50% of the
    ///    recorded row count.
    ///
    /// Returns `false` otherwise, including when no row count is recorded.
    pub fn needs_full_refresh(&self) -> bool {
        if self.last_full_refresh.is_none() {
            return true;
        }

        match self.row_count {
            // `delta > rows * 0.5` evaluated as `2 * delta > rows` in u128 so it
            // is exact for every u64 (an f64 loses integer precision above 2^53).
            Some(rows) => u128::from(self.delta_count_since_full) * 2 > u128::from(rows),
            None => false,
        }
    }

    /// Deserialize the stored query plan for re-execution.
    ///
    /// # Errors
    /// Returns a storage error if `query_plan_bytes` is not a valid
    /// bincode-encoded `LogicalPlan`.
    pub fn get_query_plan(&self) -> Result<LogicalPlan> {
        bincode::deserialize(&self.query_plan_bytes)
            .map_err(|e| Error::storage(format!("Failed to deserialize query plan: {}", e)))
    }

    /// Check if the view is stale, meaning it has never been refreshed.
    pub fn is_stale(&self) -> bool {
        self.last_refresh.is_none()
    }

    /// Whole seconds elapsed since the last refresh, or `None` if never refreshed.
    ///
    /// The value can be negative if the stored timestamp is ahead of the
    /// current clock (e.g. after a clock adjustment).
    pub fn staleness_seconds(&self) -> Option<i64> {
        self.last_refresh.map(|last| {
            let now = Utc::now();
            (now - last).num_seconds()
        })
    }

    /// Update the refresh timestamp and row count.
    ///
    /// Unlike [`mark_full_refreshed`](Self::mark_full_refreshed), this does not
    /// touch `last_full_refresh` or `delta_count_since_full`.
    pub fn mark_refreshed(&mut self, row_count: u64) {
        self.last_refresh = Some(Utc::now());
        self.row_count = Some(row_count);
    }
}

/// Materialized view catalog manager
///
/// A borrowed handle over a [`StorageEngine`] whose only state is that
/// reference, so it is cheap to construct per operation.
/// View metadata lives under `meta:mv:<view_name>` keys, and each view's
/// result rows live in a regular table named `__mv_<view_name>`.
pub struct MaterializedViewCatalog<'a> {
    /// Storage engine holding both the metadata keys and the data tables.
    storage: &'a StorageEngine,
}

impl<'a> MaterializedViewCatalog<'a> {
    /// Create a catalog handle that reads and writes MV state through `storage`.
    pub fn new(storage: &'a StorageEngine) -> Self {
        Self { storage }
    }

    /// Register a new materialized view in the catalog.
    ///
    /// # Errors
    /// - query-execution error if a view with the same name already exists
    /// - storage error if serializing or writing the metadata fails
    pub fn create_view(&self, metadata: MaterializedViewMetadata) -> Result<()> {
        tracing::info!("Creating materialized view '{}' in catalog", metadata.view_name);

        // Check if view already exists
        if self.view_exists(&metadata.view_name)? {
            return Err(Error::query_execution(format!(
                "Materialized view '{}' already exists",
                metadata.view_name
            )));
        }

        // Store metadata
        let key = Self::mv_metadata_key(&metadata.view_name);
        let value = bincode::serialize(&metadata)
            .map_err(|e| Error::storage(format!("Failed to serialize MV metadata: {}", e)))?;

        self.storage.put(&key, &value)?;

        tracing::info!("Successfully created materialized view '{}'", metadata.view_name);
        Ok(())
    }

    /// Check whether metadata for `view_name` is registered.
    pub fn view_exists(&self, view_name: &str) -> Result<bool> {
        let key = Self::mv_metadata_key(view_name);
        Ok(self.storage.get(&key)?.is_some())
    }

    /// Load the stored metadata for `view_name`.
    ///
    /// # Errors
    /// - query-execution error if the view does not exist
    /// - storage error if the stored metadata cannot be decoded
    pub fn get_view(&self, view_name: &str) -> Result<MaterializedViewMetadata> {
        tracing::debug!("Retrieving metadata for materialized view '{}'", view_name);

        let key = Self::mv_metadata_key(view_name);
        match self.storage.get(&key)? {
            Some(data) => {
                bincode::deserialize(&data)
                    .map_err(|e| Error::storage(format!("Failed to deserialize MV metadata: {}", e)))
            }
            None => Err(Error::query_execution(format!(
                "Materialized view '{}' does not exist",
                view_name
            ))),
        }
    }

    /// Overwrite the stored metadata for `metadata.view_name` (refresh tracking).
    ///
    /// This is an unconditional put; it does not check that the view exists.
    pub fn update_view(&self, metadata: &MaterializedViewMetadata) -> Result<()> {
        tracing::debug!("Updating metadata for materialized view '{}'", metadata.view_name);

        let key = Self::mv_metadata_key(&metadata.view_name);
        let value = bincode::serialize(metadata)
            .map_err(|e| Error::storage(format!("Failed to serialize MV metadata: {}", e)))?;

        self.storage.put(&key, &value)
    }

    /// Drop a materialized view: its data table first, then its metadata.
    ///
    /// # Errors
    /// Returns a query-execution error if the view does not exist, or
    /// propagates any storage/catalog error from the individual drop steps.
    pub fn drop_view(&self, view_name: &str) -> Result<()> {
        tracing::info!("Dropping materialized view '{}'", view_name);

        if !self.view_exists(view_name)? {
            return Err(Error::query_execution(format!(
                "Materialized view '{}' does not exist",
                view_name
            )));
        }

        // Delete the data table (MV results are stored as a regular table)
        // BEFORE the metadata. If this step fails the view is still registered
        // and DROP can simply be retried. Deleting the metadata first would leave
        // an orphaned `__mv_*` table that `list_views` can no longer find.
        {
            let data_table = Self::mv_data_table_name(view_name);
            let catalog = self.storage.catalog();
            if catalog.table_exists(&data_table)? {
                catalog.drop_table(&data_table)?;
            }
        }

        // Delete metadata
        let key = Self::mv_metadata_key(view_name);
        self.storage.delete(&key)?;

        // Invalidate schema cache for the view name itself (may have been cached
        // by catalog.get_table_schema() which falls back to MV lookup)
        self.storage.invalidate_schema_cache(view_name);

        tracing::info!("Successfully dropped materialized view '{}'", view_name);
        Ok(())
    }

    /// List the names of all registered materialized views, sorted.
    pub fn list_views(&self) -> Result<Vec<String>> {
        tracing::debug!("Listing all materialized views");

        let prefix: &[u8] = b"meta:mv:";
        let mut views = Vec::new();

        // Seek directly to the first key >= prefix instead of walking the
        // keyspace from the start. Keys are ordered bytewise, so all metadata
        // keys are contiguous and the first non-matching key ends the range.
        let iter = self.storage.db.iterator(rocksdb::IteratorMode::From(
            prefix,
            rocksdb::Direction::Forward,
        ));
        for item in iter {
            let (key, _) = item.map_err(|e| Error::storage(format!("Iterator error: {}", e)))?;

            match key.strip_prefix(prefix) {
                // Extract view name from key
                Some(name) => views.push(String::from_utf8_lossy(name).into_owned()),
                None => break,
            }
        }

        views.sort();
        tracing::debug!("Found {} materialized views", views.len());
        Ok(views)
    }

    /// Store materialized view data, replacing any previous contents.
    ///
    /// Stores the query results in a regular table format for easy querying.
    /// The table name is prefixed with "__mv_" to distinguish it from user tables.
    ///
    /// The existing data table is dropped and recreated before inserting, so
    /// this is not atomic. A concurrent reader can observe a missing or
    /// partially filled table, and an insert error leaves the table partially
    /// populated. `store_view_data_concurrent` keeps the old data readable while
    /// the new data is written.
    ///
    /// Returns the number of rows stored.
    pub fn store_view_data(&self, view_name: &str, tuples: Vec<Tuple>, schema: &Schema) -> Result<u64> {
        tracing::info!("Storing data for materialized view '{}' ({} rows)", view_name, tuples.len());

        let data_table = Self::mv_data_table_name(view_name);
        let catalog = self.storage.catalog();

        // Create or recreate the data table
        if catalog.table_exists(&data_table)? {
            catalog.drop_table(&data_table)?;
        }
        catalog.create_table(&data_table, schema.clone())?;

        // Insert all tuples
        let row_count = tuples.len() as u64;
        for tuple in tuples {
            self.storage.insert_tuple(&data_table, tuple)?;
        }

        tracing::info!("Successfully stored {} rows for materialized view '{}'", row_count, view_name);
        Ok(row_count)
    }

    /// Store materialized view data with a build-then-swap (CONCURRENT refresh).
    ///
    /// The old data stays readable while the new data is being written:
    /// 1. Create a uniquely named temp table (`<data_table>__temp_<nanos>`)
    /// 2. Populate the temp table with the new rows
    /// 3. Rename the current data table (if any) to `<data_table>__old_<nanos>`,
    ///    then rename the temp table to the data table name
    /// 4. Drop the backup table
    ///
    /// Step 3 is two separate renames, not one atomic operation. Between them
    /// the data table name does not exist, so a reader hitting that short window
    /// sees the view as having no data. Readers are unaffected during step 2,
    /// which is where the bulk of the work happens.
    ///
    /// Error handling:
    /// - If any error occurs before the renames, the temporary table is cleaned up
    /// - If the second rename fails, the backup is renamed back to the original name
    /// - Cleanup errors are logged but don't fail the operation
    ///
    /// Returns the number of rows stored.
    pub fn store_view_data_concurrent(&self, view_name: &str, tuples: Vec<Tuple>, schema: &Schema) -> Result<u64> {
        tracing::info!(
            "Storing data for materialized view '{}' CONCURRENTLY ({} rows)",
            view_name, tuples.len()
        );

        let data_table = Self::mv_data_table_name(view_name);
        let catalog = self.storage.catalog();

        // Generate unique temporary table name using timestamp
        let timestamp = Utc::now().timestamp_nanos_opt().unwrap_or(0);
        let temp_table = format!("{}__temp_{}", data_table, timestamp);
        let backup_table = format!("{}__old_{}", data_table, timestamp);

        tracing::debug!(
            "Using temporary table '{}' for concurrent refresh",
            temp_table
        );

        // Step 1: Create temporary table with the new data
        if let Err(e) = catalog.create_table(&temp_table, schema.clone()) {
            tracing::error!("Failed to create temporary table '{}': {}", temp_table, e);
            return Err(e);
        }

        // Step 2: Populate temporary table
        let row_count = tuples.len() as u64;
        for (idx, tuple) in tuples.into_iter().enumerate() {
            if let Err(e) = self.storage.insert_tuple(&temp_table, tuple) {
                tracing::error!(
                    "Failed to insert tuple {} into temporary table '{}': {}",
                    idx, temp_table, e
                );

                // Cleanup: drop temporary table
                if let Err(cleanup_err) = catalog.drop_table(&temp_table) {
                    tracing::warn!(
                        "Failed to cleanup temporary table '{}' after insert error: {}",
                        temp_table, cleanup_err
                    );
                }

                return Err(e);
            }
        }

        tracing::debug!(
            "Populated temporary table '{}' with {} rows",
            temp_table, row_count
        );

        // Step 3: Swap the tables with two renames (see the doc comment for
        // the brief window in which the data table name is absent).

        // Check if the main table exists
        let table_exists = match catalog.table_exists(&data_table) {
            Ok(exists) => exists,
            Err(e) => {
                tracing::error!("Failed to check if table '{}' exists: {}", data_table, e);

                // Cleanup temporary table
                if let Err(cleanup_err) = catalog.drop_table(&temp_table) {
                    tracing::warn!(
                        "Failed to cleanup temporary table '{}': {}",
                        temp_table, cleanup_err
                    );
                }

                return Err(e);
            }
        };

        if table_exists {
            // Rename: old table -> backup table
            if let Err(e) = catalog.rename_table(&data_table, &backup_table) {
                tracing::error!(
                    "Failed to rename '{}' to '{}': {}",
                    data_table, backup_table, e
                );

                // Cleanup temporary table
                if let Err(cleanup_err) = catalog.drop_table(&temp_table) {
                    tracing::warn!(
                        "Failed to cleanup temporary table '{}': {}",
                        temp_table, cleanup_err
                    );
                }

                return Err(e);
            }
            tracing::debug!("Renamed '{}' to '{}'", data_table, backup_table);
        }

        // Rename: temp table -> main table
        if let Err(e) = catalog.rename_table(&temp_table, &data_table) {
            tracing::error!(
                "CRITICAL: Failed to rename '{}' to '{}': {}",
                temp_table, data_table, e
            );

            // Attempt to restore original state if old table was renamed
            if table_exists {
                tracing::info!("Attempting to restore original table by renaming '{}' back to '{}'",
                    backup_table, data_table);

                if let Err(restore_err) = catalog.rename_table(&backup_table, &data_table) {
                    tracing::error!(
                        "CRITICAL: Failed to restore original table '{}': {}. Manual intervention may be required.",
                        data_table, restore_err
                    );
                } else {
                    tracing::info!("Successfully restored original table '{}'", data_table);
                }
            }

            // Try to cleanup temporary table if it still exists
            if catalog.table_exists(&temp_table).unwrap_or(false) {
                if let Err(cleanup_err) = catalog.drop_table(&temp_table) {
                    tracing::warn!(
                        "Failed to cleanup temporary table '{}': {}",
                        temp_table, cleanup_err
                    );
                }
            }

            return Err(e);
        }
        tracing::debug!("Renamed '{}' to '{}'", temp_table, data_table);

        // Step 4: Clean up the backup table
        if table_exists {
            if let Err(e) = catalog.drop_table(&backup_table) {
                // Log but don't fail - the refresh succeeded, cleanup is just housekeeping
                tracing::warn!(
                    "Warning: Failed to drop backup table '{}': {}. This may be cleaned up manually.",
                    backup_table, e
                );
            } else {
                tracing::debug!("Dropped backup table '{}'", backup_table);
            }
        }

        tracing::info!(
            "Successfully stored {} rows for materialized view '{}' (CONCURRENT mode)",
            row_count, view_name
        );

        Ok(row_count)
    }

    /// Read all rows of a materialized view's data table.
    ///
    /// # Errors
    /// Returns a query-execution error if the data table does not exist
    /// (the view has never been refreshed).
    pub fn read_view_data(&self, view_name: &str) -> Result<Vec<Tuple>> {
        tracing::debug!("Reading data for materialized view '{}'", view_name);

        let data_table = Self::mv_data_table_name(view_name);
        let catalog = self.storage.catalog();

        if !catalog.table_exists(&data_table)? {
            return Err(Error::query_execution(format!(
                "Materialized view '{}' has no data (never refreshed)",
                view_name
            )));
        }

        self.storage.scan_table(&data_table)
    }

    /// Get the data table name for a materialized view (`__mv_<view_name>`).
    pub fn mv_data_table_name(view_name: &str) -> String {
        format!("__mv_{}", view_name)
    }

    /// Build the metadata key for a materialized view (`meta:mv:<view_name>`).
    fn mv_metadata_key(view_name: &str) -> Vec<u8> {
        format!("meta:mv:{}", view_name).into_bytes()
    }
}

#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use crate::{Config, Column, DataType, Value};

    /// Serialize a trivial `Scan` plan over `table_name`, standing in for a
    /// compiled view query.
    fn scan_plan_bytes(table_name: &str, schema: &Schema) -> Vec<u8> {
        let query_plan = LogicalPlan::Scan {
            alias: None,
            table_name: table_name.to_string(),
            schema: std::sync::Arc::new(schema.clone()),
            projection: None,
            as_of: None,
        };
        bincode::serialize(&query_plan).unwrap()
    }

    /// Build metadata for a view whose only base table is `base_table`.
    fn view_metadata(
        view_name: &str,
        query_text: &str,
        base_table: &str,
        schema: &Schema,
    ) -> MaterializedViewMetadata {
        MaterializedViewMetadata::new(
            view_name.to_string(),
            query_text.to_string(),
            scan_plan_bytes(base_table, schema),
            vec![base_table.to_string()],
            schema.clone(),
        )
    }

    /// Single-column `id INT4` schema shared by several tests.
    fn id_schema() -> Schema {
        Schema::new(vec![
            Column::new("id", DataType::Int4),
        ])
    }

    /// One single-column tuple per id.
    fn id_rows(ids: &[i32]) -> Vec<Tuple> {
        ids.iter().map(|&id| Tuple::new(vec![Value::Int4(id)])).collect()
    }

    #[test]
    fn test_create_and_get_view() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config)
            .expect("Failed to open storage");
        let mv_catalog = MaterializedViewCatalog::new(&storage);

        let schema = Schema::new(vec![
            Column::new("status", DataType::Text),
            Column::new("count", DataType::Int8),
        ]);
        let metadata = view_metadata(
            "user_summary",
            "SELECT status, COUNT(*) FROM users GROUP BY status",
            "users",
            &schema,
        );

        mv_catalog.create_view(metadata.clone())
            .expect("Failed to create view");

        // Verify view exists
        assert!(mv_catalog.view_exists("user_summary")
            .expect("Failed to check if view exists"));

        // Verify metadata
        let retrieved = mv_catalog.get_view("user_summary")
            .expect("Failed to get view metadata");
        assert_eq!(retrieved.view_name, "user_summary");
        assert_eq!(retrieved.query_text, metadata.query_text);
        assert_eq!(retrieved.base_tables, vec!["users"]);

        // The stored plan must survive the metadata round trip
        let plan = retrieved.get_query_plan()
            .expect("Failed to decode stored plan");
        assert!(matches!(&plan, LogicalPlan::Scan { table_name, .. } if table_name == "users"));
    }

    #[test]
    fn test_create_duplicate_view_fails() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config)
            .expect("Failed to open storage");
        let mv_catalog = MaterializedViewCatalog::new(&storage);
        let schema = id_schema();

        mv_catalog.create_view(view_metadata("dup_view", "SELECT id FROM t", "t", &schema))
            .expect("Failed to create view");
        assert!(mv_catalog
            .create_view(view_metadata("dup_view", "SELECT id FROM t", "t", &schema))
            .is_err());
    }

    #[test]
    fn test_drop_view() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config)
            .expect("Failed to open storage");
        let mv_catalog = MaterializedViewCatalog::new(&storage);
        let schema = id_schema();

        mv_catalog.create_view(view_metadata("temp_view", "SELECT id FROM temp", "temp", &schema))
            .expect("Failed to create view");

        assert!(mv_catalog.view_exists("temp_view")
            .expect("Failed to check if view exists"));

        mv_catalog.store_view_data("temp_view", id_rows(&[1, 2]), &schema)
            .expect("Failed to store view data");

        mv_catalog.drop_view("temp_view")
            .expect("Failed to drop view");

        assert!(!mv_catalog.view_exists("temp_view")
            .expect("Failed to check if view exists after drop"));

        // The backing data table must be removed along with the metadata
        let data_table = MaterializedViewCatalog::mv_data_table_name("temp_view");
        assert!(!storage.catalog().table_exists(&data_table)
            .expect("Failed to check data table after drop"));

        // Dropping again reports the view as missing
        assert!(mv_catalog.drop_view("temp_view").is_err());
    }

    #[test]
    fn test_list_views_sorted() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config)
            .expect("Failed to open storage");
        let mv_catalog = MaterializedViewCatalog::new(&storage);
        let schema = id_schema();

        assert!(mv_catalog.list_views().expect("Failed to list views").is_empty());

        for name in ["b_view", "a_view"] {
            mv_catalog.create_view(view_metadata(name, "SELECT id FROM t", "t", &schema))
                .expect("Failed to create view");
        }

        let views = mv_catalog.list_views().expect("Failed to list views");
        assert_eq!(views, vec!["a_view".to_string(), "b_view".to_string()]);
    }

    #[test]
    fn test_store_and_read_view_data() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config)
            .expect("Failed to open storage");
        let mv_catalog = MaterializedViewCatalog::new(&storage);

        let schema = Schema::new(vec![
            Column::new("name", DataType::Text),
            Column::new("age", DataType::Int4),
        ]);

        mv_catalog.create_view(view_metadata("test_view", "SELECT name, age FROM users", "users", &schema))
            .expect("Failed to create view");

        // A view that has never been refreshed has no data table yet
        assert!(mv_catalog.read_view_data("test_view").is_err());

        // Store test data
        let tuples = vec![
            Tuple::new(vec![
                Value::String("Alice".to_string()),
                Value::Int4(30),
            ]),
            Tuple::new(vec![
                Value::String("Bob".to_string()),
                Value::Int4(25),
            ]),
        ];

        let row_count = mv_catalog.store_view_data("test_view", tuples, &schema)
            .expect("Failed to store view data");
        assert_eq!(row_count, 2);

        // Read back data
        let retrieved = mv_catalog.read_view_data("test_view")
            .expect("Failed to read view data");
        assert_eq!(retrieved.len(), 2);
    }

    #[test]
    fn test_store_view_data_concurrent_replaces_data() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config)
            .expect("Failed to open storage");
        let mv_catalog = MaterializedViewCatalog::new(&storage);
        let schema = id_schema();

        mv_catalog.create_view(view_metadata("conc_view", "SELECT id FROM t", "t", &schema))
            .expect("Failed to create view");

        // First refresh: no existing data table
        let first = mv_catalog.store_view_data_concurrent("conc_view", id_rows(&[1, 2]), &schema)
            .expect("Failed first concurrent refresh");
        assert_eq!(first, 2);
        assert_eq!(mv_catalog.read_view_data("conc_view").expect("Failed to read").len(), 2);

        // Second refresh: existing data table is swapped out
        let second = mv_catalog.store_view_data_concurrent("conc_view", id_rows(&[3]), &schema)
            .expect("Failed second concurrent refresh");
        assert_eq!(second, 1);
        assert_eq!(mv_catalog.read_view_data("conc_view").expect("Failed to read").len(), 1);
    }

    #[test]
    fn test_staleness_tracking() {
        let schema = id_schema();
        let mut metadata = view_metadata("test_view", "SELECT id FROM test", "test", &schema);

        // Initially stale
        assert!(metadata.is_stale());
        assert!(metadata.staleness_seconds().is_none());

        // Mark as refreshed
        metadata.mark_refreshed(100);
        assert!(!metadata.is_stale());
        assert!(metadata.last_refresh.is_some());
        assert_eq!(metadata.row_count, Some(100));

        // Staleness should be very small (just now)
        let staleness = metadata.staleness_seconds().expect("Should have staleness");
        assert!(staleness >= 0 && staleness < 2); // Less than 2 seconds
    }

    #[test]
    fn test_incremental_refresh_threshold() {
        let schema = id_schema();
        let mut metadata = view_metadata("inc_view", "SELECT id FROM test", "test", &schema);

        metadata.enable_incremental();
        assert!(metadata.incremental_enabled);
        assert_eq!(metadata.refresh_strategy, "incremental");

        // Never fully refreshed -> full refresh required
        assert!(metadata.needs_full_refresh());

        metadata.mark_full_refreshed(10);
        assert_eq!(metadata.delta_count_since_full, 0);
        assert!(!metadata.needs_full_refresh());

        // 5 deltas is exactly 50% of 10 rows: not yet over the threshold
        metadata.mark_incremental_refreshed(5);
        assert!(!metadata.needs_full_refresh());

        // 6 deltas exceeds 50% of 10 rows
        metadata.mark_incremental_refreshed(1);
        assert_eq!(metadata.delta_count_since_full, 6);
        assert!(metadata.needs_full_refresh());

        // A new full refresh resets the counter
        metadata.mark_full_refreshed(10);
        assert!(!metadata.needs_full_refresh());
    }
}