Skip to main content

agent_diva_files/
index.rs

1//! SQLite-based file index
2//!
3//! This module provides a persistent file index using SQLite.
4//! It replaces the in-memory HashMap with a proper database for:
5//! - Better query performance with large datasets
6//! - ACID transactions
7//! - SQL-based filtering and aggregation
8//! - Automatic persistence
9
10use crate::handle::{FileIndexEntry, FileMetadata};
11use crate::Result;
12use chrono::{DateTime, Utc};
13use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
14use sqlx::{Row, SqlitePool};
15use std::path::{Path, PathBuf};
16
17/// SQLite-based file index
18pub struct SqliteIndex {
19    pool: SqlitePool,
20    db_path: PathBuf,
21}
22
23impl SqliteIndex {
24    /// Create a new SQLite index at the given path
25    pub async fn new(db_path: PathBuf) -> Result<Self> {
26        // Ensure parent directory exists
27        if let Some(parent) = db_path.parent() {
28            tokio::fs::create_dir_all(parent).await?;
29        }
30
31        // Use SqliteConnectOptions for proper path handling on Windows
32        let options = SqliteConnectOptions::new()
33            .filename(&db_path)
34            .create_if_missing(true);
35
36        let pool = SqlitePoolOptions::new()
37            .max_connections(5)
38            .connect_with(options)
39            .await?;
40
41        let index = Self { pool, db_path };
42        index.init().await?;
43
44        Ok(index)
45    }
46
47    /// Initialize the database schema
48    async fn init(&self) -> Result<()> {
49        // Create main files table with soft delete support
50        sqlx::query(
51            r#"
52            CREATE TABLE IF NOT EXISTS files (
53                id TEXT PRIMARY KEY NOT NULL,
54                path TEXT NOT NULL,
55                size INTEGER NOT NULL DEFAULT 0,
56                ref_count INTEGER NOT NULL DEFAULT 1,
57                created_at TEXT NOT NULL,
58                last_accessed_at TEXT,
59                deleted_at TEXT,          -- NEW: soft delete timestamp
60                deleted_by TEXT,          -- NEW: who deleted the file
61                metadata_json TEXT NOT NULL
62            );
63
64            CREATE INDEX IF NOT EXISTS idx_files_ref_count ON files(ref_count);
65            CREATE INDEX IF NOT EXISTS idx_files_last_accessed ON files(last_accessed_at);
66            CREATE INDEX IF NOT EXISTS idx_files_deleted_at ON files(deleted_at) WHERE deleted_at IS NOT NULL;
67            "#,
68        )
69        .execute(&self.pool)
70        .await?;
71
72        // Migration: Add deleted_at column if not exists (for existing databases)
73        let columns: Vec<String> =
74            sqlx::query_scalar("SELECT name FROM pragma_table_info('files')")
75                .fetch_all(&self.pool)
76                .await?;
77
78        if !columns.contains(&"deleted_at".to_string()) {
79            let _ = sqlx::query("ALTER TABLE files ADD COLUMN deleted_at TEXT")
80                .execute(&self.pool)
81                .await;
82        }
83        if !columns.contains(&"deleted_by".to_string()) {
84            let _ = sqlx::query("ALTER TABLE files ADD COLUMN deleted_by TEXT")
85                .execute(&self.pool)
86                .await;
87        }
88
89        tracing::info!("SQLite index initialized at {:?}", self.db_path);
90        Ok(())
91    }
92
93    /// Load index from existing database (alias for new)
94    pub async fn load(db_path: PathBuf) -> Result<Self> {
95        Self::new(db_path).await
96    }
97
98    /// Get an entry by ID
99    ///
100    /// # Arguments
101    /// * `id` - The file ID
102    /// * `include_deleted` - If true, include soft-deleted files
103    pub async fn get(&self, id: &str, include_deleted: bool) -> Result<Option<FileIndexEntry>> {
104        let row = if include_deleted {
105            sqlx::query(
106                r#"
107                SELECT id, path, size, ref_count, created_at, last_accessed_at, metadata_json
108                FROM files WHERE id = ?
109                "#,
110            )
111            .bind(id)
112            .fetch_optional(&self.pool)
113            .await?
114        } else {
115            sqlx::query(
116                r#"
117                SELECT id, path, size, ref_count, created_at, last_accessed_at, metadata_json
118                FROM files WHERE id = ? AND deleted_at IS NULL
119                "#,
120            )
121            .bind(id)
122            .fetch_optional(&self.pool)
123            .await?
124        };
125
126        match row {
127            Some(row) => Ok(Some(self.row_to_entry(&row)?)),
128            None => Ok(None),
129        }
130    }
131
132    /// Soft delete a file by ID
133    ///
134    /// Marks the file as deleted but does not remove it physically.
135    /// The file can be restored until the retention period expires.
136    pub async fn soft_delete(&self, id: &str, deleted_by: Option<&str>) -> Result<bool> {
137        let result = sqlx::query(
138            r#"
139            UPDATE files
140            SET deleted_at = ?, deleted_by = ?
141            WHERE id = ? AND deleted_at IS NULL
142            "#,
143        )
144        .bind(Utc::now().to_rfc3339())
145        .bind(deleted_by)
146        .bind(id)
147        .execute(&self.pool)
148        .await?;
149
150        Ok(result.rows_affected() > 0)
151    }
152
153    /// Restore a soft-deleted file
154    ///
155    /// Clears the deleted_at timestamp, making the file accessible again.
156    pub async fn restore(&self, id: &str) -> Result<bool> {
157        let result = sqlx::query(
158            r#"
159            UPDATE files
160            SET deleted_at = NULL, deleted_by = NULL
161            WHERE id = ? AND deleted_at IS NOT NULL
162            "#,
163        )
164        .bind(id)
165        .execute(&self.pool)
166        .await?;
167
168        Ok(result.rows_affected() > 0)
169    }
170
171    /// List all soft-deleted files
172    pub async fn list_deleted(&self) -> Result<Vec<FileIndexEntry>> {
173        let rows = sqlx::query(
174            r#"
175            SELECT id, path, size, ref_count, created_at, last_accessed_at, metadata_json
176            FROM files
177            WHERE deleted_at IS NOT NULL
178            ORDER BY deleted_at DESC
179            "#,
180        )
181        .fetch_all(&self.pool)
182        .await?;
183
184        let mut entries = Vec::new();
185        for row in rows {
186            entries.push(self.row_to_entry(&row)?);
187        }
188
189        Ok(entries)
190    }
191
192    /// Get soft-deleted files that are ready for permanent deletion
193    ///
194    /// # Arguments
195    /// * `retention_days` - Files deleted more than this many days ago
196    pub async fn get_expired_deletions(&self, retention_days: u32) -> Result<Vec<FileIndexEntry>> {
197        let cutoff = Utc::now() - chrono::Duration::days(retention_days as i64);
198        let cutoff_str = cutoff.to_rfc3339();
199
200        let rows = sqlx::query(
201            r#"
202            SELECT id, path, size, ref_count, created_at, last_accessed_at, metadata_json
203            FROM files
204            WHERE deleted_at IS NOT NULL AND deleted_at < ?
205            "#,
206        )
207        .bind(cutoff_str)
208        .fetch_all(&self.pool)
209        .await?;
210
211        let mut entries = Vec::new();
212        for row in rows {
213            entries.push(self.row_to_entry(&row)?);
214        }
215
216        Ok(entries)
217    }
218
219    /// Hard delete a file (physical removal from database)
220    ///
221    /// Use with caution - this permanently removes the index entry.
222    /// The actual file data should be deleted separately.
223    pub async fn hard_delete(&self, id: &str) -> Result<Option<FileIndexEntry>> {
224        // First get the entry to return it
225        let entry = self.get(id, true).await?;
226
227        if entry.is_some() {
228            sqlx::query("DELETE FROM files WHERE id = ?")
229                .bind(id)
230                .execute(&self.pool)
231                .await?;
232        }
233
234        Ok(entry)
235    }
236
237    /// Insert or update an entry
238    pub async fn insert(&self, entry: FileIndexEntry) -> Result<()> {
239        let metadata_json = serde_json::to_string(&entry.metadata)?;
240
241        sqlx::query(
242            r#"
243            INSERT INTO files (id, path, size, ref_count, created_at, last_accessed_at, metadata_json)
244            VALUES (?, ?, ?, ?, ?, ?, ?)
245            ON CONFLICT(id) DO UPDATE SET
246                path = excluded.path,
247                size = excluded.size,
248                ref_count = excluded.ref_count,
249                last_accessed_at = excluded.last_accessed_at,
250                metadata_json = excluded.metadata_json,
251                deleted_at = NULL,
252                deleted_by = NULL
253            "#,
254        )
255        .bind(&entry.id)
256        .bind(entry.path.to_string_lossy().to_string())
257        .bind(entry.size as i64)
258        .bind(entry.ref_count as i64)
259        .bind(entry.created_at.to_rfc3339())
260        .bind(entry.last_accessed_at.map(|t| t.to_rfc3339()))
261        .bind(metadata_json)
262        .execute(&self.pool)
263        .await?;
264
265        Ok(())
266    }
267
268    /// Remove an entry by ID
269    ///
270    /// This physically removes the entry from the database.
271    /// Note: For soft delete, use `soft_delete()` instead.
272    pub async fn remove(&self, id: &str) -> Result<Option<FileIndexEntry>> {
273        // First get the entry to return it (include deleted since we're removing it)
274        let entry = self.get(id, true).await?;
275
276        if entry.is_some() {
277            sqlx::query("DELETE FROM files WHERE id = ?")
278                .bind(id)
279                .execute(&self.pool)
280                .await?;
281        }
282
283        Ok(entry)
284    }
285
286    /// Update reference count for an entry
287    ///
288    /// # Arguments
289    /// * `id` - The file ID
290    /// * `delta` - Change in reference count (+1 to add, -1 to release)
291    ///
292    /// # Returns
293    /// The new reference count, or None if the entry doesn't exist
294    pub async fn update_ref_count(&self, id: &str, delta: i32) -> Result<Option<usize>> {
295        // Get the current entry (only non-deleted files should have their ref count updated)
296        let current = self.get(id, false).await?;
297
298        if let Some(entry) = current {
299            let new_count = if delta < 0 {
300                entry
301                    .ref_count
302                    .saturating_sub(delta.unsigned_abs() as usize)
303            } else {
304                entry.ref_count.saturating_add(delta as usize)
305            };
306
307            sqlx::query(
308                r#"
309                UPDATE files
310                SET ref_count = ?, last_accessed_at = ?
311                WHERE id = ?
312                "#,
313            )
314            .bind(new_count as i64)
315            .bind(Utc::now().to_rfc3339())
316            .bind(id)
317            .execute(&self.pool)
318            .await?;
319
320            Ok(Some(new_count))
321        } else {
322            Ok(None)
323        }
324    }
325
326    /// Get entries suitable for cleanup
327    pub async fn get_candidates_for_cleanup(
328        &self,
329        threshold: i32,
330        max_age_days: u32,
331    ) -> Result<Vec<FileIndexEntry>> {
332        let cutoff = Utc::now() - chrono::Duration::days(max_age_days as i64);
333        let cutoff_str = cutoff.to_rfc3339();
334
335        let rows = sqlx::query(
336            r#"
337            SELECT id, path, size, ref_count, created_at, last_accessed_at, metadata_json
338            FROM files
339            WHERE ref_count <= ?
340              AND (last_accessed_at IS NULL OR last_accessed_at < ?)
341            "#,
342        )
343        .bind(threshold as i64)
344        .bind(cutoff_str)
345        .fetch_all(&self.pool)
346        .await?;
347
348        let mut entries = Vec::new();
349        for row in rows {
350            entries.push(self.row_to_entry(&row)?);
351        }
352
353        Ok(entries)
354    }
355
356    /// Get total entry count
357    pub async fn len(&self) -> Result<usize> {
358        let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM files")
359            .fetch_one(&self.pool)
360            .await?;
361        Ok(count as usize)
362    }
363
364    /// Check if index is empty
365    pub async fn is_empty(&self) -> Result<bool> {
366        Ok(self.len().await? == 0)
367    }
368
369    /// Get all entries (use sparingly with large datasets)
370    pub async fn entries(&self) -> Result<Vec<FileIndexEntry>> {
371        let rows = sqlx::query(
372            r#"
373            SELECT id, path, size, ref_count, created_at, last_accessed_at, metadata_json
374            FROM files
375            "#,
376        )
377        .fetch_all(&self.pool)
378        .await?;
379
380        let mut entries = Vec::new();
381        for row in rows {
382            entries.push(self.row_to_entry(&row)?);
383        }
384
385        Ok(entries)
386    }
387
388    /// Get statistics about the index
389    pub async fn stats(&self) -> Result<IndexStats> {
390        let row = sqlx::query(
391            r#"
392            SELECT
393                COUNT(*) as total_files,
394                COALESCE(SUM(size), 0) as total_size,
395                COALESCE(SUM(ref_count), 0) as total_refs
396            FROM files
397            "#,
398        )
399        .fetch_one(&self.pool)
400        .await?;
401
402        Ok(IndexStats {
403            total_files: row.get::<i64, _>("total_files") as usize,
404            total_size: row.get::<i64, _>("total_size") as u64,
405            total_refs: row.get::<i64, _>("total_refs") as usize,
406        })
407    }
408
409    /// Close the database connection
410    pub async fn close(&self) {
411        self.pool.close().await;
412    }
413
414    /// Convert a database row to FileIndexEntry
415    fn row_to_entry(&self, row: &sqlx::sqlite::SqliteRow) -> Result<FileIndexEntry> {
416        let metadata_json: String = row.get("metadata_json");
417        let metadata: FileMetadata = serde_json::from_str(&metadata_json)?;
418
419        Ok(FileIndexEntry {
420            id: row.get("id"),
421            path: PathBuf::from(row.get::<String, _>("path")),
422            size: row.get::<i64, _>("size") as u64,
423            ref_count: row.get::<i64, _>("ref_count") as usize,
424            created_at: DateTime::parse_from_rfc3339(&row.get::<String, _>("created_at"))?
425                .with_timezone(&Utc),
426            last_accessed_at: row
427                .get::<Option<String>, _>("last_accessed_at")
428                .map(|s| DateTime::parse_from_rfc3339(&s).map(|dt| dt.with_timezone(&Utc)))
429                .transpose()?,
430            metadata,
431        })
432    }
433
434    /// Migrate from old JSONL format
435    pub async fn migrate_from_jsonl(&self, jsonl_path: &Path) -> Result<usize> {
436        if !jsonl_path.exists() {
437            return Ok(0);
438        }
439
440        let content = tokio::fs::read_to_string(jsonl_path).await?;
441        let mut count = 0;
442
443        for line in content.lines() {
444            if line.trim().is_empty() {
445                continue;
446            }
447            if let Ok(entry) = serde_json::from_str::<FileIndexEntry>(line) {
448                self.insert(entry).await?;
449                count += 1;
450            } else {
451                tracing::warn!("Failed to parse index entry during migration: {}", line);
452            }
453        }
454
455        tracing::info!("Migrated {} entries from JSONL to SQLite", count);
456        Ok(count)
457    }
458}
459
/// Aggregate statistics for the index.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct IndexStats {
    /// Number of rows in the index.
    pub total_files: usize,
    /// Sum of the `size` column over all rows.
    pub total_size: u64,
    /// Sum of the `ref_count` column over all rows.
    pub total_refs: usize,
}
467
468/// Backward-compatible FileIndex wrapper
469///
470/// This type alias allows existing code to continue working while
471/// migrating from the old in-memory HashMap implementation.
472pub type FileIndex = SqliteIndex;
473
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    fn create_test_metadata() -> FileMetadata {
        FileMetadata {
            name: "test.txt".to_string(),
            size: 100,
            mime_type: Some("text/plain".to_string()),
            source: Some("test".to_string()),
            created_at: Utc::now(),
            last_accessed_at: None,
            preview: None,
        }
    }

    /// Build a live entry with the given id and ref count.
    fn create_test_entry(id: &str, ref_count: usize) -> FileIndexEntry {
        FileIndexEntry {
            id: id.to_string(),
            path: PathBuf::from(format!("p/{}", id)),
            size: 10,
            ref_count,
            created_at: Utc::now(),
            last_accessed_at: None,
            metadata: create_test_metadata(),
        }
    }

    #[tokio::test]
    async fn test_sqlite_index_basic() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("index.db");

        let index = SqliteIndex::new(db_path).await.unwrap();

        // Insert
        let entry = FileIndexEntry {
            id: "abc123".to_string(),
            path: PathBuf::from("ab/c123"),
            size: 100,
            ref_count: 1,
            created_at: Utc::now(),
            last_accessed_at: None,
            metadata: create_test_metadata(),
        };

        index.insert(entry).await.unwrap();
        assert_eq!(index.len().await.unwrap(), 1);

        // Get
        let retrieved = index.get("abc123", false).await.unwrap();
        assert!(retrieved.is_some());
        let retrieved = retrieved.unwrap();
        assert_eq!(retrieved.id, "abc123");
        assert_eq!(retrieved.ref_count, 1);

        // Update ref count
        let new_count = index.update_ref_count("abc123", 1).await.unwrap();
        assert_eq!(new_count, Some(2));

        let updated = index.get("abc123", false).await.unwrap().unwrap();
        assert_eq!(updated.ref_count, 2);

        // Remove
        let removed = index.remove("abc123").await.unwrap();
        assert!(removed.is_some());
        assert_eq!(index.len().await.unwrap(), 0);
    }

    #[tokio::test]
    async fn test_update_ref_count_clamps_and_handles_missing() {
        let temp_dir = TempDir::new().unwrap();
        let index = SqliteIndex::new(temp_dir.path().join("index.db"))
            .await
            .unwrap();

        index.insert(create_test_entry("rc", 1)).await.unwrap();

        // Releasing more references than exist clamps at zero.
        assert_eq!(index.update_ref_count("rc", -5).await.unwrap(), Some(0));
        assert_eq!(index.get("rc", false).await.unwrap().unwrap().ref_count, 0);

        // Updating an unknown id reports None.
        assert_eq!(index.update_ref_count("missing", 1).await.unwrap(), None);
    }

    #[tokio::test]
    async fn test_soft_delete_lifecycle() {
        let temp_dir = TempDir::new().unwrap();
        let index = SqliteIndex::new(temp_dir.path().join("index.db"))
            .await
            .unwrap();

        index.insert(create_test_entry("to_delete", 1)).await.unwrap();

        // Soft delete hides the entry from default lookups but keeps the row.
        assert!(index.soft_delete("to_delete", Some("tester")).await.unwrap());
        assert!(index.get("to_delete", false).await.unwrap().is_none());
        assert!(index.get("to_delete", true).await.unwrap().is_some());
        assert_eq!(index.len().await.unwrap(), 1);

        // A second soft delete is a no-op.
        assert!(!index.soft_delete("to_delete", None).await.unwrap());

        // Ref-count updates skip soft-deleted entries.
        assert_eq!(index.update_ref_count("to_delete", 1).await.unwrap(), None);

        let deleted = index.list_deleted().await.unwrap();
        assert_eq!(deleted.len(), 1);
        assert_eq!(deleted[0].id, "to_delete");

        // A just-deleted entry is still inside a 1-day retention window.
        assert!(index.get_expired_deletions(1).await.unwrap().is_empty());

        // Restore makes it visible again; restoring twice is a no-op.
        assert!(index.restore("to_delete").await.unwrap());
        assert!(!index.restore("to_delete").await.unwrap());
        assert!(index.get("to_delete", false).await.unwrap().is_some());
        assert!(index.list_deleted().await.unwrap().is_empty());

        // Hard delete removes the row entirely.
        assert!(index.hard_delete("to_delete").await.unwrap().is_some());
        assert!(index.hard_delete("to_delete").await.unwrap().is_none());
        assert!(index.is_empty().await.unwrap());
    }

    #[tokio::test]
    async fn test_sqlite_index_stats() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("index.db");

        let index = SqliteIndex::new(db_path).await.unwrap();

        // Insert multiple entries
        for i in 0..5 {
            let entry = FileIndexEntry {
                id: format!("file{}", i),
                path: PathBuf::from(format!("f{}/{}", i, i)),
                size: 100 * (i + 1) as u64,
                ref_count: i + 1,
                created_at: Utc::now(),
                last_accessed_at: None,
                metadata: create_test_metadata(),
            };
            index.insert(entry).await.unwrap();
        }

        let stats = index.stats().await.unwrap();
        assert_eq!(stats.total_files, 5);
        assert_eq!(stats.total_size, 100 + 200 + 300 + 400 + 500);
        assert_eq!(stats.total_refs, 1 + 2 + 3 + 4 + 5);
    }

    #[tokio::test]
    async fn test_sqlite_index_cleanup_candidates() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("index.db");

        let index = SqliteIndex::new(db_path).await.unwrap();

        // Insert entry with old access time and low ref count
        let old_entry = FileIndexEntry {
            id: "old_file".to_string(),
            path: PathBuf::from("old/path"),
            size: 100,
            ref_count: 0,
            created_at: Utc::now() - chrono::Duration::days(100),
            last_accessed_at: Some(Utc::now() - chrono::Duration::days(100)),
            metadata: create_test_metadata(),
        };
        index.insert(old_entry).await.unwrap();

        // Insert entry with recent access time
        let recent_entry = FileIndexEntry {
            id: "recent_file".to_string(),
            path: PathBuf::from("recent/path"),
            size: 100,
            ref_count: 0,
            created_at: Utc::now(),
            last_accessed_at: Some(Utc::now()),
            metadata: create_test_metadata(),
        };
        index.insert(recent_entry).await.unwrap();

        // Get cleanup candidates (older than 30 days, ref_count <= 1)
        let candidates = index.get_candidates_for_cleanup(1, 30).await.unwrap();
        assert_eq!(candidates.len(), 1);
        assert_eq!(candidates[0].id, "old_file");
    }

    #[tokio::test]
    async fn test_migration_from_jsonl() {
        let temp_dir = TempDir::new().unwrap();
        let jsonl_path = temp_dir.path().join("index.jsonl");
        let db_path = temp_dir.path().join("index.db");

        // Create JSONL file
        let entry1 = serde_json::json!({
            "id": "file1",
            "path": "ab/c1",
            "size": 100,
            "ref_count": 1,
            "created_at": Utc::now().to_rfc3339(),
            "last_accessed_at": null,
            "metadata": {
                "name": "test1.txt",
                "size": 100,
                "mime_type": null,
                "source": null,
                "created_at": Utc::now().to_rfc3339(),
                "last_accessed_at": null,
                "preview": null
            }
        });

        let entry2 = serde_json::json!({
            "id": "file2",
            "path": "ab/c2",
            "size": 200,
            "ref_count": 2,
            "created_at": Utc::now().to_rfc3339(),
            "last_accessed_at": null,
            "metadata": {
                "name": "test2.txt",
                "size": 200,
                "mime_type": null,
                "source": null,
                "created_at": Utc::now().to_rfc3339(),
                "last_accessed_at": null,
                "preview": null
            }
        });

        tokio::fs::write(&jsonl_path, format!("{}\n{}\n", entry1, entry2))
            .await
            .unwrap();

        // Migrate
        let index = SqliteIndex::new(db_path).await.unwrap();
        let migrated = index.migrate_from_jsonl(&jsonl_path).await.unwrap();
        assert_eq!(migrated, 2);

        // Verify
        assert_eq!(index.len().await.unwrap(), 2);
        let file1 = index.get("file1", false).await.unwrap().unwrap();
        assert_eq!(file1.size, 100);
        let file2 = index.get("file2", false).await.unwrap().unwrap();
        assert_eq!(file2.size, 200);

        // A missing JSONL file means there is nothing to migrate.
        let missing = temp_dir.path().join("does_not_exist.jsonl");
        assert_eq!(index.migrate_from_jsonl(&missing).await.unwrap(), 0);
    }
}