1use crate::handle::{FileIndexEntry, FileMetadata};
11use crate::Result;
12use chrono::{DateTime, Utc};
13use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
14use sqlx::{Row, SqlitePool};
15use std::path::{Path, PathBuf};
16
17pub struct SqliteIndex {
19 pool: SqlitePool,
20 db_path: PathBuf,
21}
22
23impl SqliteIndex {
24 pub async fn new(db_path: PathBuf) -> Result<Self> {
26 if let Some(parent) = db_path.parent() {
28 tokio::fs::create_dir_all(parent).await?;
29 }
30
31 let options = SqliteConnectOptions::new()
33 .filename(&db_path)
34 .create_if_missing(true);
35
36 let pool = SqlitePoolOptions::new()
37 .max_connections(5)
38 .connect_with(options)
39 .await?;
40
41 let index = Self { pool, db_path };
42 index.init().await?;
43
44 Ok(index)
45 }
46
47 async fn init(&self) -> Result<()> {
49 sqlx::query(
51 r#"
52 CREATE TABLE IF NOT EXISTS files (
53 id TEXT PRIMARY KEY NOT NULL,
54 path TEXT NOT NULL,
55 size INTEGER NOT NULL DEFAULT 0,
56 ref_count INTEGER NOT NULL DEFAULT 1,
57 created_at TEXT NOT NULL,
58 last_accessed_at TEXT,
59 deleted_at TEXT, -- NEW: soft delete timestamp
60 deleted_by TEXT, -- NEW: who deleted the file
61 metadata_json TEXT NOT NULL
62 );
63
64 CREATE INDEX IF NOT EXISTS idx_files_ref_count ON files(ref_count);
65 CREATE INDEX IF NOT EXISTS idx_files_last_accessed ON files(last_accessed_at);
66 CREATE INDEX IF NOT EXISTS idx_files_deleted_at ON files(deleted_at) WHERE deleted_at IS NOT NULL;
67 "#,
68 )
69 .execute(&self.pool)
70 .await?;
71
72 let columns: Vec<String> =
74 sqlx::query_scalar("SELECT name FROM pragma_table_info('files')")
75 .fetch_all(&self.pool)
76 .await?;
77
78 if !columns.contains(&"deleted_at".to_string()) {
79 let _ = sqlx::query("ALTER TABLE files ADD COLUMN deleted_at TEXT")
80 .execute(&self.pool)
81 .await;
82 }
83 if !columns.contains(&"deleted_by".to_string()) {
84 let _ = sqlx::query("ALTER TABLE files ADD COLUMN deleted_by TEXT")
85 .execute(&self.pool)
86 .await;
87 }
88
89 tracing::info!("SQLite index initialized at {:?}", self.db_path);
90 Ok(())
91 }
92
93 pub async fn load(db_path: PathBuf) -> Result<Self> {
95 Self::new(db_path).await
96 }
97
98 pub async fn get(&self, id: &str, include_deleted: bool) -> Result<Option<FileIndexEntry>> {
104 let row = if include_deleted {
105 sqlx::query(
106 r#"
107 SELECT id, path, size, ref_count, created_at, last_accessed_at, metadata_json
108 FROM files WHERE id = ?
109 "#,
110 )
111 .bind(id)
112 .fetch_optional(&self.pool)
113 .await?
114 } else {
115 sqlx::query(
116 r#"
117 SELECT id, path, size, ref_count, created_at, last_accessed_at, metadata_json
118 FROM files WHERE id = ? AND deleted_at IS NULL
119 "#,
120 )
121 .bind(id)
122 .fetch_optional(&self.pool)
123 .await?
124 };
125
126 match row {
127 Some(row) => Ok(Some(self.row_to_entry(&row)?)),
128 None => Ok(None),
129 }
130 }
131
132 pub async fn soft_delete(&self, id: &str, deleted_by: Option<&str>) -> Result<bool> {
137 let result = sqlx::query(
138 r#"
139 UPDATE files
140 SET deleted_at = ?, deleted_by = ?
141 WHERE id = ? AND deleted_at IS NULL
142 "#,
143 )
144 .bind(Utc::now().to_rfc3339())
145 .bind(deleted_by)
146 .bind(id)
147 .execute(&self.pool)
148 .await?;
149
150 Ok(result.rows_affected() > 0)
151 }
152
153 pub async fn restore(&self, id: &str) -> Result<bool> {
157 let result = sqlx::query(
158 r#"
159 UPDATE files
160 SET deleted_at = NULL, deleted_by = NULL
161 WHERE id = ? AND deleted_at IS NOT NULL
162 "#,
163 )
164 .bind(id)
165 .execute(&self.pool)
166 .await?;
167
168 Ok(result.rows_affected() > 0)
169 }
170
171 pub async fn list_deleted(&self) -> Result<Vec<FileIndexEntry>> {
173 let rows = sqlx::query(
174 r#"
175 SELECT id, path, size, ref_count, created_at, last_accessed_at, metadata_json
176 FROM files
177 WHERE deleted_at IS NOT NULL
178 ORDER BY deleted_at DESC
179 "#,
180 )
181 .fetch_all(&self.pool)
182 .await?;
183
184 let mut entries = Vec::new();
185 for row in rows {
186 entries.push(self.row_to_entry(&row)?);
187 }
188
189 Ok(entries)
190 }
191
192 pub async fn get_expired_deletions(&self, retention_days: u32) -> Result<Vec<FileIndexEntry>> {
197 let cutoff = Utc::now() - chrono::Duration::days(retention_days as i64);
198 let cutoff_str = cutoff.to_rfc3339();
199
200 let rows = sqlx::query(
201 r#"
202 SELECT id, path, size, ref_count, created_at, last_accessed_at, metadata_json
203 FROM files
204 WHERE deleted_at IS NOT NULL AND deleted_at < ?
205 "#,
206 )
207 .bind(cutoff_str)
208 .fetch_all(&self.pool)
209 .await?;
210
211 let mut entries = Vec::new();
212 for row in rows {
213 entries.push(self.row_to_entry(&row)?);
214 }
215
216 Ok(entries)
217 }
218
219 pub async fn hard_delete(&self, id: &str) -> Result<Option<FileIndexEntry>> {
224 let entry = self.get(id, true).await?;
226
227 if entry.is_some() {
228 sqlx::query("DELETE FROM files WHERE id = ?")
229 .bind(id)
230 .execute(&self.pool)
231 .await?;
232 }
233
234 Ok(entry)
235 }
236
237 pub async fn insert(&self, entry: FileIndexEntry) -> Result<()> {
239 let metadata_json = serde_json::to_string(&entry.metadata)?;
240
241 sqlx::query(
242 r#"
243 INSERT INTO files (id, path, size, ref_count, created_at, last_accessed_at, metadata_json)
244 VALUES (?, ?, ?, ?, ?, ?, ?)
245 ON CONFLICT(id) DO UPDATE SET
246 path = excluded.path,
247 size = excluded.size,
248 ref_count = excluded.ref_count,
249 last_accessed_at = excluded.last_accessed_at,
250 metadata_json = excluded.metadata_json,
251 deleted_at = NULL,
252 deleted_by = NULL
253 "#,
254 )
255 .bind(&entry.id)
256 .bind(entry.path.to_string_lossy().to_string())
257 .bind(entry.size as i64)
258 .bind(entry.ref_count as i64)
259 .bind(entry.created_at.to_rfc3339())
260 .bind(entry.last_accessed_at.map(|t| t.to_rfc3339()))
261 .bind(metadata_json)
262 .execute(&self.pool)
263 .await?;
264
265 Ok(())
266 }
267
268 pub async fn remove(&self, id: &str) -> Result<Option<FileIndexEntry>> {
273 let entry = self.get(id, true).await?;
275
276 if entry.is_some() {
277 sqlx::query("DELETE FROM files WHERE id = ?")
278 .bind(id)
279 .execute(&self.pool)
280 .await?;
281 }
282
283 Ok(entry)
284 }
285
286 pub async fn update_ref_count(&self, id: &str, delta: i32) -> Result<Option<usize>> {
295 let current = self.get(id, false).await?;
297
298 if let Some(entry) = current {
299 let new_count = if delta < 0 {
300 entry
301 .ref_count
302 .saturating_sub(delta.unsigned_abs() as usize)
303 } else {
304 entry.ref_count.saturating_add(delta as usize)
305 };
306
307 sqlx::query(
308 r#"
309 UPDATE files
310 SET ref_count = ?, last_accessed_at = ?
311 WHERE id = ?
312 "#,
313 )
314 .bind(new_count as i64)
315 .bind(Utc::now().to_rfc3339())
316 .bind(id)
317 .execute(&self.pool)
318 .await?;
319
320 Ok(Some(new_count))
321 } else {
322 Ok(None)
323 }
324 }
325
326 pub async fn get_candidates_for_cleanup(
328 &self,
329 threshold: i32,
330 max_age_days: u32,
331 ) -> Result<Vec<FileIndexEntry>> {
332 let cutoff = Utc::now() - chrono::Duration::days(max_age_days as i64);
333 let cutoff_str = cutoff.to_rfc3339();
334
335 let rows = sqlx::query(
336 r#"
337 SELECT id, path, size, ref_count, created_at, last_accessed_at, metadata_json
338 FROM files
339 WHERE ref_count <= ?
340 AND (last_accessed_at IS NULL OR last_accessed_at < ?)
341 "#,
342 )
343 .bind(threshold as i64)
344 .bind(cutoff_str)
345 .fetch_all(&self.pool)
346 .await?;
347
348 let mut entries = Vec::new();
349 for row in rows {
350 entries.push(self.row_to_entry(&row)?);
351 }
352
353 Ok(entries)
354 }
355
356 pub async fn len(&self) -> Result<usize> {
358 let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM files")
359 .fetch_one(&self.pool)
360 .await?;
361 Ok(count as usize)
362 }
363
364 pub async fn is_empty(&self) -> Result<bool> {
366 Ok(self.len().await? == 0)
367 }
368
369 pub async fn entries(&self) -> Result<Vec<FileIndexEntry>> {
371 let rows = sqlx::query(
372 r#"
373 SELECT id, path, size, ref_count, created_at, last_accessed_at, metadata_json
374 FROM files
375 "#,
376 )
377 .fetch_all(&self.pool)
378 .await?;
379
380 let mut entries = Vec::new();
381 for row in rows {
382 entries.push(self.row_to_entry(&row)?);
383 }
384
385 Ok(entries)
386 }
387
388 pub async fn stats(&self) -> Result<IndexStats> {
390 let row = sqlx::query(
391 r#"
392 SELECT
393 COUNT(*) as total_files,
394 COALESCE(SUM(size), 0) as total_size,
395 COALESCE(SUM(ref_count), 0) as total_refs
396 FROM files
397 "#,
398 )
399 .fetch_one(&self.pool)
400 .await?;
401
402 Ok(IndexStats {
403 total_files: row.get::<i64, _>("total_files") as usize,
404 total_size: row.get::<i64, _>("total_size") as u64,
405 total_refs: row.get::<i64, _>("total_refs") as usize,
406 })
407 }
408
409 pub async fn close(&self) {
411 self.pool.close().await;
412 }
413
414 fn row_to_entry(&self, row: &sqlx::sqlite::SqliteRow) -> Result<FileIndexEntry> {
416 let metadata_json: String = row.get("metadata_json");
417 let metadata: FileMetadata = serde_json::from_str(&metadata_json)?;
418
419 Ok(FileIndexEntry {
420 id: row.get("id"),
421 path: PathBuf::from(row.get::<String, _>("path")),
422 size: row.get::<i64, _>("size") as u64,
423 ref_count: row.get::<i64, _>("ref_count") as usize,
424 created_at: DateTime::parse_from_rfc3339(&row.get::<String, _>("created_at"))?
425 .with_timezone(&Utc),
426 last_accessed_at: row
427 .get::<Option<String>, _>("last_accessed_at")
428 .map(|s| DateTime::parse_from_rfc3339(&s).map(|dt| dt.with_timezone(&Utc)))
429 .transpose()?,
430 metadata,
431 })
432 }
433
434 pub async fn migrate_from_jsonl(&self, jsonl_path: &Path) -> Result<usize> {
436 if !jsonl_path.exists() {
437 return Ok(0);
438 }
439
440 let content = tokio::fs::read_to_string(jsonl_path).await?;
441 let mut count = 0;
442
443 for line in content.lines() {
444 if line.trim().is_empty() {
445 continue;
446 }
447 if let Ok(entry) = serde_json::from_str::<FileIndexEntry>(line) {
448 self.insert(entry).await?;
449 count += 1;
450 } else {
451 tracing::warn!("Failed to parse index entry during migration: {}", line);
452 }
453 }
454
455 tracing::info!("Migrated {} entries from JSONL to SQLite", count);
456 Ok(count)
457 }
458}
459
/// Aggregate totals over the `files` table, as returned by `SqliteIndex::stats`.
///
/// Derives `PartialEq`/`Eq` (every field is an integer) so that snapshots can
/// be compared directly, e.g. to detect changes or in tests.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct IndexStats {
    /// Number of rows in the index.
    pub total_files: usize,
    /// Sum of the recorded file sizes, in the unit stored in the `size` column.
    pub total_size: u64,
    /// Sum of all reference counts.
    pub total_refs: usize,
}
467
/// Public name for the file index implementation; currently the SQLite-backed
/// [`SqliteIndex`].
pub type FileIndex = SqliteIndex;
473
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Fixed metadata shared by the test entries.
    fn create_test_metadata() -> FileMetadata {
        FileMetadata {
            name: "test.txt".to_string(),
            size: 100,
            mime_type: Some("text/plain".to_string()),
            source: Some("test".to_string()),
            created_at: Utc::now(),
            last_accessed_at: None,
            preview: None,
        }
    }

    #[tokio::test]
    async fn test_sqlite_index_basic() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("index.db");

        let index = SqliteIndex::new(db_path).await.unwrap();

        let entry = FileIndexEntry {
            id: "abc123".to_string(),
            path: PathBuf::from("ab/c123"),
            size: 100,
            ref_count: 1,
            created_at: Utc::now(),
            last_accessed_at: None,
            metadata: create_test_metadata(),
        };

        index.insert(entry).await.unwrap();
        assert_eq!(index.len().await.unwrap(), 1);

        let retrieved = index.get("abc123", false).await.unwrap();
        assert!(retrieved.is_some());
        let retrieved = retrieved.unwrap();
        assert_eq!(retrieved.id, "abc123");
        assert_eq!(retrieved.ref_count, 1);

        let new_count = index.update_ref_count("abc123", 1).await.unwrap();
        assert_eq!(new_count, Some(2));

        let updated = index.get("abc123", false).await.unwrap().unwrap();
        assert_eq!(updated.ref_count, 2);

        let removed = index.remove("abc123").await.unwrap();
        assert!(removed.is_some());
        assert_eq!(index.len().await.unwrap(), 0);
    }

    #[tokio::test]
    async fn test_sqlite_index_stats() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("index.db");

        let index = SqliteIndex::new(db_path).await.unwrap();

        for i in 0..5 {
            let entry = FileIndexEntry {
                id: format!("file{}", i),
                path: PathBuf::from(format!("f{}/{}", i, i)),
                size: 100 * (i + 1) as u64,
                ref_count: i + 1,
                created_at: Utc::now(),
                last_accessed_at: None,
                metadata: create_test_metadata(),
            };
            index.insert(entry).await.unwrap();
        }

        let stats = index.stats().await.unwrap();
        assert_eq!(stats.total_files, 5);
        assert_eq!(stats.total_size, 100 + 200 + 300 + 400 + 500);
        assert_eq!(stats.total_refs, 1 + 2 + 3 + 4 + 5);
    }

    #[tokio::test]
    async fn test_sqlite_index_cleanup_candidates() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("index.db");

        let index = SqliteIndex::new(db_path).await.unwrap();

        // Unreferenced and untouched for 100 days: should be a candidate.
        let old_entry = FileIndexEntry {
            id: "old_file".to_string(),
            path: PathBuf::from("old/path"),
            size: 100,
            ref_count: 0,
            created_at: Utc::now() - chrono::Duration::days(100),
            last_accessed_at: Some(Utc::now() - chrono::Duration::days(100)),
            metadata: create_test_metadata(),
        };
        index.insert(old_entry).await.unwrap();

        // Unreferenced but just accessed: should be kept.
        let recent_entry = FileIndexEntry {
            id: "recent_file".to_string(),
            path: PathBuf::from("recent/path"),
            size: 100,
            ref_count: 0,
            created_at: Utc::now(),
            last_accessed_at: Some(Utc::now()),
            metadata: create_test_metadata(),
        };
        index.insert(recent_entry).await.unwrap();

        let candidates = index.get_candidates_for_cleanup(1, 30).await.unwrap();
        assert_eq!(candidates.len(), 1);
        assert_eq!(candidates[0].id, "old_file");
    }

    #[tokio::test]
    async fn test_sqlite_index_soft_delete_and_restore() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("index.db");

        let index = SqliteIndex::new(db_path).await.unwrap();
        index
            .insert(FileIndexEntry {
                id: "doomed".to_string(),
                path: PathBuf::from("do/omed"),
                size: 10,
                ref_count: 1,
                created_at: Utc::now(),
                last_accessed_at: None,
                metadata: create_test_metadata(),
            })
            .await
            .unwrap();

        // Decrements clamp at zero.
        assert_eq!(index.update_ref_count("doomed", -5).await.unwrap(), Some(0));

        assert!(index.soft_delete("doomed", Some("tester")).await.unwrap());
        // A second soft delete finds no live entry.
        assert!(!index.soft_delete("doomed", None).await.unwrap());

        assert!(index.get("doomed", false).await.unwrap().is_none());
        assert!(index.get("doomed", true).await.unwrap().is_some());
        assert_eq!(index.list_deleted().await.unwrap().len(), 1);
        // Reference counts of soft-deleted entries are left untouched.
        assert_eq!(index.update_ref_count("doomed", 1).await.unwrap(), None);

        assert!(index.restore("doomed").await.unwrap());
        assert!(!index.restore("doomed").await.unwrap());
        assert!(index.get("doomed", false).await.unwrap().is_some());
        assert!(index.list_deleted().await.unwrap().is_empty());

        let removed = index.hard_delete("doomed").await.unwrap();
        assert_eq!(removed.map(|e| e.id), Some("doomed".to_string()));
        assert!(index.is_empty().await.unwrap());
        assert!(index.hard_delete("doomed").await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_migration_from_jsonl() {
        let temp_dir = TempDir::new().unwrap();
        let jsonl_path = temp_dir.path().join("index.jsonl");
        let db_path = temp_dir.path().join("index.db");

        let entry1 = serde_json::json!({
            "id": "file1",
            "path": "ab/c1",
            "size": 100,
            "ref_count": 1,
            "created_at": Utc::now().to_rfc3339(),
            "last_accessed_at": null,
            "metadata": {
                "name": "test1.txt",
                "size": 100,
                "mime_type": null,
                "source": null,
                "created_at": Utc::now().to_rfc3339(),
                "last_accessed_at": null,
                "preview": null
            }
        });

        let entry2 = serde_json::json!({
            "id": "file2",
            "path": "ab/c2",
            "size": 200,
            "ref_count": 2,
            "created_at": Utc::now().to_rfc3339(),
            "last_accessed_at": null,
            "metadata": {
                "name": "test2.txt",
                "size": 200,
                "mime_type": null,
                "source": null,
                "created_at": Utc::now().to_rfc3339(),
                "last_accessed_at": null,
                "preview": null
            }
        });

        tokio::fs::write(&jsonl_path, format!("{}\n{}\n", entry1, entry2))
            .await
            .unwrap();

        let index = SqliteIndex::new(db_path).await.unwrap();
        let migrated = index.migrate_from_jsonl(&jsonl_path).await.unwrap();
        assert_eq!(migrated, 2);

        assert_eq!(index.len().await.unwrap(), 2);
        let file1 = index.get("file1", false).await.unwrap().unwrap();
        assert_eq!(file1.size, 100);
        let file2 = index.get("file2", false).await.unwrap().unwrap();
        assert_eq!(file2.size, 200);
    }
}