ruvector-postgres 2.0.5

High-performance PostgreSQL vector database extension (v2): a drop-in replacement for pgvector offering 230+ SQL functions, SIMD acceleration, Flash Attention, GNN layers, hybrid search, multi-tenancy, self-healing, and self-learning capabilities.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
//! Background worker for index maintenance and optimization
//!
//! Implements PostgreSQL background worker for:
//! - Periodic index optimization
//! - Index statistics collection
//! - Vacuum and cleanup operations
//! - Automatic reindexing for heavily updated indexes

use pgrx::prelude::*;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use parking_lot::RwLock;

// ============================================================================
// Background Worker Configuration
// ============================================================================

/// Configuration for RuVector background worker
///
/// Tunable settings for the maintenance background worker. Field order is
/// significant only for the derived `Debug` output.
#[derive(Debug, Clone)]
pub struct BgWorkerConfig {
    /// Seconds to wait between maintenance cycles.
    pub maintenance_interval_secs: u64,
    /// Optimize indexes whose fragmentation exceeds `optimize_threshold`.
    pub auto_optimize: bool,
    /// Collect per-index statistics during each cycle.
    pub collect_stats: bool,
    /// Vacuum indexes during each cycle, subject to `vacuum_min_age_secs`.
    pub auto_vacuum: bool,
    /// Minimum seconds since an index's last vacuum before it is vacuumed again.
    pub vacuum_min_age_secs: u64,
    /// Maximum number of indexes to process per cycle.
    pub max_indexes_per_cycle: usize,
    /// Fragmentation ratio above which an index is optimized (`0.10` = 10%).
    pub optimize_threshold: f32,
}

impl Default for BgWorkerConfig {
    /// Five-minute cycles with every feature enabled, a one-hour minimum
    /// vacuum age, at most ten indexes per cycle, and a 10% optimization
    /// threshold.
    fn default() -> Self {
        const MINUTE_SECS: u64 = 60;
        const HOUR_SECS: u64 = 60 * MINUTE_SECS;

        BgWorkerConfig {
            max_indexes_per_cycle: 10,
            optimize_threshold: 0.10,
            maintenance_interval_secs: 5 * MINUTE_SECS,
            vacuum_min_age_secs: HOUR_SECS,
            auto_optimize: true,
            collect_stats: true,
            auto_vacuum: true,
        }
    }
}

/// Global background worker state
///
/// Holds the worker configuration behind a read/write lock, an atomic running
/// flag, and cumulative atomic counters. All atomic operations use `SeqCst`.
pub struct BgWorkerState {
    /// Configuration
    config: RwLock<BgWorkerConfig>,
    /// Whether worker is running
    running: AtomicBool,
    /// Unix time (seconds) of the last recorded maintenance cycle; 0 if none yet
    last_maintenance: AtomicU64,
    /// Total maintenance cycles completed
    cycles_completed: AtomicU64,
    /// Total indexes maintained
    indexes_maintained: AtomicU64,
}

impl BgWorkerState {
    /// Create new background worker state
    ///
    /// The state starts out not running, with every counter at zero.
    pub fn new(config: BgWorkerConfig) -> Self {
        Self {
            config: RwLock::new(config),
            running: AtomicBool::new(false),
            last_maintenance: AtomicU64::new(0),
            cycles_completed: AtomicU64::new(0),
            indexes_maintained: AtomicU64::new(0),
        }
    }

    /// Check if worker is running
    pub fn is_running(&self) -> bool {
        self.running.load(Ordering::SeqCst)
    }

    /// Start worker (sets the running flag)
    pub fn start(&self) {
        self.running.store(true, Ordering::SeqCst);
    }

    /// Stop worker (clears the running flag)
    pub fn stop(&self) {
        self.running.store(false, Ordering::SeqCst);
    }

    /// Get statistics
    ///
    /// Each field is loaded independently, so a snapshot taken while
    /// `record_cycle` runs may mix values from before and after that update.
    pub fn get_stats(&self) -> BgWorkerStats {
        BgWorkerStats {
            running: self.running.load(Ordering::SeqCst),
            last_maintenance: self.last_maintenance.load(Ordering::SeqCst),
            cycles_completed: self.cycles_completed.load(Ordering::SeqCst),
            indexes_maintained: self.indexes_maintained.load(Ordering::SeqCst),
        }
    }

    /// Record maintenance cycle
    ///
    /// Stores the current Unix time (seconds) in `last_maintenance`, increments
    /// `cycles_completed`, and adds `indexes_count` to `indexes_maintained`.
    fn record_cycle(&self, indexes_count: u64) {
        // A clock reading before the Unix epoch is recorded as 0 rather than
        // panicking inside the PostgreSQL process.
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_secs());

        self.last_maintenance.store(now, Ordering::SeqCst);
        self.cycles_completed.fetch_add(1, Ordering::SeqCst);
        self.indexes_maintained.fetch_add(indexes_count, Ordering::SeqCst);
    }
}

/// Background worker statistics
///
/// Point-in-time copy of the worker counters, as produced by
/// `BgWorkerState::get_stats`.
#[derive(Debug, Clone)]
pub struct BgWorkerStats {
    /// Whether the running flag was set when the snapshot was taken.
    pub running: bool,
    /// Unix time (seconds) of the last recorded maintenance cycle; 0 if none yet.
    pub last_maintenance: u64,
    /// Number of maintenance cycles recorded so far.
    pub cycles_completed: u64,
    /// Cumulative number of indexes reported as maintained across all cycles.
    pub indexes_maintained: u64,
}

// Global worker state: created lazily on first access and then shared by every
// caller within the current process.
static WORKER_STATE: std::sync::OnceLock<Arc<BgWorkerState>> = std::sync::OnceLock::new();

/// Return the worker state, creating it from `BgWorkerConfig::default()` on
/// first use.
///
/// NOTE(review): `OnceLock` statics are per process. PostgreSQL runs each
/// backend and background worker as its own process, so presumably each one
/// sees an independent copy of this state. Confirm that this is intended.
fn get_worker_state() -> &'static Arc<BgWorkerState> {
    WORKER_STATE.get_or_init(|| {
        let defaults = BgWorkerConfig::default();
        Arc::new(BgWorkerState::new(defaults))
    })
}

// ============================================================================
// Background Worker Entry Point
// ============================================================================

/// Main background worker function
///
/// Entry point for the RuVector maintenance worker. It is intended to be
/// registered with PostgreSQL (see `register_background_worker`) and to run in
/// its own background process. The worker alternates between one maintenance
/// cycle and a latch wait of `maintenance_interval_secs`. It leaves the loop
/// when the running flag is cleared or `ShutdownRequestPending` is set.
///
/// PostgreSQL enters a background worker's main function with signals
/// blocked, so they are unblocked first; otherwise the postmaster's SIGTERM is
/// never delivered. No custom SIGTERM handler is installed, so PostgreSQL's
/// default background-worker handling of SIGTERM applies.
#[pg_guard]
pub extern "C" fn ruvector_bgworker_main(_arg: pg_sys::Datum) {
    // WaitLatch asserts that its timeout fits in a C `int`.
    const MAX_WAIT_MS: u64 = i32::MAX as u64;

    // SAFETY: called at the very start of the worker's main function, which is
    // where PostgreSQL expects a background worker to unblock signals.
    unsafe {
        pg_sys::BackgroundWorkerUnblockSignals();
    }

    pgrx::log!("RuVector background worker starting");

    let worker_state = get_worker_state();
    worker_state.start();

    while worker_state.is_running() {
        if let Err(e) = perform_maintenance_cycle() {
            pgrx::warning!("Background worker maintenance failed: {}", e);
        }

        // Re-read the interval on every iteration so that configuration changes
        // in this process apply to the next sleep. Very long intervals are
        // clamped to about 24.8 days; the next cycle then simply runs earlier
        // than configured.
        let interval_secs = worker_state.config.read().maintenance_interval_secs;
        let timeout_ms = interval_secs.saturating_mul(1000).min(MAX_WAIT_MS) as i64;

        // SAFETY: MyLatch is this process's latch. WL_EXIT_ON_PM_DEATH makes the
        // worker exit if the postmaster dies; latch.c asserts that postmaster
        // children pass a postmaster-death flag.
        unsafe {
            pg_sys::WaitLatch(
                pg_sys::MyLatch,
                pg_sys::WL_LATCH_SET as i32
                    | pg_sys::WL_TIMEOUT as i32
                    | pg_sys::WL_EXIT_ON_PM_DEATH as i32,
                timeout_ms,
                pg_sys::PG_WAIT_EXTENSION as u32,
            );
            pg_sys::ResetLatch(pg_sys::MyLatch);
        }

        // This flag is only set by a shutdown-request handler such as
        // SignalHandlerForShutdownRequest. The check is kept as a graceful exit
        // path for when one is installed.
        if unsafe { pg_sys::ShutdownRequestPending } {
            break;
        }
    }

    worker_state.stop();
    pgrx::log!("RuVector background worker stopped");
}

// ============================================================================
// Maintenance Operations
// ============================================================================

/// Perform one maintenance cycle
///
/// The cycle works as follows:
/// - It snapshots the current configuration.
/// - It discovers up to `max_indexes_per_cycle` RuVector indexes.
/// - It runs each enabled operation (stats collection, optimization, vacuum)
///   on every index.
///
/// A failing operation is logged as a WARNING and does not stop the remaining
/// work. An index counts toward `indexes_maintained` only when optimization is
/// enabled and succeeds for it.
///
/// # Errors
/// Returns an error only if `find_ruvector_indexes` fails. In that case no
/// cycle is recorded.
fn perform_maintenance_cycle() -> Result<(), String> {
    let worker_state = get_worker_state();
    // The read guard is a temporary that is released at the end of this
    // statement, so no lock is held while indexes are maintained.
    let config = worker_state.config.read().clone();

    let indexes = find_ruvector_indexes(config.max_indexes_per_cycle)?;

    let mut maintained_count = 0u64;

    for index_info in indexes {
        if config.collect_stats {
            if let Err(e) = collect_index_stats(&index_info) {
                pgrx::warning!("Failed to collect stats for index {}: {}", index_info.name, e);
            }
        }

        if config.auto_optimize {
            if let Err(e) = optimize_index_if_needed(&index_info, config.optimize_threshold) {
                pgrx::warning!("Failed to optimize index {}: {}", index_info.name, e);
            } else {
                maintained_count += 1;
            }
        }

        if config.auto_vacuum {
            if let Err(e) = vacuum_index_if_needed(&index_info, config.vacuum_min_age_secs) {
                pgrx::warning!("Failed to vacuum index {}: {}", index_info.name, e);
            }
        }
    }

    worker_state.record_cycle(maintained_count);

    Ok(())
}

/// Index information
///
/// Per-index metadata consumed by the maintenance routines in this module.
///
/// NOTE(review): nothing visible here constructs `IndexInfo` yet, because
/// `find_ruvector_indexes` returns an empty list. Only `name`, `index_type`
/// and `last_vacuum` are read; the other fields are presumably reserved for
/// the planned catalog query.
#[derive(Debug, Clone)]
struct IndexInfo {
    /// Index name; used in log and warning messages.
    name: String,
    /// Presumably the index's own OID (`pg_class.oid`). TODO: confirm.
    oid: pg_sys::Oid,
    /// Presumably the OID of the table the index is built on. TODO: confirm.
    relation_oid: pg_sys::Oid,
    /// Access method name; dispatch in `optimize_index` recognizes only these two.
    index_type: String, // "ruhnsw" or "ruivfflat"
    /// Assumed on-disk size in bytes (not yet read anywhere).
    size_bytes: i64,
    /// Assumed number of index tuples (not yet read anywhere).
    tuple_count: i64,
    /// Unix time (seconds) of the last vacuum, or `None` if unknown.
    /// `vacuum_index_if_needed` compares it against the current time.
    last_vacuum: Option<u64>,
}

/// Find all RuVector indexes in the database
fn find_ruvector_indexes(max_count: usize) -> Result<Vec<IndexInfo>, String> {
    let mut indexes = Vec::new();

    // Query pg_class for indexes using our access methods
    // This is a simplified version - in production, use SPI to query system catalogs

    // For now, return empty list (would be populated via SPI query in production)
    // Example query:
    // SELECT c.relname, c.oid, c.relfilenode, am.amname, pg_relation_size(c.oid)
    // FROM pg_class c
    // JOIN pg_am am ON c.relam = am.oid
    // WHERE am.amname IN ('ruhnsw', 'ruivfflat')
    // LIMIT $max_count

    Ok(indexes)
}

/// Collect statistics for an index
///
/// Stub: emits a DEBUG1 message naming the index and always succeeds.
/// A full implementation is meant to gather:
/// - index size
/// - tuple count and deleted-tuple count
/// - fragmentation level
/// - average search depth
/// - distribution statistics
fn collect_index_stats(index: &IndexInfo) -> Result<(), String> {
    let index_name = &index.name;
    pgrx::debug1!("Collecting stats for index: {}", index_name);
    Ok(())
}

/// Optimize index if it exceeds threshold
///
/// Optimizes `index` when its fragmentation ratio is strictly greater than
/// `threshold`. Otherwise it does nothing and succeeds.
///
/// # Errors
/// Propagates errors from `calculate_fragmentation` and `optimize_index`.
fn optimize_index_if_needed(index: &IndexInfo, threshold: f32) -> Result<(), String> {
    let fragmentation = calculate_fragmentation(index)?;
    let over_threshold = fragmentation > threshold;

    if over_threshold {
        let fragmentation_pct = fragmentation * 100.0;
        pgrx::log!(
            "Optimizing index {} (fragmentation: {:.2}%)",
            index.name,
            fragmentation_pct
        );
        return optimize_index(index);
    }

    Ok(())
}

/// Calculate index fragmentation ratio
///
/// Placeholder: always reports 5% fragmentation, whatever the index.
/// A real measurement would count deleted or obsolete tuples, assess graph
/// connectivity (for HNSW), and detect unbalanced partitions.
fn calculate_fragmentation(_index: &IndexInfo) -> Result<f32, String> {
    const PLACEHOLDER_RATIO: f32 = 0.05;
    Ok(PLACEHOLDER_RATIO)
}

/// Perform index optimization
fn optimize_index(index: &IndexInfo) -> Result<(), String> {
    match index.index_type.as_str() {
        "ruhnsw" => optimize_hnsw_index(index),
        "ruivfflat" => optimize_ivfflat_index(index),
        _ => Err(format!("Unknown index type: {}", index.index_type)),
    }
}

/// Optimize HNSW index
fn optimize_hnsw_index(index: &IndexInfo) -> Result<(), String> {
    pgrx::log!("Optimizing HNSW index: {}", index.name);

    // HNSW optimization operations:
    // 1. Remove deleted nodes
    // 2. Rebuild edges for improved connectivity
    // 3. Rebalance layers
    // 4. Compact memory

    Ok(())
}

/// Optimize IVFFlat index
fn optimize_ivfflat_index(index: &IndexInfo) -> Result<(), String> {
    pgrx::log!("Optimizing IVFFlat index: {}", index.name);

    // IVFFlat optimization operations:
    // 1. Recompute centroids
    // 2. Rebalance lists
    // 3. Remove deleted vectors
    // 4. Update statistics

    Ok(())
}

/// Vacuum index if needed
///
/// Vacuums `index` unless its last recorded vacuum happened fewer than
/// `min_age_secs` seconds ago. An index with no recorded vacuum time
/// (`last_vacuum == None`) is always considered due.
///
/// The vacuum itself is not implemented yet: when one is due, this only logs.
///
/// # Errors
/// Currently never returns an error.
fn vacuum_index_if_needed(index: &IndexInfo, min_age_secs: u64) -> Result<(), String> {
    if let Some(last_vacuum) = index.last_vacuum {
        // A clock reading before the Unix epoch is treated as time 0 rather
        // than panicking inside the PostgreSQL process.
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_secs());

        // `last_vacuum` can be ahead of `now` after a clock adjustment. Plain
        // subtraction would then underflow (a panic in debug builds, a huge
        // bogus age in release builds). Saturating to 0 treats such an index
        // as freshly vacuumed.
        let age_secs = now.saturating_sub(last_vacuum);
        if age_secs < min_age_secs {
            return Ok(()); // Too soon
        }
    }

    pgrx::log!("Vacuuming index: {}", index.name);

    // Perform vacuum
    // In production, use PostgreSQL's vacuum infrastructure

    Ok(())
}

// ============================================================================
// SQL Functions for Background Worker Control
// ============================================================================

/// Start the background worker
///
/// Sets the running flag, logs, and returns `true`. If the flag is already
/// set, it raises a WARNING and returns `false`. No worker process is launched
/// yet; only the flag changes.
///
/// NOTE(review): the flag lives in a process-local static, so presumably this
/// affects only the calling backend, not a separately running worker
/// process. Confirm.
#[pg_extern]
pub fn ruvector_bgworker_start() -> bool {
    let state = get_worker_state();
    let already_running = state.is_running();

    if already_running {
        pgrx::warning!("Background worker is already running");
    } else {
        // Registering and launching a real worker is not implemented; only the
        // flag is flipped here.
        state.start();
        pgrx::log!("Background worker started");
    }

    !already_running
}

/// Stop the background worker
///
/// Clears the running flag, logs, and returns `true`. If the flag is not set,
/// it raises a WARNING and returns `false`.
///
/// NOTE(review): like `ruvector_bgworker_start`, this changes only this
/// process's copy of the worker state. Confirm whether a separate worker
/// process is meant to observe it.
#[pg_extern]
pub fn ruvector_bgworker_stop() -> bool {
    let state = get_worker_state();

    if state.is_running() {
        state.stop();
        pgrx::log!("Background worker stopped");
        true
    } else {
        pgrx::warning!("Background worker is not running");
        false
    }
}

/// Get background worker status and statistics
#[pg_extern]
pub fn ruvector_bgworker_status() -> pgrx::JsonB {
    let worker_state = get_worker_state();
    let stats = worker_state.get_stats();
    let config = worker_state.config.read().clone();

    let status = serde_json::json!({
        "running": stats.running,
        "last_maintenance": stats.last_maintenance,
        "cycles_completed": stats.cycles_completed,
        "indexes_maintained": stats.indexes_maintained,
        "config": {
            "maintenance_interval_secs": config.maintenance_interval_secs,
            "auto_optimize": config.auto_optimize,
            "collect_stats": config.collect_stats,
            "auto_vacuum": config.auto_vacuum,
            "vacuum_min_age_secs": config.vacuum_min_age_secs,
            "max_indexes_per_cycle": config.max_indexes_per_cycle,
            "optimize_threshold": config.optimize_threshold,
        }
    });

    pgrx::JsonB(status)
}

/// Update background worker configuration
///
/// Applies each non-NULL argument to the worker configuration; NULL arguments
/// leave their setting unchanged.
///
/// Returns a JSON object with `"status": "updated"` and the four adjustable
/// settings as they stand after the update.
///
/// A non-positive `maintenance_interval_secs` is rejected. The current
/// interval is kept and a WARNING tells the caller that the value was ignored.
#[pg_extern]
pub fn ruvector_bgworker_config(
    maintenance_interval_secs: Option<i32>,
    auto_optimize: Option<bool>,
    collect_stats: Option<bool>,
    auto_vacuum: Option<bool>,
) -> pgrx::JsonB {
    let worker_state = get_worker_state();
    let mut rejected_interval = None;

    // Apply the updates under the write lock, then snapshot the result so the
    // lock is released before warnings are raised or the response is built.
    let config = {
        let mut config = worker_state.config.write();

        match maintenance_interval_secs {
            // Lossless cast: `interval` is a positive i32 here.
            Some(interval) if interval > 0 => config.maintenance_interval_secs = interval as u64,
            Some(interval) => rejected_interval = Some(interval),
            None => {}
        }

        if let Some(optimize) = auto_optimize {
            config.auto_optimize = optimize;
        }

        if let Some(stats) = collect_stats {
            config.collect_stats = stats;
        }

        if let Some(vacuum) = auto_vacuum {
            config.auto_vacuum = vacuum;
        }

        config.clone()
    };

    if let Some(interval) = rejected_interval {
        pgrx::warning!(
            "ignoring maintenance_interval_secs = {}: value must be positive",
            interval
        );
    }

    let result = serde_json::json!({
        "status": "updated",
        "config": {
            "maintenance_interval_secs": config.maintenance_interval_secs,
            "auto_optimize": config.auto_optimize,
            "collect_stats": config.collect_stats,
            "auto_vacuum": config.auto_vacuum,
        }
    });

    pgrx::JsonB(result)
}

// ============================================================================
// Worker Registration
// ============================================================================

/// Register background worker with PostgreSQL
///
/// This should be called from _PG_init()
///
/// Currently a placeholder: it only emits a LOG message and never calls
/// `RegisterBackgroundWorker`, so this function registers no worker.
pub fn register_background_worker() {
    // In production, use pg_sys::RegisterBackgroundWorker
    // For now, just log
    pgrx::log!("RuVector background worker registration placeholder");

    // Example registration (pseudo-code). Points to respect when implementing:
    // - PostgreSQL 10+ has no `bgw_main` field. The postmaster finds the entry
    //   point by shared-library name and symbol name, so `ruvector_bgworker_main`
    //   needs an unmangled, exported symbol.
    // - `BGW_NEVER_RESTART` is a `bgw_restart_time` value, not a `bgw_flags` bit.
    // NOTE(review): if maintenance will run SPI/catalog queries, the worker
    // presumably also needs BGWORKER_BACKEND_DATABASE_CONNECTION and a call to
    // BackgroundWorkerInitializeConnection. Confirm.
    // unsafe {
    //     let mut worker = pg_sys::BackgroundWorker::default();
    //     worker.bgw_name = "ruvector maintenance worker";
    //     worker.bgw_type = "ruvector worker";
    //     worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
    //     worker.bgw_start_time = BgWorkerStartTime::BgWorkerStart_RecoveryFinished;
    //     worker.bgw_restart_time = BGW_NEVER_RESTART;
    //     worker.bgw_library_name = "<extension shared library name>";
    //     worker.bgw_function_name = "ruvector_bgworker_main";
    //     pg_sys::RegisterBackgroundWorker(&mut worker);
    // }
}

// ============================================================================
// Tests
// ============================================================================

#[cfg(test)]
mod tests {
    use super::*;

    /// The running flag starts cleared and is toggled by `start`/`stop`.
    #[test]
    fn test_worker_state() {
        let worker = BgWorkerState::new(BgWorkerConfig::default());

        let mut observed = vec![worker.is_running()];
        worker.start();
        observed.push(worker.is_running());
        worker.stop();
        observed.push(worker.is_running());

        assert_eq!(observed, [false, true, false]);
    }

    /// Each recorded cycle bumps the cycle count, accumulates the index
    /// count, and stamps a non-zero timestamp.
    #[test]
    fn test_stats_recording() {
        let worker = BgWorkerState::new(BgWorkerConfig::default());

        for count in [5, 3] {
            worker.record_cycle(count);
        }

        let BgWorkerStats {
            cycles_completed,
            indexes_maintained,
            last_maintenance,
            ..
        } = worker.get_stats();
        assert_eq!(cycles_completed, 2);
        assert_eq!(indexes_maintained, 8);
        assert!(last_maintenance > 0);
    }

    /// The defaults match the documented values.
    #[test]
    fn test_default_config() {
        let BgWorkerConfig {
            maintenance_interval_secs,
            auto_optimize,
            collect_stats,
            auto_vacuum,
            optimize_threshold,
            ..
        } = BgWorkerConfig::default();

        assert_eq!(maintenance_interval_secs, 300);
        assert!(auto_optimize && collect_stats && auto_vacuum);
        assert_eq!(optimize_threshold, 0.10);
    }
}