Skip to main content

jacs_postgresql/
lib.rs

1//! PostgreSQL storage backend for JACS documents.
2//!
3//! This crate provides `PostgresStorage`, a PostgreSQL-backed implementation of
4//! JACS storage traits:
5//! - [`StorageDocumentTraits`] -- basic document CRUD
6//! - [`DatabaseDocumentTraits`] -- database-specific query capabilities
7//! - [`SearchProvider`] -- fulltext search via PostgreSQL tsvector
8//!
9//! # Dual-Column Strategy
10//!
11//! Uses TEXT + JSONB dual-column storage:
12//! - `raw_contents` (TEXT): Preserves exact JSON bytes for signature verification
13//! - `file_contents` (JSONB): Enables efficient queries and indexing
14//!
15//! # Append-Only Model with Soft Delete
16//!
17//! Documents are immutable once stored. New versions create new rows
18//! keyed by `(jacs_id, jacs_version)`. The only UPDATE operation is
19//! soft-delete via `remove_document`, which sets `tombstoned = true`
20//! rather than physically deleting rows.
21//!
22//! # Usage
23//!
24//! ```rust,ignore
25//! use jacs_postgresql::PostgresStorage;
26//! use jacs::storage::StorageDocumentTraits;
27//! use jacs::storage::DatabaseDocumentTraits;
28//!
29//! let storage = PostgresStorage::new(&database_url, None, None, None)?;
30//! storage.run_migrations()?;
31//! ```
32
33use jacs::agent::document::JACSDocument;
34use jacs::error::JacsError;
35use jacs::search::{
36    FieldFilter, SearchCapabilities, SearchHit, SearchMethod, SearchProvider, SearchQuery,
37    SearchResults,
38};
39use jacs::storage::StorageDocumentTraits;
40use jacs::storage::database_traits::DatabaseDocumentTraits;
41use serde_json::Value;
42use sqlx::Row;
43use sqlx::postgres::{PgPool, PgPoolOptions, PgRow};
44use std::error::Error;
45use std::time::Duration;
46use tokio::runtime::Handle;
47
48/// PostgreSQL storage backend for JACS documents.
49///
50/// Implements [`StorageDocumentTraits`], [`DatabaseDocumentTraits`], and
51/// [`SearchProvider`]. Supports fulltext search via PostgreSQL tsvector.
52/// Vector search (pgvector) is not yet implemented but the capability
53/// reporting is prepared for it.
54pub struct PostgresStorage {
55    pool: PgPool,
56    handle: Handle,
57}
58
59impl PostgresStorage {
60    /// Create a new PostgresStorage connected to the given PostgreSQL URL.
61    ///
62    /// Pool settings:
63    /// - `max_connections`: Maximum pool size (default 10)
64    /// - `min_connections`: Minimum pool size (default 1)
65    /// - `connect_timeout_secs`: Connection timeout (default 30)
66    pub fn new(
67        database_url: &str,
68        max_connections: Option<u32>,
69        min_connections: Option<u32>,
70        connect_timeout_secs: Option<u64>,
71    ) -> Result<Self, JacsError> {
72        let handle = Handle::try_current().map_err(|e| JacsError::DatabaseError {
73            operation: "init".to_string(),
74            reason: format!(
75                "No tokio runtime available. Database storage requires a tokio runtime: {}",
76                e
77            ),
78        })?;
79
80        let pool = tokio::task::block_in_place(|| {
81            handle.block_on(async {
82                PgPoolOptions::new()
83                    .max_connections(max_connections.unwrap_or(10))
84                    .min_connections(min_connections.unwrap_or(1))
85                    .acquire_timeout(Duration::from_secs(connect_timeout_secs.unwrap_or(30)))
86                    .connect(database_url)
87                    .await
88            })
89        })
90        .map_err(|e| JacsError::DatabaseError {
91            operation: "connect".to_string(),
92            reason: e.to_string(),
93        })?;
94
95        Ok(Self { pool, handle })
96    }
97
98    /// Create a PostgresStorage from an existing pool and handle (for testing).
99    pub fn with_pool(pool: PgPool, handle: Handle) -> Self {
100        Self { pool, handle }
101    }
102
103    /// Get a reference to the underlying pool.
104    pub fn pool(&self) -> &PgPool {
105        &self.pool
106    }
107
108    /// Helper to run async sqlx operations synchronously.
109    ///
110    /// Uses `block_in_place` so this is safe to call from within a tokio
111    /// multi-threaded runtime (e.g. from `#[tokio::test]`).
112    fn block_on<F: std::future::Future>(&self, f: F) -> F::Output {
113        tokio::task::block_in_place(|| self.handle.block_on(f))
114    }
115
116    /// Parse a document key in format "id:version" into (id, version).
117    fn parse_key(key: &str) -> Result<(&str, &str), Box<dyn Error>> {
118        let parts: Vec<&str> = key.splitn(2, ':').collect();
119        if parts.len() != 2 {
120            return Err(format!("Invalid document key '{}': expected 'id:version'", key).into());
121        }
122        Ok((parts[0], parts[1]))
123    }
124
125    /// Build a JACSDocument from a database row.
126    /// Uses raw_contents (TEXT) to preserve exact signed JSON bytes.
127    fn row_to_document(row: &PgRow) -> Result<JACSDocument, JacsError> {
128        let raw: String = row
129            .try_get("raw_contents")
130            .map_err(|e| JacsError::DatabaseError {
131                operation: "row_to_document".into(),
132                reason: e.to_string(),
133            })?;
134        let value: Value = serde_json::from_str(&raw)?;
135
136        let id: String = row
137            .try_get("jacs_id")
138            .map_err(|e| JacsError::DatabaseError {
139                operation: "row_to_document".into(),
140                reason: e.to_string(),
141            })?;
142        let version: String =
143            row.try_get("jacs_version")
144                .map_err(|e| JacsError::DatabaseError {
145                    operation: "row_to_document".into(),
146                    reason: e.to_string(),
147                })?;
148        let jacs_type: String = row
149            .try_get("jacs_type")
150            .map_err(|e| JacsError::DatabaseError {
151                operation: "row_to_document".into(),
152                reason: e.to_string(),
153            })?;
154
155        Ok(JACSDocument {
156            id,
157            version,
158            value,
159            jacs_type,
160        })
161    }
162
163    /// SQL for the jacs_document table creation.
164    const CREATE_TABLE_SQL: &str = r#"
165        CREATE TABLE IF NOT EXISTS jacs_document (
166            jacs_id TEXT NOT NULL,
167            jacs_version TEXT NOT NULL,
168            agent_id TEXT,
169            jacs_type TEXT NOT NULL,
170            raw_contents TEXT NOT NULL,
171            file_contents JSONB NOT NULL,
172            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
173            tombstoned BOOLEAN NOT NULL DEFAULT false,
174            PRIMARY KEY (jacs_id, jacs_version)
175        )
176    "#;
177
178    /// SQL for basic indexes.
179    const CREATE_INDEXES_SQL: &[&str] = &[
180        "CREATE INDEX IF NOT EXISTS idx_jacs_document_type ON jacs_document (jacs_type)",
181        "CREATE INDEX IF NOT EXISTS idx_jacs_document_agent ON jacs_document (agent_id)",
182        "CREATE INDEX IF NOT EXISTS idx_jacs_document_created ON jacs_document (created_at DESC)",
183    ];
184
185    /// SQL for fulltext search index (tsvector).
186    const CREATE_FTS_INDEX_SQL: &str = r#"
187        CREATE INDEX IF NOT EXISTS idx_jacs_document_fts
188        ON jacs_document
189        USING GIN (to_tsvector('english', raw_contents))
190    "#;
191}
192
193impl StorageDocumentTraits for PostgresStorage {
194    fn store_document(&self, doc: &JACSDocument) -> Result<(), JacsError> {
195        let raw_json = serde_json::to_string_pretty(&doc.value)?;
196        let jsonb_value = &doc.value;
197        let agent_id = doc
198            .value
199            .get("jacsSignature")
200            .and_then(|s| s.get("jacsSignatureAgentId"))
201            .and_then(|v| v.as_str())
202            .map(|s| s.to_string());
203
204        self.block_on(async {
205            sqlx::query(
206                r#"INSERT INTO jacs_document (jacs_id, jacs_version, agent_id, jacs_type, raw_contents, file_contents)
207                   VALUES ($1, $2, $3, $4, $5, $6)
208                   ON CONFLICT (jacs_id, jacs_version) DO NOTHING"#,
209            )
210            .bind(&doc.id)
211            .bind(&doc.version)
212            .bind(&agent_id)
213            .bind(&doc.jacs_type)
214            .bind(&raw_json)
215            .bind(jsonb_value)
216            .execute(&self.pool)
217            .await
218        })
219        .map_err(|e| {
220            JacsError::DatabaseError {
221                operation: "store_document".to_string(),
222                reason: e.to_string(),
223            }
224        })?;
225
226        Ok(())
227    }
228
229    fn get_document(&self, key: &str) -> Result<JACSDocument, JacsError> {
230        let (id, version) = Self::parse_key(key)?;
231
232        let row = self.block_on(async {
233            sqlx::query(
234                "SELECT jacs_id, jacs_version, agent_id, jacs_type, raw_contents, file_contents \
235                 FROM jacs_document WHERE jacs_id = $1 AND jacs_version = $2 AND tombstoned = false",
236            )
237            .bind(id)
238            .bind(version)
239            .fetch_one(&self.pool)
240            .await
241        })
242        .map_err(|e| {
243            JacsError::DatabaseError {
244                operation: "get_document".to_string(),
245                reason: e.to_string(),
246            }
247        })?;
248
249        Self::row_to_document(&row)
250    }
251
252    fn remove_document(&self, key: &str) -> Result<JACSDocument, JacsError> {
253        let doc = self.get_document(key)?;
254        let (id, version) = Self::parse_key(key)?;
255
256        self.block_on(async {
257            sqlx::query("UPDATE jacs_document SET tombstoned = true WHERE jacs_id = $1 AND jacs_version = $2")
258                .bind(id)
259                .bind(version)
260                .execute(&self.pool)
261                .await
262        })
263        .map_err(|e| {
264            JacsError::DatabaseError {
265                operation: "remove_document".to_string(),
266                reason: e.to_string(),
267            }
268        })?;
269
270        Ok(doc)
271    }
272
273    fn list_documents(&self, prefix: &str) -> Result<Vec<String>, JacsError> {
274        let rows = self
275            .block_on(async {
276                sqlx::query(
277                    "SELECT jacs_id, jacs_version FROM jacs_document \
278                 WHERE jacs_type = $1 AND tombstoned = false ORDER BY created_at DESC",
279                )
280                .bind(prefix)
281                .fetch_all(&self.pool)
282                .await
283            })
284            .map_err(|e| JacsError::DatabaseError {
285                operation: "list_documents".to_string(),
286                reason: e.to_string(),
287            })?;
288
289        Ok(rows
290            .iter()
291            .map(|row| {
292                let id: String = row.get("jacs_id");
293                let version: String = row.get("jacs_version");
294                format!("{}:{}", id, version)
295            })
296            .collect())
297    }
298
299    fn document_exists(&self, key: &str) -> Result<bool, JacsError> {
300        let (id, version) = Self::parse_key(key)?;
301
302        let exists: bool = self
303            .block_on(async {
304                sqlx::query_scalar::<_, bool>(
305                    "SELECT EXISTS(SELECT 1 FROM jacs_document \
306                     WHERE jacs_id = $1 AND jacs_version = $2 AND tombstoned = false)",
307                )
308                .bind(id)
309                .bind(version)
310                .fetch_one(&self.pool)
311                .await
312            })
313            .map_err(|e| JacsError::DatabaseError {
314                operation: "document_exists".to_string(),
315                reason: e.to_string(),
316            })?;
317
318        Ok(exists)
319    }
320
321    fn get_documents_by_agent(&self, agent_id: &str) -> Result<Vec<String>, JacsError> {
322        let rows = self
323            .block_on(async {
324                sqlx::query(
325                    "SELECT jacs_id, jacs_version FROM jacs_document \
326                 WHERE agent_id = $1 AND tombstoned = false ORDER BY created_at DESC",
327                )
328                .bind(agent_id)
329                .fetch_all(&self.pool)
330                .await
331            })
332            .map_err(|e| JacsError::DatabaseError {
333                operation: "get_documents_by_agent".to_string(),
334                reason: e.to_string(),
335            })?;
336
337        Ok(rows
338            .iter()
339            .map(|row| {
340                let id: String = row.get("jacs_id");
341                let version: String = row.get("jacs_version");
342                format!("{}:{}", id, version)
343            })
344            .collect())
345    }
346
347    fn get_document_versions(&self, document_id: &str) -> Result<Vec<String>, JacsError> {
348        let rows = self
349            .block_on(async {
350                sqlx::query(
351                    "SELECT jacs_id, jacs_version FROM jacs_document \
352                 WHERE jacs_id = $1 AND tombstoned = false ORDER BY created_at ASC",
353                )
354                .bind(document_id)
355                .fetch_all(&self.pool)
356                .await
357            })
358            .map_err(|e| JacsError::DatabaseError {
359                operation: "get_document_versions".to_string(),
360                reason: e.to_string(),
361            })?;
362
363        Ok(rows
364            .iter()
365            .map(|row| {
366                let id: String = row.get("jacs_id");
367                let version: String = row.get("jacs_version");
368                format!("{}:{}", id, version)
369            })
370            .collect())
371    }
372
373    fn get_latest_document(&self, document_id: &str) -> Result<JACSDocument, JacsError> {
374        let row = self.block_on(async {
375            sqlx::query(
376                "SELECT jacs_id, jacs_version, agent_id, jacs_type, raw_contents, file_contents \
377                 FROM jacs_document WHERE jacs_id = $1 AND tombstoned = false ORDER BY created_at DESC LIMIT 1",
378            )
379            .bind(document_id)
380            .fetch_one(&self.pool)
381            .await
382        })
383        .map_err(|e| {
384            JacsError::DatabaseError {
385                operation: "get_latest_document".to_string(),
386                reason: e.to_string(),
387            }
388        })?;
389
390        Self::row_to_document(&row)
391    }
392
393    fn merge_documents(
394        &self,
395        _doc_id: &str,
396        _v1: &str,
397        _v2: &str,
398    ) -> Result<JACSDocument, JacsError> {
399        Err(JacsError::DatabaseError {
400            operation: "merge_documents".to_string(),
401            reason: "Not implemented for database backend".to_string(),
402        })
403    }
404
405    fn store_documents(&self, docs: Vec<JACSDocument>) -> Result<Vec<String>, Vec<JacsError>> {
406        let mut errors = Vec::new();
407        let mut keys = Vec::new();
408        for doc in &docs {
409            match self.store_document(doc) {
410                Ok(_) => keys.push(doc.getkey()),
411                Err(e) => errors.push(e),
412            }
413        }
414        if errors.is_empty() {
415            Ok(keys)
416        } else {
417            Err(errors)
418        }
419    }
420
421    fn get_documents(&self, keys: Vec<String>) -> Result<Vec<JACSDocument>, Vec<JacsError>> {
422        let mut docs = Vec::new();
423        let mut errors = Vec::new();
424        for key in &keys {
425            match self.get_document(key) {
426                Ok(doc) => docs.push(doc),
427                Err(e) => errors.push(e),
428            }
429        }
430        if errors.is_empty() {
431            Ok(docs)
432        } else {
433            Err(errors)
434        }
435    }
436}
437
438impl DatabaseDocumentTraits for PostgresStorage {
439    fn query_by_type(
440        &self,
441        jacs_type: &str,
442        limit: usize,
443        offset: usize,
444    ) -> Result<Vec<JACSDocument>, JacsError> {
445        let rows = self
446            .block_on(async {
447                sqlx::query(
448                "SELECT jacs_id, jacs_version, agent_id, jacs_type, raw_contents, file_contents \
449                 FROM jacs_document WHERE jacs_type = $1 AND tombstoned = false \
450                 ORDER BY created_at DESC LIMIT $2 OFFSET $3",
451            )
452            .bind(jacs_type)
453            .bind(limit as i64)
454            .bind(offset as i64)
455            .fetch_all(&self.pool)
456            .await
457            })
458            .map_err(|e| JacsError::DatabaseError {
459                operation: "query_by_type".to_string(),
460                reason: e.to_string(),
461            })?;
462
463        rows.iter().map(Self::row_to_document).collect()
464    }
465
466    fn query_by_field(
467        &self,
468        field_path: &str,
469        value: &str,
470        jacs_type: Option<&str>,
471        limit: usize,
472        offset: usize,
473    ) -> Result<Vec<JACSDocument>, JacsError> {
474        let rows = if let Some(doc_type) = jacs_type {
475            self.block_on(async {
476                sqlx::query(
477                    "SELECT jacs_id, jacs_version, agent_id, jacs_type, raw_contents, file_contents \
478                     FROM jacs_document WHERE file_contents->>$1 = $2 AND jacs_type = $3 AND tombstoned = false \
479                     ORDER BY created_at DESC LIMIT $4 OFFSET $5",
480                )
481                .bind(field_path)
482                .bind(value)
483                .bind(doc_type)
484                .bind(limit as i64)
485                .bind(offset as i64)
486                .fetch_all(&self.pool)
487                .await
488            })
489        } else {
490            self.block_on(async {
491                sqlx::query(
492                    "SELECT jacs_id, jacs_version, agent_id, jacs_type, raw_contents, file_contents \
493                     FROM jacs_document WHERE file_contents->>$1 = $2 AND tombstoned = false \
494                     ORDER BY created_at DESC LIMIT $3 OFFSET $4",
495                )
496                .bind(field_path)
497                .bind(value)
498                .bind(limit as i64)
499                .bind(offset as i64)
500                .fetch_all(&self.pool)
501                .await
502            })
503        }
504        .map_err(|e| {
505            JacsError::DatabaseError {
506                operation: "query_by_field".to_string(),
507                reason: e.to_string(),
508            }
509        })?;
510
511        rows.iter().map(Self::row_to_document).collect()
512    }
513
514    fn count_by_type(&self, jacs_type: &str) -> Result<usize, JacsError> {
515        let count: i64 = self
516            .block_on(async {
517                sqlx::query_scalar::<_, i64>(
518                    "SELECT COUNT(*) FROM jacs_document WHERE jacs_type = $1 AND tombstoned = false",
519                )
520                .bind(jacs_type)
521                .fetch_one(&self.pool)
522                .await
523            })
524            .map_err(|e| {
525                JacsError::DatabaseError {
526                    operation: "count_by_type".to_string(),
527                    reason: e.to_string(),
528                }
529            })?;
530
531        Ok(count as usize)
532    }
533
534    fn get_versions(&self, jacs_id: &str) -> Result<Vec<JACSDocument>, JacsError> {
535        let rows = self.block_on(async {
536            sqlx::query(
537                "SELECT jacs_id, jacs_version, agent_id, jacs_type, raw_contents, file_contents \
538                 FROM jacs_document WHERE jacs_id = $1 AND tombstoned = false ORDER BY created_at ASC",
539            )
540            .bind(jacs_id)
541            .fetch_all(&self.pool)
542            .await
543        })
544        .map_err(|e| {
545            JacsError::DatabaseError {
546                operation: "get_versions".to_string(),
547                reason: e.to_string(),
548            }
549        })?;
550
551        rows.iter().map(Self::row_to_document).collect()
552    }
553
554    fn get_latest(&self, jacs_id: &str) -> Result<JACSDocument, JacsError> {
555        self.get_latest_document(jacs_id)
556    }
557
558    fn query_by_agent(
559        &self,
560        agent_id: &str,
561        jacs_type: Option<&str>,
562        limit: usize,
563        offset: usize,
564    ) -> Result<Vec<JACSDocument>, JacsError> {
565        let rows = if let Some(doc_type) = jacs_type {
566            self.block_on(async {
567                sqlx::query(
568                    "SELECT jacs_id, jacs_version, agent_id, jacs_type, raw_contents, file_contents \
569                     FROM jacs_document WHERE agent_id = $1 AND jacs_type = $2 AND tombstoned = false \
570                     ORDER BY created_at DESC LIMIT $3 OFFSET $4",
571                )
572                .bind(agent_id)
573                .bind(doc_type)
574                .bind(limit as i64)
575                .bind(offset as i64)
576                .fetch_all(&self.pool)
577                .await
578            })
579        } else {
580            self.block_on(async {
581                sqlx::query(
582                    "SELECT jacs_id, jacs_version, agent_id, jacs_type, raw_contents, file_contents \
583                     FROM jacs_document WHERE agent_id = $1 AND tombstoned = false \
584                     ORDER BY created_at DESC LIMIT $2 OFFSET $3",
585                )
586                .bind(agent_id)
587                .bind(limit as i64)
588                .bind(offset as i64)
589                .fetch_all(&self.pool)
590                .await
591            })
592        }
593        .map_err(|e| {
594            JacsError::DatabaseError {
595                operation: "query_by_agent".to_string(),
596                reason: e.to_string(),
597            }
598        })?;
599
600        rows.iter().map(Self::row_to_document).collect()
601    }
602
603    fn run_migrations(&self) -> Result<(), JacsError> {
604        self.block_on(async {
605            sqlx::query(Self::CREATE_TABLE_SQL)
606                .execute(&self.pool)
607                .await
608        })
609        .map_err(|e| JacsError::DatabaseError {
610            operation: "run_migrations".to_string(),
611            reason: e.to_string(),
612        })?;
613
614        for index_sql in Self::CREATE_INDEXES_SQL {
615            self.block_on(async { sqlx::query(index_sql).execute(&self.pool).await })
616                .map_err(|e| JacsError::DatabaseError {
617                    operation: "run_migrations".to_string(),
618                    reason: format!("Failed to create index: {}", e),
619                })?;
620        }
621
622        // Create fulltext search index for tsvector-based search.
623        self.block_on(async {
624            sqlx::query(Self::CREATE_FTS_INDEX_SQL)
625                .execute(&self.pool)
626                .await
627        })
628        .map_err(|e| JacsError::DatabaseError {
629            operation: "run_migrations".to_string(),
630            reason: format!("Failed to create FTS index: {}", e),
631        })?;
632
633        // Tombstone migration: add tombstoned column for soft-delete support.
634        // Idempotent -- IF NOT EXISTS prevents errors on re-run.
635        let _ = self.block_on(async {
636            sqlx::query("ALTER TABLE jacs_document ADD COLUMN IF NOT EXISTS tombstoned BOOLEAN NOT NULL DEFAULT false")
637                .execute(&self.pool)
638                .await
639        });
640
641        Ok(())
642    }
643}
644
645impl SearchProvider for PostgresStorage {
646    fn search(&self, query: SearchQuery) -> Result<SearchResults, JacsError> {
647        // Handle field_filter queries via query_by_field (exact field match, no FTS)
648        if let Some(FieldFilter {
649            ref field_path,
650            ref value,
651        }) = query.field_filter
652        {
653            let docs = self
654                .query_by_field(
655                    field_path,
656                    value,
657                    query.jacs_type.as_deref(),
658                    query.limit,
659                    query.offset,
660                )
661                .map_err(|e| {
662                    JacsError::StorageError(format!("field_filter search failed: {}", e))
663                })?;
664
665            let total_count = docs.len();
666            let results = docs
667                .into_iter()
668                .map(|doc| SearchHit {
669                    document: doc,
670                    score: 1.0,
671                    matched_fields: vec![field_path.clone()],
672                })
673                .collect();
674
675            return Ok(SearchResults {
676                results,
677                total_count,
678                method: SearchMethod::FieldMatch,
679            });
680        }
681
682        if query.query.is_empty() {
683            return Ok(SearchResults {
684                results: vec![],
685                total_count: 0,
686                method: SearchMethod::FullText,
687            });
688        }
689
690        // Build fulltext search dynamically to support optional jacs_type and agent_id filters.
691        // PostgreSQL tsvector fulltext search with parameterized queries.
692        let has_type = query.jacs_type.is_some();
693        let has_agent = query.agent_id.is_some();
694
695        // Build SQL with correct positional parameter indices ($1 = query text)
696        let (count_sql, results_sql) = match (has_type, has_agent) {
697            (true, true) => (
698                "SELECT COUNT(*) FROM jacs_document \
699                 WHERE to_tsvector('english', raw_contents) @@ plainto_tsquery('english', $1) \
700                 AND jacs_type = $2 AND agent_id = $3 AND tombstoned = false"
701                    .to_string(),
702                "SELECT jacs_id, jacs_version, agent_id, jacs_type, raw_contents, file_contents, \
703                 ts_rank(to_tsvector('english', raw_contents), plainto_tsquery('english', $1)) AS rank \
704                 FROM jacs_document \
705                 WHERE to_tsvector('english', raw_contents) @@ plainto_tsquery('english', $1) \
706                 AND jacs_type = $2 AND agent_id = $3 AND tombstoned = false \
707                 ORDER BY rank DESC LIMIT $4 OFFSET $5"
708                    .to_string(),
709            ),
710            (true, false) => (
711                "SELECT COUNT(*) FROM jacs_document \
712                 WHERE to_tsvector('english', raw_contents) @@ plainto_tsquery('english', $1) \
713                 AND jacs_type = $2 AND tombstoned = false"
714                    .to_string(),
715                "SELECT jacs_id, jacs_version, agent_id, jacs_type, raw_contents, file_contents, \
716                 ts_rank(to_tsvector('english', raw_contents), plainto_tsquery('english', $1)) AS rank \
717                 FROM jacs_document \
718                 WHERE to_tsvector('english', raw_contents) @@ plainto_tsquery('english', $1) \
719                 AND jacs_type = $2 AND tombstoned = false \
720                 ORDER BY rank DESC LIMIT $3 OFFSET $4"
721                    .to_string(),
722            ),
723            (false, true) => (
724                "SELECT COUNT(*) FROM jacs_document \
725                 WHERE to_tsvector('english', raw_contents) @@ plainto_tsquery('english', $1) \
726                 AND agent_id = $2 AND tombstoned = false"
727                    .to_string(),
728                "SELECT jacs_id, jacs_version, agent_id, jacs_type, raw_contents, file_contents, \
729                 ts_rank(to_tsvector('english', raw_contents), plainto_tsquery('english', $1)) AS rank \
730                 FROM jacs_document \
731                 WHERE to_tsvector('english', raw_contents) @@ plainto_tsquery('english', $1) \
732                 AND agent_id = $2 AND tombstoned = false \
733                 ORDER BY rank DESC LIMIT $3 OFFSET $4"
734                    .to_string(),
735            ),
736            (false, false) => (
737                "SELECT COUNT(*) FROM jacs_document \
738                 WHERE to_tsvector('english', raw_contents) @@ plainto_tsquery('english', $1) \
739                 AND tombstoned = false"
740                    .to_string(),
741                "SELECT jacs_id, jacs_version, agent_id, jacs_type, raw_contents, file_contents, \
742                 ts_rank(to_tsvector('english', raw_contents), plainto_tsquery('english', $1)) AS rank \
743                 FROM jacs_document \
744                 WHERE to_tsvector('english', raw_contents) @@ plainto_tsquery('english', $1) \
745                 AND tombstoned = false \
746                 ORDER BY rank DESC LIMIT $2 OFFSET $3"
747                    .to_string(),
748            ),
749        };
750
751        // Execute count query
752        let mut count_q = sqlx::query_scalar::<_, i64>(&count_sql).bind(&query.query);
753        if let Some(ref jt) = query.jacs_type {
754            count_q = count_q.bind(jt);
755        }
756        if let Some(ref ai) = query.agent_id {
757            count_q = count_q.bind(ai);
758        }
759        let total_count: i64 = self
760            .block_on(async { count_q.fetch_one(&self.pool).await })
761            .map_err(|e| JacsError::StorageError(format!("FTS count query failed: {}", e)))?;
762
763        // Execute results query
764        let mut results_q = sqlx::query(&results_sql).bind(&query.query);
765        if let Some(ref jt) = query.jacs_type {
766            results_q = results_q.bind(jt);
767        }
768        if let Some(ref ai) = query.agent_id {
769            results_q = results_q.bind(ai);
770        }
771        results_q = results_q.bind(query.limit as i64).bind(query.offset as i64);
772
773        let rows = self
774            .block_on(async { results_q.fetch_all(&self.pool).await })
775            .map_err(|e| JacsError::StorageError(format!("FTS search failed: {}", e)))?;
776
777        // Collect ranks for relative normalization
778        let ranks: Vec<f32> = rows
779            .iter()
780            .map(|row| row.try_get::<f32, _>("rank").unwrap_or(0.0))
781            .collect();
782        let max_rank = ranks.iter().cloned().fold(f32::MIN, f32::max);
783
784        let mut results = Vec::new();
785        for (row, &rank) in rows.iter().zip(ranks.iter()) {
786            let doc = Self::row_to_document(row)
787                .map_err(|e| JacsError::StorageError(format!("Failed to parse row: {}", e)))?;
788
789            // Normalize rank relative to max score in result set (preserves ranking fidelity)
790            let score = if max_rank > 0.0 {
791                (rank / max_rank) as f64
792            } else {
793                0.0
794            };
795
796            if let Some(min_score) = query.min_score {
797                if score < min_score {
798                    continue;
799                }
800            }
801
802            results.push(SearchHit {
803                document: doc,
804                score,
805                matched_fields: vec!["raw_contents".to_string()],
806            });
807        }
808
809        Ok(SearchResults {
810            results,
811            total_count: total_count as usize,
812            method: SearchMethod::FullText,
813        })
814    }
815
816    fn capabilities(&self) -> SearchCapabilities {
817        SearchCapabilities {
818            fulltext: true,
819            vector: false,
820            hybrid: false,
821            field_filter: true,
822        }
823    }
824}
825
826#[cfg(test)]
827mod tests {
828    use super::*;
829
830    #[test]
831    fn capabilities_reports_fulltext_true_vector_false() {
832        let caps = SearchCapabilities {
833            fulltext: true,
834            vector: false,
835            hybrid: false,
836            field_filter: true,
837        };
838        assert!(caps.fulltext);
839        assert!(!caps.vector);
840        assert!(!caps.hybrid);
841        assert!(caps.field_filter);
842    }
843
844    #[test]
845    fn parse_key_valid() {
846        let (id, version) = PostgresStorage::parse_key("doc-1:v1").unwrap();
847        assert_eq!(id, "doc-1");
848        assert_eq!(version, "v1");
849    }
850
851    #[test]
852    fn parse_key_invalid() {
853        let result = PostgresStorage::parse_key("invalid-key-no-colon");
854        assert!(result.is_err());
855    }
856
857    #[test]
858    fn parse_key_with_colons_in_version() {
859        let (id, version) = PostgresStorage::parse_key("doc-1:v1:extra").unwrap();
860        assert_eq!(id, "doc-1");
861        assert_eq!(version, "v1:extra");
862    }
863}