avocado_core/
db.rs

1//! Database operations using SQLite
2//!
3//! This module handles all database interactions using rusqlite.
4//! SQLite is sufficient for Phase 1 (can handle 10K+ documents easily).
5
6use crate::types::{Artifact, Result, Span, Session, Message, MessageRole, SessionWorkingSet, SessionWithMessages, WorkingSet, CompilerConfig};
7use crate::index::VectorIndex;
8use rusqlite::{params, Connection, OptionalExtension};
9use std::path::{Path, PathBuf};
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::{Arc, Mutex, RwLock};
12use sha2::{Digest, Sha256};
13use serde::{Serialize, Deserialize};
14
/// Database connection wrapper with thread-safe access.
///
/// Every field except `db_path` is wrapped in an `Arc`, so the derived
/// `Clone` yields another handle to the same connection, cached index,
/// dirty flag and build lock.
#[derive(Clone)]
pub struct Database {
    // The SQLite connection; the mutex serializes all access to it.
    conn: Arc<Mutex<Connection>>,
    // Cached vector index to avoid rebuilding on every compile request
    // (`None` when no index is currently held in memory).
    vector_index: Arc<RwLock<Option<Arc<VectorIndex>>>>,
    // Flag to track if index needs rebuilding (invalidated on ingest)
    index_dirty: Arc<AtomicBool>,
    // Path to database file (for index cache location)
    db_path: PathBuf,
    // Serialize builds for this Database to avoid concurrent heavy index builds
    build_lock: Arc<Mutex<()>>,
}
28
29/// How the ANN index was obtained
30#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
31pub enum IndexLoadKind {
32    /// Loaded from on-disk cache (ANN persistence)
33    LoadedFromCache,
34    /// Built from spans in the database
35    BuiltFromSpans,
36    /// Returned from in-memory cache (no disk or build)
37    CachedInMemory,
38}
39impl Database {
40    /// Create a new database connection and run migrations
41    ///
42    /// # Arguments
43    ///
44    /// * `path` - Path to the SQLite database file
45    ///
46    /// # Returns
47    ///
48    /// A new Database instance
49    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
50        let db_path = path.as_ref().to_path_buf();
51        let conn = Connection::open(&db_path)?;
52
53        // Run initial migration (without PRAGMA statements)
54        let schema_001 = r#"-- AvocadoDB Initial Schema
55-- Phase 1: Simple SQLite-compatible schema for deterministic context compilation
56
57-- Artifacts table: stores ingested documents
58CREATE TABLE IF NOT EXISTS artifacts (
59    id TEXT PRIMARY KEY,                      -- UUID v4
60    path TEXT NOT NULL UNIQUE,                -- File path or identifier
61    content TEXT NOT NULL,                    -- Full document text
62    content_hash TEXT NOT NULL,               -- SHA256 of content
63    metadata TEXT,                            -- JSON string with arbitrary metadata
64    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
65    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
66);
67
68-- Spans table: stores document fragments with embeddings
69CREATE TABLE IF NOT EXISTS spans (
70    id TEXT PRIMARY KEY,                      -- UUID v4
71    artifact_id TEXT NOT NULL,                -- Foreign key to artifacts
72    start_line INTEGER NOT NULL,              -- Starting line number (1-indexed)
73    end_line INTEGER NOT NULL,                -- Ending line number (inclusive)
74    text TEXT NOT NULL,                       -- Actual span text
75    embedding BLOB,                           -- Serialized f32 vector (1536 dims for ada-002)
76    embedding_model TEXT,                     -- e.g., "text-embedding-ada-002"
77    token_count INTEGER,                      -- Estimated token count
78    metadata TEXT,                            -- JSON string
79    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
80    FOREIGN KEY (artifact_id) REFERENCES artifacts(id) ON DELETE CASCADE
81);
82
83-- Indexes for performance
84CREATE INDEX IF NOT EXISTS idx_spans_artifact ON spans(artifact_id);
85CREATE INDEX IF NOT EXISTS idx_spans_lines ON spans(artifact_id, start_line, end_line);
86CREATE INDEX IF NOT EXISTS idx_artifacts_path ON artifacts(path);
87CREATE INDEX IF NOT EXISTS idx_artifacts_hash ON artifacts(content_hash);
88
89-- Enable WAL mode for better concurrency
90PRAGMA journal_mode = WAL;
91PRAGMA foreign_keys = ON;
92"#;
93
94        // Execute the schema without PRAGMAs
95        let schema_without_pragma = schema_001
96            .lines()
97            .filter(|line| {
98                let trimmed = line.trim();
99                !trimmed.starts_with("PRAGMA") && !trimmed.starts_with("-- Enable WAL")
100            })
101            .collect::<Vec<_>>()
102            .join("\n");
103
104        conn.execute_batch(&schema_without_pragma)?;
105
106        // Run session management migration
107        let schema_002 = r#"-- AvocadoDB Session Management Schema
108-- Phase 2, Priority 1: Session tracking for conversation history and agent memory
109
110-- Sessions table: tracks conversation sessions
111CREATE TABLE IF NOT EXISTS sessions (
112    id TEXT PRIMARY KEY,                      -- UUID v4
113    user_id TEXT,                             -- Optional user identifier
114    title TEXT,                               -- Optional session title (auto-generated or user-provided)
115    metadata TEXT,                            -- JSON string with arbitrary metadata
116    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
117    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
118    last_message_at TIMESTAMP                 -- For sorting/filtering
119);
120
121-- Messages table: stores individual conversation turns
122CREATE TABLE IF NOT EXISTS messages (
123    id TEXT PRIMARY KEY,                      -- UUID v4
124    session_id TEXT NOT NULL,                 -- Foreign key to sessions
125    role TEXT NOT NULL,                       -- 'user', 'assistant', 'system', 'tool'
126    content TEXT NOT NULL,                    -- Message content
127    metadata TEXT,                            -- JSON string (tool calls, citations, etc.)
128    sequence_number INTEGER NOT NULL,         -- Order within session (0-indexed)
129    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
130    FOREIGN KEY (session_id) REFERENCES sessions(id) ON DELETE CASCADE
131);
132
133-- Working set associations: links compiled contexts to sessions
134CREATE TABLE IF NOT EXISTS session_working_sets (
135    id TEXT PRIMARY KEY,                      -- UUID v4
136    session_id TEXT NOT NULL,                 -- Foreign key to sessions
137    message_id TEXT,                          -- Optional: which message triggered this compilation
138    working_set_id TEXT NOT NULL,             -- Reference to working set (stored as JSON for now)
139    query TEXT NOT NULL,                      -- Query that generated this working set
140    config TEXT,                              -- JSON string of CompilerConfig used
141    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
142    FOREIGN KEY (session_id) REFERENCES sessions(id) ON DELETE CASCADE,
143    FOREIGN KEY (message_id) REFERENCES messages(id) ON DELETE SET NULL
144);
145
146-- Agents table: stores registered agents for multi-agent orchestration
147CREATE TABLE IF NOT EXISTS agents (
148    id TEXT PRIMARY KEY,                      -- UUID v4
149    name TEXT NOT NULL,                       -- Human-readable name (e.g., "moderator")
150    role TEXT NOT NULL,                       -- Agent's role/persona description
151    model TEXT NOT NULL,                      -- LLM model identifier
152    system_prompt TEXT,                       -- Optional system prompt / personality
153    did TEXT,                                 -- Optional DID for decentralized identity
154    capabilities TEXT,                        -- JSON array of capabilities
155    metadata TEXT,                            -- JSON string with arbitrary metadata
156    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
157);
158
159-- Agent relations table: tracks agreements/disagreements between agents
160CREATE TABLE IF NOT EXISTS agent_relations (
161    id TEXT PRIMARY KEY,                      -- UUID v4
162    session_id TEXT NOT NULL,                 -- Session where this occurred
163    message_id TEXT NOT NULL,                 -- Message that created this relation
164    from_agent_id TEXT NOT NULL,              -- Agent who expressed the stance
165    to_agent_id TEXT NOT NULL,                -- Agent being referenced
166    stance TEXT NOT NULL,                     -- 'agree', 'disagree', 'neutral', 'question'
167    target_message_id TEXT NOT NULL,          -- Message being referenced
168    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
169    FOREIGN KEY (session_id) REFERENCES sessions(id) ON DELETE CASCADE,
170    FOREIGN KEY (message_id) REFERENCES messages(id) ON DELETE CASCADE,
171    FOREIGN KEY (from_agent_id) REFERENCES agents(id) ON DELETE CASCADE,
172    FOREIGN KEY (to_agent_id) REFERENCES agents(id) ON DELETE CASCADE
173);
174
175-- Indexes for performance
176CREATE INDEX IF NOT EXISTS idx_sessions_user ON sessions(user_id);
177CREATE INDEX IF NOT EXISTS idx_sessions_updated ON sessions(updated_at DESC);
178CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, sequence_number);
179CREATE INDEX IF NOT EXISTS idx_working_sets_session ON session_working_sets(session_id);
180CREATE INDEX IF NOT EXISTS idx_agents_name ON agents(name);
181CREATE INDEX IF NOT EXISTS idx_agent_relations_session ON agent_relations(session_id);
182CREATE INDEX IF NOT EXISTS idx_agent_relations_from ON agent_relations(from_agent_id);
183CREATE INDEX IF NOT EXISTS idx_agent_relations_to ON agent_relations(to_agent_id);
184"#;
185        conn.execute_batch(schema_002)?;
186
187        // Execute PRAGMAs separately (they return results)
188        conn.pragma_update(None, "journal_mode", "WAL")?;
189        conn.pragma_update(None, "foreign_keys", true)?;
190
191        Ok(Self {
192            conn: Arc::new(Mutex::new(conn)),
193            vector_index: Arc::new(RwLock::new(None)),
194            index_dirty: Arc::new(AtomicBool::new(true)),
195            db_path,
196            build_lock: Arc::new(Mutex::new(())),
197        })
198    }
199
200    /// Insert an artifact into the database
201    ///
202    /// # Arguments
203    ///
204    /// * `artifact` - The artifact to insert
205    ///
206    /// # Returns
207    ///
208    /// Ok(()) if successful
209    pub fn insert_artifact(&self, artifact: &Artifact) -> Result<()> {
210        let conn = self.conn.lock()
211            .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Database lock poisoned: {}", e)))?;
212        conn.execute(
213            "INSERT INTO artifacts (id, path, content, content_hash, metadata, created_at)
214             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
215            params![
216                artifact.id,
217                artifact.path,
218                artifact.content,
219                artifact.content_hash,
220                artifact.metadata.as_ref().map(|m| m.to_string()),
221                artifact.created_at.to_rfc3339(),
222            ],
223        )?;
224        // Invalidate cached index since we added a new artifact
225        self.index_dirty.store(true, Ordering::Release);
226        // Delete index cache directory since it's now stale
227        let _ = std::fs::remove_dir_all(self.get_index_cache_dir());
228        Ok(())
229    }
230
231    /// Insert multiple spans in a transaction
232    ///
233    /// # Arguments
234    ///
235    /// * `spans` - Vector of spans to insert
236    ///
237    /// # Returns
238    ///
239    /// Ok(()) if successful
240    pub fn insert_spans(&self, spans: &[Span]) -> Result<()> {
241        let mut conn = self.conn.lock()
242            .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Database lock poisoned: {}", e)))?;
243        let tx = conn.transaction()?;
244
245        for span in spans {
246            tx.execute(
247                "INSERT INTO spans (
248                    id, artifact_id, start_line, end_line, text,
249                    embedding, embedding_model, token_count, metadata
250                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
251                params![
252                    span.id,
253                    span.artifact_id,
254                    span.start_line as i64,
255                    span.end_line as i64,
256                    span.text,
257                    span.embedding.as_ref().map(|e| serialize_embedding(e)),
258                    span.embedding_model,
259                    span.token_count as i64,
260                    span.metadata.as_ref().map(|m| m.to_string()),
261                ],
262            )?;
263        }
264
265        tx.commit()?;
266        // Invalidate cached index since we added new spans
267        self.index_dirty.store(true, Ordering::Release);
268        // Delete index cache directory since it's now stale
269        let _ = std::fs::remove_dir_all(self.get_index_cache_dir());
270        Ok(())
271    }
272
273    /// Get or build the cached vector index
274    ///
275    /// The index is cached and only rebuilt when data changes (on ingest).
276    /// Phase 2.1: Tries to load from disk first, then builds if needed.
277    ///
278    /// # Returns
279    ///
280    /// A reference-counted vector index
281    pub fn get_vector_index(&self) -> Result<Arc<VectorIndex>> {
282        Ok(self.get_vector_index_with_kind()?.0)
283    }
284
285    /// Get or build the cached vector index and return how it was obtained
286    ///
287    /// Returns the index and an indicator of whether it was loaded from cache
288    /// or freshly built from spans.
289    pub fn get_vector_index_with_kind(&self) -> Result<(Arc<VectorIndex>, IndexLoadKind)> {
290        // Check if index needs rebuilding
291        if self.index_dirty.load(Ordering::Acquire) {
292            // Ensure only one thread builds/loads at a time for this database
293            let _guard = self.build_lock.lock()
294                .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Build lock poisoned: {}", e)))?;
295            // Re-check after acquiring lock in case another thread already built it
296            if !self.index_dirty.load(Ordering::Acquire) {
297                let cached = self.vector_index.read()
298                    .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Index lock poisoned: {}", e)))?;
299                let idx = cached.as_ref()
300                    .cloned()
301                    .ok_or_else(|| crate::types::Error::Other(anyhow::anyhow!("Index cache empty after build")))?
302                    ;
303                return Ok((idx, IndexLoadKind::CachedInMemory));
304            }
305            // Try to load from disk first (Phase 2.1 persistent index)
306            let cache_dir = self.get_index_cache_dir();
307            if let Ok(index) = self.load_index_from_disk(&cache_dir) {
308                // Index loaded successfully from cache
309                // Note: We still rebuild HNSW from cached spans due to lifetime constraints in hnsw_rs
310                // This is faster than loading from SQLite, but not as fast as loading HNSW structure directly
311                let mut cached = self.vector_index.write()
312                    .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Index lock poisoned: {}", e)))?;
313                *cached = Some(index.clone());
314                self.index_dirty.store(false, Ordering::Release);
315                return Ok((index, IndexLoadKind::LoadedFromCache));
316            }
317            
318            // Build index from spans (load from SQLite)
319            // For large repos, this can take 1-2 minutes
320            let spans = self.get_all_spans()?;
321            let index = Arc::new(VectorIndex::build(spans));
322            
323            // Save to disk for next time (Phase 2.1)
324            // This saves both HNSW dump files and spans cache
325            // Note: HNSW structure can't be directly loaded due to lifetime constraints,
326            // but caching spans still provides significant speedup (avoids SQLite queries)
327            let _ = self.save_index_to_disk(&cache_dir, &index);
328            
329            // Update cache
330            let mut cached = self.vector_index.write()
331                .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Index lock poisoned: {}", e)))?;
332            *cached = Some(index.clone());
333            
334            // Mark as clean
335            self.index_dirty.store(false, Ordering::Release);
336            
337            Ok((index, IndexLoadKind::BuiltFromSpans))
338        } else {
339            // Return cached index
340            let cached = self.vector_index.read()
341                .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Index lock poisoned: {}", e)))?;
342            let idx = cached.as_ref()
343                .cloned()
344                .ok_or_else(|| crate::types::Error::Other(anyhow::anyhow!("Index cache is None but not dirty - this should not happen")))?;
345            Ok((idx, IndexLoadKind::CachedInMemory))
346        }
347    }
348    
349    /// Get the path to the index cache directory
350    fn get_index_cache_dir(&self) -> PathBuf {
351        // Store index cache in a directory next to database: db.sqlite -> db.sqlite.idx/
352        let mut cache_dir = self.db_path.clone();
353        cache_dir.set_extension("sqlite.idx");
354        cache_dir
355    }
356    
357    /// Calculate a hash of all spans to detect changes
358    fn calculate_spans_hash(&self) -> Result<String> {
359        let spans = self.get_all_spans()?;
360        let mut hasher = Sha256::new();
361        for span in &spans {
362            hasher.update(span.id.as_bytes());
363            if let Some(emb) = &span.embedding {
364                hasher.update(&emb.len().to_le_bytes());
365            }
366        }
367        Ok(format!("{:x}", hasher.finalize()))
368    }
369    
370    /// Load index from disk if valid
371    fn load_index_from_disk(&self, cache_dir: &Path) -> Result<Arc<VectorIndex>> {
372        // Try to load using VectorIndex::load_from_disk
373        match VectorIndex::load_from_disk(cache_dir) {
374            Ok(Some(index)) => {
375                // Verify hash matches current spans (double-check)
376                let current_hash = self.calculate_spans_hash()?;
377                let cached_spans = index.spans();
378                let mut hasher = Sha256::new();
379                for span in cached_spans {
380                    hasher.update(span.id.as_bytes());
381                    if let Some(emb) = &span.embedding {
382                        hasher.update(&emb.len().to_le_bytes());
383                    }
384                }
385                let cached_hash = format!("{:x}", hasher.finalize());
386                
387                if cached_hash == current_hash {
388                    Ok(Arc::new(index))
389                } else {
390                    Err(crate::types::Error::NotFound("Index cache is stale".to_string()))
391                }
392            }
393            Ok(None) => Err(crate::types::Error::NotFound("Index cache not found".to_string())),
394            Err(e) => Err(e),
395        }
396    }
397    
398    /// Save index to disk for persistence
399    fn save_index_to_disk(&self, cache_dir: &Path, index: &VectorIndex) -> Result<()> {
400        // Use VectorIndex::save_to_disk which saves both HNSW dump and spans
401        index.save_to_disk(cache_dir)
402            .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Failed to save index to disk: {}", e)))?;
403        Ok(())
404    }
405
406    /// Get all spans from the database
407    ///
408    /// # Returns
409    ///
410    /// Vector of all spans
411    pub fn get_all_spans(&self) -> Result<Vec<Span>> {
412        let conn = self.conn.lock()
413            .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Database lock poisoned: {}", e)))?;
414        let mut stmt = conn.prepare(
415            "SELECT id, artifact_id, start_line, end_line, text,
416                    embedding, embedding_model, token_count, metadata
417             FROM spans",
418        )?;
419
420        let spans = stmt
421            .query_map([], |row| {
422                Ok(Span {
423                    id: row.get(0)?,
424                    artifact_id: row.get(1)?,
425                    start_line: row.get::<_, i64>(2)? as usize,
426                    end_line: row.get::<_, i64>(3)? as usize,
427                    text: row.get(4)?,
428                    embedding: row
429                        .get::<_, Option<Vec<u8>>>(5)?
430                        .map(|bytes| deserialize_embedding(&bytes)),
431                    embedding_model: row.get(6)?,
432                    token_count: row.get::<_, i64>(7)? as usize,
433                    metadata: row
434                        .get::<_, Option<String>>(8)?
435                        .and_then(|s| serde_json::from_str(&s).ok()),
436                })
437            })?
438            .collect::<std::result::Result<Vec<_>, _>>()?;
439
440        Ok(spans)
441    }
442
443    /// Get artifact by ID
444    ///
445    /// # Arguments
446    ///
447    /// * `artifact_id` - The artifact ID to look up
448    ///
449    /// # Returns
450    ///
451    /// The artifact if found
452    pub fn get_artifact(&self, artifact_id: &str) -> Result<Option<Artifact>> {
453        let conn = self.conn.lock()
454            .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Database lock poisoned: {}", e)))?;
455        let mut stmt = conn.prepare(
456            "SELECT id, path, content, content_hash, metadata, created_at
457             FROM artifacts WHERE id = ?1",
458        )?;
459
460        let artifact = stmt
461            .query_row(params![artifact_id], |row| {
462                Ok(Artifact {
463                    id: row.get(0)?,
464                    path: row.get(1)?,
465                    content: row.get(2)?,
466                    content_hash: row.get(3)?,
467                    metadata: row
468                        .get::<_, Option<String>>(4)?
469                        .and_then(|s| serde_json::from_str(&s).ok()),
470                    created_at: row
471                        .get::<_, String>(5)?
472                        .parse()
473                        .unwrap_or_else(|_| chrono::Utc::now()),
474                })
475            })
476            .optional()?;
477
478        Ok(artifact)
479    }
480
481    /// Get artifact by path
482    ///
483    /// Returns the artifact row matching the unique path, if present.
484    pub fn get_artifact_by_path(&self, path: &str) -> Result<Option<Artifact>> {
485        let conn = self.conn.lock()
486            .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Database lock poisoned: {}", e)))?;
487        let mut stmt = conn.prepare(
488            "SELECT id, path, content, content_hash, metadata, created_at
489             FROM artifacts WHERE path = ?1",
490        )?;
491
492        let artifact = stmt
493            .query_row(params![path], |row| {
494                Ok(Artifact {
495                    id: row.get(0)?,
496                    path: row.get(1)?,
497                    content: row.get(2)?,
498                    content_hash: row.get(3)?,
499                    metadata: row
500                        .get::<_, Option<String>>(4)?
501                        .and_then(|s| serde_json::from_str(&s).ok()),
502                    created_at: row
503                        .get::<_, String>(5)?
504                        .parse()
505                        .unwrap_or_else(|_| chrono::Utc::now()),
506                })
507            })
508            .optional()?;
509
510        Ok(artifact)
511    }
512
513    /// Determine what action to take when ingesting a document
514    ///
515    /// Compares content hash to detect if document needs update or can be skipped.
516    ///
517    /// # Arguments
518    ///
519    /// * `path` - The document path
520    /// * `content_hash` - SHA256 hash of the new content
521    ///
522    /// # Returns
523    ///
524    /// - `IngestAction::Skip` if document exists with same content hash
525    /// - `IngestAction::Update` if document exists but content changed
526    /// - `IngestAction::Create` if document doesn't exist
527    pub fn determine_ingest_action(&self, path: &str, content_hash: &str) -> Result<crate::types::IngestAction> {
528        match self.get_artifact_by_path(path)? {
529            Some(existing) => {
530                if existing.content_hash == content_hash {
531                    Ok(crate::types::IngestAction::Skip {
532                        artifact_id: existing.id,
533                        reason: "Content unchanged (same hash)".to_string(),
534                    })
535                } else {
536                    Ok(crate::types::IngestAction::Update {
537                        artifact_id: existing.id,
538                    })
539                }
540            }
541            None => Ok(crate::types::IngestAction::Create),
542        }
543    }
544
545    /// Delete an artifact and its spans
546    ///
547    /// # Arguments
548    ///
549    /// * `artifact_id` - The artifact ID to delete
550    ///
551    /// # Returns
552    ///
553    /// Number of spans deleted
554    pub fn delete_artifact(&self, artifact_id: &str) -> Result<usize> {
555        let conn = self.conn.lock()
556            .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Database lock poisoned: {}", e)))?;
557
558        // Delete spans first (FK constraint allows CASCADE but let's be explicit)
559        let spans_deleted = conn.execute(
560            "DELETE FROM spans WHERE artifact_id = ?1",
561            params![artifact_id],
562        )?;
563
564        // Delete artifact
565        conn.execute(
566            "DELETE FROM artifacts WHERE id = ?1",
567            params![artifact_id],
568        )?;
569
570        // Mark index as dirty
571        self.index_dirty.store(true, std::sync::atomic::Ordering::Release);
572
573        // Invalidate disk cache
574        let cache_dir = self.db_path.with_extension("sqlite.idx");
575        if cache_dir.exists() {
576            let _ = std::fs::remove_dir_all(&cache_dir);
577        }
578
579        Ok(spans_deleted)
580    }
581
582    /// Search spans by text content (simple keyword matching)
583    ///
584    /// # Arguments
585    ///
586    /// * `query` - The search query
587    /// * `limit` - Maximum number of results
588    ///
589    /// # Returns
590    ///
591    /// Vector of matching spans
592    pub fn search_spans(&self, query: &str, limit: usize) -> Result<Vec<Span>> {
593        let conn = self.conn.lock()
594            .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Database lock poisoned: {}", e)))?;
595        let mut stmt = conn.prepare(
596            "SELECT id, artifact_id, start_line, end_line, text,
597                    embedding, embedding_model, token_count, metadata
598             FROM spans
599             WHERE text LIKE ?1
600             LIMIT ?2",
601        )?;
602
603        let pattern = format!("%{}%", query);
604        let spans = stmt
605            .query_map(params![pattern, limit as i64], |row| {
606                Ok(Span {
607                    id: row.get(0)?,
608                    artifact_id: row.get(1)?,
609                    start_line: row.get::<_, i64>(2)? as usize,
610                    end_line: row.get::<_, i64>(3)? as usize,
611                    text: row.get(4)?,
612                    embedding: row
613                        .get::<_, Option<Vec<u8>>>(5)?
614                        .map(|bytes| deserialize_embedding(&bytes)),
615                    embedding_model: row.get(6)?,
616                    token_count: row.get::<_, i64>(7)? as usize,
617                    metadata: row
618                        .get::<_, Option<String>>(8)?
619                        .and_then(|s| serde_json::from_str(&s).ok()),
620                })
621            })?
622            .collect::<std::result::Result<Vec<_>, _>>()?;
623
624        Ok(spans)
625    }
626
627    /// Get database statistics
628    ///
629    /// # Returns
630    ///
631    /// (artifacts_count, spans_count, total_tokens)
632    pub fn get_stats(&self) -> Result<(usize, usize, usize)> {
633        let conn = self.conn.lock()
634            .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Database lock poisoned: {}", e)))?;
635
636        let artifacts_count: i64 = conn.query_row("SELECT COUNT(*) FROM artifacts", [], |row| {
637            row.get(0)
638        })?;
639
640        let spans_count: i64 = conn.query_row("SELECT COUNT(*) FROM spans", [], |row| row.get(0))?;
641
642        let total_tokens: i64 = conn
643            .query_row("SELECT COALESCE(SUM(token_count), 0) FROM spans", [], |row| {
644                row.get(0)
645            })?;
646
647        Ok((
648            artifacts_count as usize,
649            spans_count as usize,
650            total_tokens as usize,
651        ))
652    }
653
654    /// Clear all data from the database
655    pub fn clear(&self) -> Result<()> {
656        let conn = self.conn.lock()
657            .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Database lock poisoned: {}", e)))?;
658        conn.execute("DELETE FROM spans", [])?;
659        conn.execute("DELETE FROM artifacts", [])?;
660        // Clear cached index
661        let mut cached = self.vector_index.write()
662            .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Index lock poisoned: {}", e)))?;
663        *cached = None;
664        self.index_dirty.store(true, Ordering::Release);
665        // Delete index cache directory
666        let _ = std::fs::remove_dir_all(self.get_index_cache_dir());
667        Ok(())
668    }
669
670    // ========== Session Management Operations ==========
671
672    /// Create a new session
673    ///
674    /// # Arguments
675    ///
676    /// * `user_id` - Optional user identifier
677    /// * `title` - Optional session title
678    ///
679    /// # Returns
680    ///
681    /// The newly created session
682    pub fn create_session(&self, user_id: Option<&str>, title: Option<&str>) -> Result<Session> {
683        let conn = self.conn.lock()
684            .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Database lock poisoned: {}", e)))?;
685
686        let id = uuid::Uuid::new_v4().to_string();
687        let now = chrono::Utc::now();
688
689        conn.execute(
690            "INSERT INTO sessions (id, user_id, title, metadata, created_at, updated_at, last_message_at)
691             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
692            params![
693                id,
694                user_id,
695                title,
696                None::<String>, // metadata
697                now.to_rfc3339(),
698                now.to_rfc3339(),
699                None::<String>, // last_message_at
700            ],
701        )?;
702
703        Ok(Session {
704            id,
705            user_id: user_id.map(|s| s.to_string()),
706            title: title.map(|s| s.to_string()),
707            metadata: None,
708            created_at: now,
709            updated_at: now,
710            last_message_at: None,
711        })
712    }
713
714    /// Get a session by ID
715    ///
716    /// # Arguments
717    ///
718    /// * `session_id` - The session ID to look up
719    ///
720    /// # Returns
721    ///
722    /// The session if found
723    pub fn get_session(&self, session_id: &str) -> Result<Option<Session>> {
724        let conn = self.conn.lock()
725            .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Database lock poisoned: {}", e)))?;
726
727        let mut stmt = conn.prepare(
728            "SELECT id, user_id, title, metadata, created_at, updated_at, last_message_at
729             FROM sessions WHERE id = ?1",
730        )?;
731
732        let session = stmt.query_row(params![session_id], |row| {
733            Ok(Session {
734                id: row.get(0)?,
735                user_id: row.get(1)?,
736                title: row.get(2)?,
737                metadata: row.get::<_, Option<String>>(3)?
738                    .and_then(|s| serde_json::from_str(&s).ok()),
739                created_at: row.get::<_, String>(4)?
740                    .parse()
741                    .unwrap_or_else(|_| chrono::Utc::now()),
742                updated_at: row.get::<_, String>(5)?
743                    .parse()
744                    .unwrap_or_else(|_| chrono::Utc::now()),
745                last_message_at: row.get::<_, Option<String>>(6)?
746                    .and_then(|s| s.parse().ok()),
747            })
748        }).optional()?;
749
750        Ok(session)
751    }
752
753    /// List sessions for a user (or all sessions if user_id is None)
754    ///
755    /// # Arguments
756    ///
757    /// * `user_id` - Optional user ID to filter by
758    /// * `limit` - Maximum number of sessions to return
759    ///
760    /// # Returns
761    ///
762    /// Vector of sessions, sorted by updated_at descending
763    pub fn list_sessions(&self, user_id: Option<&str>, limit: Option<usize>) -> Result<Vec<Session>> {
764        let conn = self.conn.lock()
765            .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Database lock poisoned: {}", e)))?;
766
767        let limit_val = limit.unwrap_or(100) as i64;
768
769        let mut sessions = Vec::new();
770
771        if let Some(uid) = user_id {
772            let mut stmt = conn.prepare(
773                "SELECT id, user_id, title, metadata, created_at, updated_at, last_message_at
774                 FROM sessions WHERE user_id = ?1
775                 ORDER BY updated_at DESC
776                 LIMIT ?2"
777            )?;
778
779            let rows = stmt.query_map(params![uid, limit_val], |row| {
780                Ok(Session {
781                    id: row.get(0)?,
782                    user_id: row.get(1)?,
783                    title: row.get(2)?,
784                    metadata: row.get::<_, Option<String>>(3)?
785                        .and_then(|s| serde_json::from_str(&s).ok()),
786                    created_at: row.get::<_, String>(4)?
787                        .parse()
788                        .unwrap_or_else(|_| chrono::Utc::now()),
789                    updated_at: row.get::<_, String>(5)?
790                        .parse()
791                        .unwrap_or_else(|_| chrono::Utc::now()),
792                    last_message_at: row.get::<_, Option<String>>(6)?
793                        .and_then(|s| s.parse().ok()),
794                })
795            })?;
796
797            for row in rows {
798                sessions.push(row?);
799            }
800        } else {
801            let mut stmt = conn.prepare(
802                "SELECT id, user_id, title, metadata, created_at, updated_at, last_message_at
803                 FROM sessions
804                 ORDER BY updated_at DESC
805                 LIMIT ?1"
806            )?;
807
808            let rows = stmt.query_map(params![limit_val], |row| {
809                Ok(Session {
810                    id: row.get(0)?,
811                    user_id: row.get(1)?,
812                    title: row.get(2)?,
813                    metadata: row.get::<_, Option<String>>(3)?
814                        .and_then(|s| serde_json::from_str(&s).ok()),
815                    created_at: row.get::<_, String>(4)?
816                        .parse()
817                        .unwrap_or_else(|_| chrono::Utc::now()),
818                    updated_at: row.get::<_, String>(5)?
819                        .parse()
820                        .unwrap_or_else(|_| chrono::Utc::now()),
821                    last_message_at: row.get::<_, Option<String>>(6)?
822                        .and_then(|s| s.parse().ok()),
823                })
824            })?;
825
826            for row in rows {
827                sessions.push(row?);
828            }
829        }
830
831        Ok(sessions)
832    }
833
834    /// Update session metadata
835    ///
836    /// # Arguments
837    ///
838    /// * `session_id` - The session ID to update
839    /// * `title` - Optional new title
840    /// * `metadata` - Optional new metadata
841    ///
842    /// # Returns
843    ///
844    /// Ok(()) if successful
845    pub fn update_session(
846        &self,
847        session_id: &str,
848        title: Option<&str>,
849        metadata: Option<&serde_json::Value>,
850    ) -> Result<()> {
851        let conn = self.conn.lock()
852            .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Database lock poisoned: {}", e)))?;
853
854        let now = chrono::Utc::now();
855
856        conn.execute(
857            "UPDATE sessions
858             SET title = COALESCE(?1, title),
859                 metadata = COALESCE(?2, metadata),
860                 updated_at = ?3
861             WHERE id = ?4",
862            params![
863                title,
864                metadata.map(|m| m.to_string()),
865                now.to_rfc3339(),
866                session_id,
867            ],
868        )?;
869
870        Ok(())
871    }
872
873    /// Delete a session (cascades to messages and working sets)
874    ///
875    /// # Arguments
876    ///
877    /// * `session_id` - The session ID to delete
878    ///
879    /// # Returns
880    ///
881    /// Ok(()) if successful
882    pub fn delete_session(&self, session_id: &str) -> Result<()> {
883        let conn = self.conn.lock()
884            .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Database lock poisoned: {}", e)))?;
885
886        conn.execute("DELETE FROM sessions WHERE id = ?1", params![session_id])?;
887
888        Ok(())
889    }
890
891    /// Add a message to a session
892    ///
893    /// # Arguments
894    ///
895    /// * `session_id` - The session to add the message to
896    /// * `role` - Message role (user, assistant, system, tool)
897    /// * `content` - Message content
898    /// * `metadata` - Optional metadata
899    ///
900    /// # Returns
901    ///
902    /// The newly created message
903    pub fn add_message(
904        &self,
905        session_id: &str,
906        role: MessageRole,
907        content: &str,
908        metadata: Option<&serde_json::Value>,
909    ) -> Result<Message> {
910        let mut conn = self.conn.lock()
911            .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Database lock poisoned: {}", e)))?;
912
913        let tx = conn.transaction()?;
914
915        // Get the next sequence number for this session
916        let sequence_number: i64 = tx.query_row(
917            "SELECT COALESCE(MAX(sequence_number), -1) + 1 FROM messages WHERE session_id = ?1",
918            params![session_id],
919            |row| row.get(0),
920        )?;
921
922        let id = uuid::Uuid::new_v4().to_string();
923        let now = chrono::Utc::now();
924
925        tx.execute(
926            "INSERT INTO messages (id, session_id, role, content, metadata, sequence_number, created_at)
927             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
928            params![
929                id,
930                session_id,
931                role.as_str(),
932                content,
933                metadata.map(|m| m.to_string()),
934                sequence_number,
935                now.to_rfc3339(),
936            ],
937        )?;
938
939        // Update session's last_message_at and updated_at
940        tx.execute(
941            "UPDATE sessions
942             SET last_message_at = ?1, updated_at = ?1
943             WHERE id = ?2",
944            params![now.to_rfc3339(), session_id],
945        )?;
946
947        tx.commit()?;
948
949        Ok(Message {
950            id,
951            session_id: session_id.to_string(),
952            role,
953            content: content.to_string(),
954            metadata: metadata.cloned(),
955            sequence_number: sequence_number as usize,
956            created_at: now,
957        })
958    }
959
960    /// Get messages for a session
961    ///
962    /// # Arguments
963    ///
964    /// * `session_id` - The session ID
965    /// * `limit` - Optional limit on number of messages (most recent first)
966    ///
967    /// # Returns
968    ///
969    /// Vector of messages in chronological order
970    pub fn get_messages(&self, session_id: &str, limit: Option<usize>) -> Result<Vec<Message>> {
971        let conn = self.conn.lock()
972            .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Database lock poisoned: {}", e)))?;
973
974        let mut messages = Vec::new();
975
976        if let Some(lim) = limit {
977            let mut stmt = conn.prepare(
978                "SELECT id, session_id, role, content, metadata, sequence_number, created_at
979                 FROM messages
980                 WHERE session_id = ?1
981                 ORDER BY sequence_number ASC
982                 LIMIT ?2"
983            )?;
984
985            let rows = stmt.query_map(params![session_id, lim as i64], |row| {
986                let role_str: String = row.get(2)?;
987                let role = MessageRole::from_str(&role_str)
988                    .unwrap_or(MessageRole::User);
989
990                Ok(Message {
991                    id: row.get(0)?,
992                    session_id: row.get(1)?,
993                    role,
994                    content: row.get(3)?,
995                    metadata: row.get::<_, Option<String>>(4)?
996                        .and_then(|s| serde_json::from_str(&s).ok()),
997                    sequence_number: row.get::<_, i64>(5)? as usize,
998                    created_at: row.get::<_, String>(6)?
999                        .parse()
1000                        .unwrap_or_else(|_| chrono::Utc::now()),
1001                })
1002            })?;
1003
1004            for row in rows {
1005                messages.push(row?);
1006            }
1007        } else {
1008            let mut stmt = conn.prepare(
1009                "SELECT id, session_id, role, content, metadata, sequence_number, created_at
1010                 FROM messages
1011                 WHERE session_id = ?1
1012                 ORDER BY sequence_number ASC"
1013            )?;
1014
1015            let rows = stmt.query_map(params![session_id], |row| {
1016                let role_str: String = row.get(2)?;
1017                let role = MessageRole::from_str(&role_str)
1018                    .unwrap_or(MessageRole::User);
1019
1020                Ok(Message {
1021                    id: row.get(0)?,
1022                    session_id: row.get(1)?,
1023                    role,
1024                    content: row.get(3)?,
1025                    metadata: row.get::<_, Option<String>>(4)?
1026                        .and_then(|s| serde_json::from_str(&s).ok()),
1027                    sequence_number: row.get::<_, i64>(5)? as usize,
1028                    created_at: row.get::<_, String>(6)?
1029                        .parse()
1030                        .unwrap_or_else(|_| chrono::Utc::now()),
1031                })
1032            })?;
1033
1034            for row in rows {
1035                messages.push(row?);
1036            }
1037        }
1038
1039        Ok(messages)
1040    }
1041
1042    /// Associate a working set with a session
1043    ///
1044    /// # Arguments
1045    ///
1046    /// * `session_id` - The session ID
1047    /// * `message_id` - Optional message ID that triggered this compilation
1048    /// * `working_set` - The working set to associate
1049    /// * `query` - Query that generated this working set
1050    /// * `config` - Configuration used for compilation
1051    ///
1052    /// # Returns
1053    ///
1054    /// The newly created SessionWorkingSet
1055    pub fn associate_working_set(
1056        &self,
1057        session_id: &str,
1058        message_id: Option<&str>,
1059        working_set: &WorkingSet,
1060        query: &str,
1061        config: &CompilerConfig,
1062    ) -> Result<SessionWorkingSet> {
1063        let conn = self.conn.lock()
1064            .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Database lock poisoned: {}", e)))?;
1065
1066        let id = uuid::Uuid::new_v4().to_string();
1067        let working_set_id = uuid::Uuid::new_v4().to_string(); // Generate a unique ID for this working set
1068        let now = chrono::Utc::now();
1069
1070        conn.execute(
1071            "INSERT INTO session_working_sets (id, session_id, message_id, working_set_id, query, config, created_at)
1072             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1073            params![
1074                id,
1075                session_id,
1076                message_id,
1077                working_set_id,
1078                query,
1079                serde_json::to_string(config)?,
1080                now.to_rfc3339(),
1081            ],
1082        )?;
1083
1084        Ok(SessionWorkingSet {
1085            id,
1086            session_id: session_id.to_string(),
1087            message_id: message_id.map(|s| s.to_string()),
1088            working_set: working_set.clone(),
1089            query: query.to_string(),
1090            config: config.clone(),
1091            created_at: now,
1092        })
1093    }
1094
1095    /// Get session with all messages and working sets
1096    ///
1097    /// # Arguments
1098    ///
1099    /// * `session_id` - The session ID
1100    ///
1101    /// # Returns
1102    ///
1103    /// SessionWithMessages if found
1104    pub fn get_session_full(&self, session_id: &str) -> Result<Option<SessionWithMessages>> {
1105        let session = self.get_session(session_id)?;
1106
1107        if session.is_none() {
1108            return Ok(None);
1109        }
1110
1111        let session = session.unwrap();
1112        let messages = self.get_messages(session_id, None)?;
1113
1114        // Get working sets for this session
1115        let conn = self.conn.lock()
1116            .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Database lock poisoned: {}", e)))?;
1117
1118        let mut stmt = conn.prepare(
1119            "SELECT id, session_id, message_id, working_set_id, query, config, created_at
1120             FROM session_working_sets
1121             WHERE session_id = ?1
1122             ORDER BY created_at ASC",
1123        )?;
1124
1125        let working_sets = stmt.query_map(params![session_id], |row| {
1126            let config_str: String = row.get(5)?;
1127            let config: CompilerConfig = serde_json::from_str(&config_str)
1128                .unwrap_or_default();
1129
1130            // Note: We can't reconstruct the full WorkingSet from storage without additional data
1131            // For now, we'll create a placeholder. In a real implementation, you'd store the
1132            // working set data as JSON and deserialize it here.
1133            let working_set = WorkingSet {
1134                text: String::new(),
1135                spans: Vec::new(),
1136                citations: Vec::new(),
1137                tokens_used: 0,
1138                query: row.get::<_, String>(4)?,
1139                compilation_time_ms: 0,
1140                manifest: None,
1141                explain: None,
1142            };
1143
1144            Ok(SessionWorkingSet {
1145                id: row.get(0)?,
1146                session_id: row.get(1)?,
1147                message_id: row.get(2)?,
1148                working_set,
1149                query: row.get(4)?,
1150                config,
1151                created_at: row.get::<_, String>(6)?
1152                    .parse()
1153                    .unwrap_or_else(|_| chrono::Utc::now()),
1154            })
1155        })?
1156        .collect::<std::result::Result<Vec<_>, _>>()?;
1157
1158        Ok(Some(SessionWithMessages {
1159            session,
1160            messages,
1161            working_sets,
1162        }))
1163    }
1164
1165    // ========== Multi-Agent Orchestration Operations ==========
1166
1167    /// Register an agent (works for 1 or many agents)
1168    ///
1169    /// # Arguments
1170    ///
1171    /// * `agent` - The agent to register
1172    ///
1173    /// # Returns
1174    ///
1175    /// The registered agent
1176    pub fn register_agent(&self, agent: &crate::types::Agent) -> Result<crate::types::Agent> {
1177        let conn = self.conn.lock()
1178            .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Database lock poisoned: {}", e)))?;
1179
1180        conn.execute(
1181            "INSERT OR REPLACE INTO agents (id, name, role, model, system_prompt, did, capabilities, metadata, created_at)
1182             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
1183            params![
1184                agent.id,
1185                agent.name,
1186                agent.role,
1187                agent.model,
1188                agent.system_prompt,
1189                agent.did,
1190                agent.capabilities.as_ref().map(|c| serde_json::to_string(c).ok()).flatten(),
1191                agent.metadata.as_ref().map(|m| m.to_string()),
1192                agent.created_at.to_rfc3339(),
1193            ],
1194        )?;
1195
1196        Ok(agent.clone())
1197    }
1198
1199    /// Get an agent by ID
1200    pub fn get_agent(&self, agent_id: &str) -> Result<Option<crate::types::Agent>> {
1201        let conn = self.conn.lock()
1202            .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Database lock poisoned: {}", e)))?;
1203
1204        let mut stmt = conn.prepare(
1205            "SELECT id, name, role, model, system_prompt, did, capabilities, metadata, created_at
1206             FROM agents WHERE id = ?1"
1207        )?;
1208
1209        let result = stmt.query_row(params![agent_id], |row| {
1210            Ok(crate::types::Agent {
1211                id: row.get(0)?,
1212                name: row.get(1)?,
1213                role: row.get(2)?,
1214                model: row.get(3)?,
1215                system_prompt: row.get(4)?,
1216                did: row.get(5)?,
1217                capabilities: row.get::<_, Option<String>>(6)?
1218                    .and_then(|s| serde_json::from_str(&s).ok()),
1219                metadata: row.get::<_, Option<String>>(7)?
1220                    .and_then(|s| serde_json::from_str(&s).ok()),
1221                created_at: row.get::<_, String>(8)?
1222                    .parse()
1223                    .unwrap_or_else(|_| chrono::Utc::now()),
1224            })
1225        });
1226
1227        match result {
1228            Ok(agent) => Ok(Some(agent)),
1229            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1230            Err(e) => Err(e.into()),
1231        }
1232    }
1233
1234    /// Get an agent by name within a session (looks up by session participants)
1235    pub fn get_agent_by_name(&self, name: &str) -> Result<Option<crate::types::Agent>> {
1236        let conn = self.conn.lock()
1237            .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Database lock poisoned: {}", e)))?;
1238
1239        let mut stmt = conn.prepare(
1240            "SELECT id, name, role, model, system_prompt, did, capabilities, metadata, created_at
1241             FROM agents WHERE name = ?1"
1242        )?;
1243
1244        let result = stmt.query_row(params![name], |row| {
1245            Ok(crate::types::Agent {
1246                id: row.get(0)?,
1247                name: row.get(1)?,
1248                role: row.get(2)?,
1249                model: row.get(3)?,
1250                system_prompt: row.get(4)?,
1251                did: row.get(5)?,
1252                capabilities: row.get::<_, Option<String>>(6)?
1253                    .and_then(|s| serde_json::from_str(&s).ok()),
1254                metadata: row.get::<_, Option<String>>(7)?
1255                    .and_then(|s| serde_json::from_str(&s).ok()),
1256                created_at: row.get::<_, String>(8)?
1257                    .parse()
1258                    .unwrap_or_else(|_| chrono::Utc::now()),
1259            })
1260        });
1261
1262        match result {
1263            Ok(agent) => Ok(Some(agent)),
1264            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1265            Err(e) => Err(e.into()),
1266        }
1267    }
1268
1269    /// List all registered agents
1270    pub fn list_agents(&self) -> Result<Vec<crate::types::Agent>> {
1271        let conn = self.conn.lock()
1272            .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Database lock poisoned: {}", e)))?;
1273
1274        let mut stmt = conn.prepare(
1275            "SELECT id, name, role, model, system_prompt, did, capabilities, metadata, created_at
1276             FROM agents ORDER BY created_at"
1277        )?;
1278
1279        let agents = stmt.query_map([], |row| {
1280            Ok(crate::types::Agent {
1281                id: row.get(0)?,
1282                name: row.get(1)?,
1283                role: row.get(2)?,
1284                model: row.get(3)?,
1285                system_prompt: row.get(4)?,
1286                did: row.get(5)?,
1287                capabilities: row.get::<_, Option<String>>(6)?
1288                    .and_then(|s| serde_json::from_str(&s).ok()),
1289                metadata: row.get::<_, Option<String>>(7)?
1290                    .and_then(|s| serde_json::from_str(&s).ok()),
1291                created_at: row.get::<_, String>(8)?
1292                    .parse()
1293                    .unwrap_or_else(|_| chrono::Utc::now()),
1294            })
1295        })?
1296        .collect::<std::result::Result<Vec<_>, _>>()?;
1297
1298        Ok(agents)
1299    }
1300
1301    /// Add an agent relation (agreement, disagreement, etc.)
1302    ///
1303    /// Automatically resolves message IDs to agent IDs for proper tracking
1304    pub fn add_agent_relation(
1305        &self,
1306        session_id: &str,
1307        message_id: &str,
1308        from_agent_id: &str,
1309        target_message_id: &str,
1310        stance: crate::types::Stance,
1311    ) -> Result<crate::types::AgentRelation> {
1312        let conn = self.conn.lock()
1313            .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Database lock poisoned: {}", e)))?;
1314
1315        // Look up the target message to find its agent
1316        let to_agent_id: String = conn.query_row(
1317            "SELECT json_extract(metadata, '$.agent_id') FROM messages WHERE id = ?1",
1318            params![target_message_id],
1319            |row| row.get(0),
1320        ).unwrap_or_else(|_| "unknown".to_string());
1321
1322        let id = uuid::Uuid::new_v4().to_string();
1323        let now = chrono::Utc::now();
1324
1325        conn.execute(
1326            "INSERT INTO agent_relations (id, session_id, message_id, from_agent_id, to_agent_id, stance, target_message_id, created_at)
1327             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1328            params![
1329                id,
1330                session_id,
1331                message_id,
1332                from_agent_id,
1333                to_agent_id,
1334                stance.as_str(),
1335                target_message_id,
1336                now.to_rfc3339(),
1337            ],
1338        )?;
1339
1340        Ok(crate::types::AgentRelation {
1341            id,
1342            session_id: session_id.to_string(),
1343            message_id: message_id.to_string(),
1344            from_agent_id: from_agent_id.to_string(),
1345            to_agent_id,
1346            stance,
1347            target_message_id: target_message_id.to_string(),
1348            created_at: now,
1349        })
1350    }
1351
1352    /// Get all agent relations for a session with resolved names
1353    pub fn get_agent_relations(&self, session_id: &str) -> Result<crate::types::AgentRelationSummary> {
1354        let conn = self.conn.lock()
1355            .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Database lock poisoned: {}", e)))?;
1356
1357        let mut stmt = conn.prepare(
1358            "SELECT
1359                ar.id, ar.message_id, ar.target_message_id, ar.stance,
1360                fa.name as from_name, fa.model as from_model,
1361                ta.name as to_name, ta.model as to_model
1362             FROM agent_relations ar
1363             LEFT JOIN agents fa ON ar.from_agent_id = fa.id
1364             LEFT JOIN agents ta ON ar.to_agent_id = ta.id
1365             WHERE ar.session_id = ?1
1366             ORDER BY ar.created_at"
1367        )?;
1368
1369        let mut agreements = Vec::new();
1370        let mut disagreements = Vec::new();
1371        let mut questions = Vec::new();
1372
1373        let rows = stmt.query_map(params![session_id], |row| {
1374            Ok((
1375                row.get::<_, String>(1)?,  // message_id
1376                row.get::<_, String>(2)?,  // target_message_id
1377                row.get::<_, String>(3)?,  // stance
1378                row.get::<_, Option<String>>(4)?.unwrap_or_else(|| "unknown".to_string()),  // from_name
1379                row.get::<_, Option<String>>(5)?.unwrap_or_else(|| "?".to_string()),        // from_model
1380                row.get::<_, Option<String>>(6)?.unwrap_or_else(|| "unknown".to_string()),  // to_name
1381                row.get::<_, Option<String>>(7)?.unwrap_or_else(|| "?".to_string()),        // to_model
1382            ))
1383        })?;
1384
1385        for row in rows {
1386            let (message_id, target_message_id, stance, from_name, from_model, to_name, to_model) = row?;
1387            let entry = crate::types::AgentRelationEntry {
1388                from_agent: from_name,
1389                from_model,
1390                to_agent: to_name,
1391                to_model,
1392                message_id,
1393                target_message_id,
1394            };
1395
1396            match stance.as_str() {
1397                "agree" => agreements.push(entry),
1398                "disagree" => disagreements.push(entry),
1399                "question" => questions.push(entry),
1400                _ => {} // neutral or unknown
1401            }
1402        }
1403
1404        Ok(crate::types::AgentRelationSummary {
1405            agreements,
1406            disagreements,
1407            questions,
1408        })
1409    }
1410
1411    /// Get agents participating in a session (from their messages)
1412    pub fn get_session_agents(&self, session_id: &str) -> Result<Vec<crate::types::Agent>> {
1413        let conn = self.conn.lock()
1414            .map_err(|e| crate::types::Error::Other(anyhow::anyhow!("Database lock poisoned: {}", e)))?;
1415
1416        let mut stmt = conn.prepare(
1417            "SELECT DISTINCT a.id, a.name, a.role, a.model, a.system_prompt, a.did, a.capabilities, a.metadata, a.created_at
1418             FROM agents a
1419             INNER JOIN messages m ON json_extract(m.metadata, '$.agent_id') = a.id
1420             WHERE m.session_id = ?1
1421             ORDER BY a.name"
1422        )?;
1423
1424        let agents = stmt.query_map(params![session_id], |row| {
1425            Ok(crate::types::Agent {
1426                id: row.get(0)?,
1427                name: row.get(1)?,
1428                role: row.get(2)?,
1429                model: row.get(3)?,
1430                system_prompt: row.get(4)?,
1431                did: row.get(5)?,
1432                capabilities: row.get::<_, Option<String>>(6)?
1433                    .and_then(|s| serde_json::from_str(&s).ok()),
1434                metadata: row.get::<_, Option<String>>(7)?
1435                    .and_then(|s| serde_json::from_str(&s).ok()),
1436                created_at: row.get::<_, String>(8)?
1437                    .parse()
1438                    .unwrap_or_else(|_| chrono::Utc::now()),
1439            })
1440        })?
1441        .collect::<std::result::Result<Vec<_>, _>>()?;
1442
1443        Ok(agents)
1444    }
1445
1446    /// Convert this Database to a SqliteBackend for use with the StorageBackend trait
1447    ///
1448    /// This allows existing code using Database to interoperate with code
1449    /// expecting a StorageBackend implementation.
1450    ///
1451    /// # Returns
1452    ///
1453    /// A SqliteBackend wrapping this database's path
1454    pub async fn as_storage_backend(&self) -> Result<crate::storage::SqliteBackend> {
1455        crate::storage::SqliteBackend::new(&self.db_path).await
1456    }
1457}
1458
/// Encode an embedding as raw bytes for storage.
///
/// Every `f32` contributes its four little-endian bytes, in input order, so
/// the result is exactly `4 * embedding.len()` bytes long.
fn serialize_embedding(embedding: &[f32]) -> Vec<u8> {
    let mut encoded = Vec::with_capacity(embedding.len() * std::mem::size_of::<f32>());
    for value in embedding {
        encoded.extend_from_slice(&value.to_le_bytes());
    }
    encoded
}
1463
/// Decode an embedding from its stored byte form.
///
/// The input is read as consecutive 4-byte little-endian `f32` values.
/// Trailing bytes that do not fill a complete 4-byte group are ignored.
fn deserialize_embedding(bytes: &[u8]) -> Vec<f32> {
    let mut decoded = Vec::with_capacity(bytes.len() / 4);
    for group in bytes.chunks_exact(4) {
        let mut word = [0u8; 4];
        word.copy_from_slice(group);
        decoded.push(f32::from_le_bytes(word));
    }
    decoded
}
1471
1472#[cfg(test)]
1473mod tests {
1474    use super::*;
1475    use uuid::Uuid;
1476
1477    #[test]
1478    fn test_database_creation() {
1479        let db = Database::new(":memory:").unwrap();
1480        let (artifacts, spans, tokens) = db.get_stats().unwrap();
1481        assert_eq!(artifacts, 0);
1482        assert_eq!(spans, 0);
1483        assert_eq!(tokens, 0);
1484    }
1485
1486    #[test]
1487    fn test_insert_artifact() {
1488        let db = Database::new(":memory:").unwrap();
1489
1490        let artifact = Artifact {
1491            id: Uuid::new_v4().to_string(),
1492            path: "test.txt".to_string(),
1493            content: "Test content".to_string(),
1494            content_hash: "hash123".to_string(),
1495            metadata: None,
1496            created_at: chrono::Utc::now(),
1497        };
1498
1499        db.insert_artifact(&artifact).unwrap();
1500
1501        let (count, _, _) = db.get_stats().unwrap();
1502        assert_eq!(count, 1);
1503    }
1504
1505    #[test]
1506    fn test_embedding_serialization() {
1507        let original = vec![1.0, 2.5, -3.14, 0.0];
1508        let bytes = serialize_embedding(&original);
1509        let restored = deserialize_embedding(&bytes);
1510
1511        assert_eq!(original.len(), restored.len());
1512        for (a, b) in original.iter().zip(restored.iter()) {
1513            assert!((a - b).abs() < 0.0001);
1514        }
1515    }
1516
1517    // ========== Session Management Tests ==========
1518
1519    #[test]
1520    fn test_create_session() {
1521        let db = Database::new(":memory:").unwrap();
1522
1523        let session = db.create_session(Some("user123"), Some("Test Session")).unwrap();
1524
1525        assert!(!session.id.is_empty());
1526        assert_eq!(session.user_id, Some("user123".to_string()));
1527        assert_eq!(session.title, Some("Test Session".to_string()));
1528        assert!(session.metadata.is_none());
1529        assert!(session.last_message_at.is_none());
1530    }
1531
1532    #[test]
1533    fn test_get_session() {
1534        let db = Database::new(":memory:").unwrap();
1535
1536        let created = db.create_session(Some("user456"), Some("Another Session")).unwrap();
1537        let retrieved = db.get_session(&created.id).unwrap();
1538
1539        assert!(retrieved.is_some());
1540        let session = retrieved.unwrap();
1541        assert_eq!(session.id, created.id);
1542        assert_eq!(session.user_id, created.user_id);
1543        assert_eq!(session.title, created.title);
1544    }
1545
1546    #[test]
1547    fn test_get_nonexistent_session() {
1548        let db = Database::new(":memory:").unwrap();
1549
1550        let result = db.get_session("nonexistent-id").unwrap();
1551        assert!(result.is_none());
1552    }
1553
1554    #[test]
1555    fn test_list_sessions() {
1556        let db = Database::new(":memory:").unwrap();
1557
1558        // Create multiple sessions
1559        db.create_session(Some("user1"), Some("Session 1")).unwrap();
1560        db.create_session(Some("user1"), Some("Session 2")).unwrap();
1561        db.create_session(Some("user2"), Some("Session 3")).unwrap();
1562
1563        // List all sessions
1564        let all_sessions = db.list_sessions(None, None).unwrap();
1565        assert_eq!(all_sessions.len(), 3);
1566
1567        // List sessions for user1
1568        let user1_sessions = db.list_sessions(Some("user1"), None).unwrap();
1569        assert_eq!(user1_sessions.len(), 2);
1570
1571        // List sessions for user2
1572        let user2_sessions = db.list_sessions(Some("user2"), None).unwrap();
1573        assert_eq!(user2_sessions.len(), 1);
1574
1575        // Test limit
1576        let limited = db.list_sessions(None, Some(2)).unwrap();
1577        assert_eq!(limited.len(), 2);
1578    }
1579
1580    #[test]
1581    fn test_update_session() {
1582        let db = Database::new(":memory:").unwrap();
1583
1584        let session = db.create_session(Some("user1"), Some("Original Title")).unwrap();
1585
1586        // Update title
1587        db.update_session(&session.id, Some("Updated Title"), None).unwrap();
1588
1589        let updated = db.get_session(&session.id).unwrap().unwrap();
1590        assert_eq!(updated.title, Some("Updated Title".to_string()));
1591
1592        // Update metadata
1593        let metadata = serde_json::json!({"key": "value"});
1594        db.update_session(&session.id, None, Some(&metadata)).unwrap();
1595
1596        let updated2 = db.get_session(&session.id).unwrap().unwrap();
1597        assert!(updated2.metadata.is_some());
1598        assert_eq!(updated2.metadata.unwrap()["key"], "value");
1599    }
1600
1601    #[test]
1602    fn test_delete_session() {
1603        let db = Database::new(":memory:").unwrap();
1604
1605        let session = db.create_session(Some("user1"), Some("To Delete")).unwrap();
1606
1607        // Verify session exists
1608        assert!(db.get_session(&session.id).unwrap().is_some());
1609
1610        // Delete session
1611        db.delete_session(&session.id).unwrap();
1612
1613        // Verify session is gone
1614        assert!(db.get_session(&session.id).unwrap().is_none());
1615    }
1616
1617    #[test]
1618    fn test_add_message() {
1619        let db = Database::new(":memory:").unwrap();
1620
1621        let session = db.create_session(Some("user1"), Some("Chat Session")).unwrap();
1622
1623        // Add first message
1624        let msg1 = db.add_message(&session.id, MessageRole::User, "Hello", None).unwrap();
1625        assert_eq!(msg1.sequence_number, 0);
1626        assert_eq!(msg1.content, "Hello");
1627        assert_eq!(msg1.role.as_str(), "user");
1628
1629        // Add second message
1630        let msg2 = db.add_message(&session.id, MessageRole::Assistant, "Hi there!", None).unwrap();
1631        assert_eq!(msg2.sequence_number, 1);
1632        assert_eq!(msg2.content, "Hi there!");
1633        assert_eq!(msg2.role.as_str(), "assistant");
1634
1635        // Verify session was updated
1636        let updated_session = db.get_session(&session.id).unwrap().unwrap();
1637        assert!(updated_session.last_message_at.is_some());
1638    }
1639
1640    #[test]
1641    fn test_add_message_with_metadata() {
1642        let db = Database::new(":memory:").unwrap();
1643
1644        let session = db.create_session(Some("user1"), Some("Chat Session")).unwrap();
1645
1646        let metadata = serde_json::json!({"tool": "search", "query": "test"});
1647        let msg = db.add_message(&session.id, MessageRole::Tool, "Result", Some(&metadata)).unwrap();
1648
1649        assert!(msg.metadata.is_some());
1650        assert_eq!(msg.metadata.unwrap()["tool"], "search");
1651    }
1652
1653    #[test]
1654    fn test_get_messages() {
1655        let db = Database::new(":memory:").unwrap();
1656
1657        let session = db.create_session(Some("user1"), Some("Chat Session")).unwrap();
1658
1659        // Add multiple messages
1660        db.add_message(&session.id, MessageRole::User, "Message 1", None).unwrap();
1661        db.add_message(&session.id, MessageRole::Assistant, "Message 2", None).unwrap();
1662        db.add_message(&session.id, MessageRole::User, "Message 3", None).unwrap();
1663
1664        // Get all messages
1665        let messages = db.get_messages(&session.id, None).unwrap();
1666        assert_eq!(messages.len(), 3);
1667        assert_eq!(messages[0].sequence_number, 0);
1668        assert_eq!(messages[1].sequence_number, 1);
1669        assert_eq!(messages[2].sequence_number, 2);
1670
1671        // Test limit
1672        let limited = db.get_messages(&session.id, Some(2)).unwrap();
1673        assert_eq!(limited.len(), 2);
1674    }
1675
1676    #[test]
1677    fn test_message_ordering() {
1678        let db = Database::new(":memory:").unwrap();
1679
1680        let session = db.create_session(Some("user1"), Some("Chat Session")).unwrap();
1681
1682        // Add messages
1683        db.add_message(&session.id, MessageRole::User, "First", None).unwrap();
1684        db.add_message(&session.id, MessageRole::Assistant, "Second", None).unwrap();
1685        db.add_message(&session.id, MessageRole::User, "Third", None).unwrap();
1686
1687        let messages = db.get_messages(&session.id, None).unwrap();
1688
1689        // Verify chronological order
1690        assert_eq!(messages[0].content, "First");
1691        assert_eq!(messages[1].content, "Second");
1692        assert_eq!(messages[2].content, "Third");
1693
1694        // Verify sequence numbers are consecutive
1695        for (i, msg) in messages.iter().enumerate() {
1696            assert_eq!(msg.sequence_number, i);
1697        }
1698    }
1699
1700    #[test]
1701    fn test_associate_working_set() {
1702        let db = Database::new(":memory:").unwrap();
1703
1704        let session = db.create_session(Some("user1"), Some("Chat Session")).unwrap();
1705        let message = db.add_message(&session.id, MessageRole::User, "Query", None).unwrap();
1706
1707        // Create a working set
1708        let working_set = WorkingSet {
1709            text: "Test context".to_string(),
1710            spans: Vec::new(),
1711            citations: Vec::new(),
1712            tokens_used: 100,
1713            query: "test query".to_string(),
1714            compilation_time_ms: 50,
1715            manifest: None,
1716            explain: None,
1717        };
1718
1719        let config = CompilerConfig::default();
1720
1721        let sws = db.associate_working_set(
1722            &session.id,
1723            Some(&message.id),
1724            &working_set,
1725            "test query",
1726            &config,
1727        ).unwrap();
1728
1729        assert_eq!(sws.session_id, session.id);
1730        assert_eq!(sws.message_id, Some(message.id));
1731        assert_eq!(sws.query, "test query");
1732        assert_eq!(sws.working_set.text, "Test context");
1733    }
1734
1735    #[test]
1736    fn test_get_session_full() {
1737        let db = Database::new(":memory:").unwrap();
1738
1739        let session = db.create_session(Some("user1"), Some("Full Session")).unwrap();
1740
1741        // Add messages
1742        let msg1 = db.add_message(&session.id, MessageRole::User, "Hello", None).unwrap();
1743        db.add_message(&session.id, MessageRole::Assistant, "Hi!", None).unwrap();
1744
1745        // Add working set
1746        let working_set = WorkingSet {
1747            text: "Context".to_string(),
1748            spans: Vec::new(),
1749            citations: Vec::new(),
1750            tokens_used: 50,
1751            query: "test".to_string(),
1752            compilation_time_ms: 25,
1753            manifest: None,
1754            explain: None,
1755        };
1756
1757        db.associate_working_set(
1758            &session.id,
1759            Some(&msg1.id),
1760            &working_set,
1761            "test",
1762            &CompilerConfig::default(),
1763        ).unwrap();
1764
1765        // Get full session
1766        let full = db.get_session_full(&session.id).unwrap();
1767        assert!(full.is_some());
1768
1769        let swm = full.unwrap();
1770        assert_eq!(swm.session.id, session.id);
1771        assert_eq!(swm.messages.len(), 2);
1772        assert_eq!(swm.working_sets.len(), 1);
1773    }
1774
1775    #[test]
1776    fn test_delete_session_cascade() {
1777        let db = Database::new(":memory:").unwrap();
1778
1779        let session = db.create_session(Some("user1"), Some("To Delete")).unwrap();
1780
1781        // Add messages
1782        db.add_message(&session.id, MessageRole::User, "Message 1", None).unwrap();
1783        db.add_message(&session.id, MessageRole::Assistant, "Message 2", None).unwrap();
1784
1785        // Verify messages exist
1786        let messages_before = db.get_messages(&session.id, None).unwrap();
1787        assert_eq!(messages_before.len(), 2);
1788
1789        // Delete session
1790        db.delete_session(&session.id).unwrap();
1791
1792        // Verify messages are gone (cascade delete)
1793        let messages_after = db.get_messages(&session.id, None).unwrap();
1794        assert_eq!(messages_after.len(), 0);
1795    }
1796
1797    #[test]
1798    fn test_message_role_conversion() {
1799        assert_eq!(MessageRole::User.as_str(), "user");
1800        assert_eq!(MessageRole::Assistant.as_str(), "assistant");
1801        assert_eq!(MessageRole::System.as_str(), "system");
1802        assert_eq!(MessageRole::Tool.as_str(), "tool");
1803
1804        assert!(matches!(MessageRole::from_str("user").unwrap(), MessageRole::User));
1805        assert!(matches!(MessageRole::from_str("assistant").unwrap(), MessageRole::Assistant));
1806        assert!(matches!(MessageRole::from_str("system").unwrap(), MessageRole::System));
1807        assert!(matches!(MessageRole::from_str("tool").unwrap(), MessageRole::Tool));
1808
1809        assert!(MessageRole::from_str("invalid").is_err());
1810    }
1811}