1use jacs::agent::document::JACSDocument;
34use jacs::error::JacsError;
35use jacs::search::{
36 FieldFilter, SearchCapabilities, SearchHit, SearchMethod, SearchProvider, SearchQuery,
37 SearchResults,
38};
39use jacs::storage::StorageDocumentTraits;
40use jacs::storage::database_traits::DatabaseDocumentTraits;
41use serde_json::Value;
42use sqlx::Row;
43use sqlx::postgres::{PgPool, PgPoolOptions, PgRow};
44use std::error::Error;
45use std::time::Duration;
46use tokio::runtime::Handle;
47
48pub struct PostgresStorage {
55 pool: PgPool,
56 handle: Handle,
57}
58
59impl PostgresStorage {
60 pub fn new(
67 database_url: &str,
68 max_connections: Option<u32>,
69 min_connections: Option<u32>,
70 connect_timeout_secs: Option<u64>,
71 ) -> Result<Self, JacsError> {
72 let handle = Handle::try_current().map_err(|e| JacsError::DatabaseError {
73 operation: "init".to_string(),
74 reason: format!(
75 "No tokio runtime available. Database storage requires a tokio runtime: {}",
76 e
77 ),
78 })?;
79
80 let pool = tokio::task::block_in_place(|| {
81 handle.block_on(async {
82 PgPoolOptions::new()
83 .max_connections(max_connections.unwrap_or(10))
84 .min_connections(min_connections.unwrap_or(1))
85 .acquire_timeout(Duration::from_secs(connect_timeout_secs.unwrap_or(30)))
86 .connect(database_url)
87 .await
88 })
89 })
90 .map_err(|e| JacsError::DatabaseError {
91 operation: "connect".to_string(),
92 reason: e.to_string(),
93 })?;
94
95 Ok(Self { pool, handle })
96 }
97
98 pub fn with_pool(pool: PgPool, handle: Handle) -> Self {
100 Self { pool, handle }
101 }
102
103 pub fn pool(&self) -> &PgPool {
105 &self.pool
106 }
107
108 fn block_on<F: std::future::Future>(&self, f: F) -> F::Output {
113 tokio::task::block_in_place(|| self.handle.block_on(f))
114 }
115
/// Splits a document key of the form `id:version` at the first `:`.
///
/// Everything after the first colon belongs to the version, so
/// `"doc-1:v1:extra"` yields `("doc-1", "v1:extra")`.
///
/// # Errors
///
/// Returns an error describing the expected format when `key` contains no
/// `:` at all.
fn parse_key(key: &str) -> Result<(&str, &str), Box<dyn Error>> {
    // `split_once` borrows both halves directly; no intermediate Vec needed.
    key.split_once(':').ok_or_else(|| {
        format!("Invalid document key '{}': expected 'id:version'", key).into()
    })
}
124
125 fn row_to_document(row: &PgRow) -> Result<JACSDocument, JacsError> {
128 let raw: String = row
129 .try_get("raw_contents")
130 .map_err(|e| JacsError::DatabaseError {
131 operation: "row_to_document".into(),
132 reason: e.to_string(),
133 })?;
134 let value: Value = serde_json::from_str(&raw)?;
135
136 let id: String = row
137 .try_get("jacs_id")
138 .map_err(|e| JacsError::DatabaseError {
139 operation: "row_to_document".into(),
140 reason: e.to_string(),
141 })?;
142 let version: String =
143 row.try_get("jacs_version")
144 .map_err(|e| JacsError::DatabaseError {
145 operation: "row_to_document".into(),
146 reason: e.to_string(),
147 })?;
148 let jacs_type: String = row
149 .try_get("jacs_type")
150 .map_err(|e| JacsError::DatabaseError {
151 operation: "row_to_document".into(),
152 reason: e.to_string(),
153 })?;
154
155 Ok(JACSDocument {
156 id,
157 version,
158 value,
159 jacs_type,
160 })
161 }
162
/// DDL for the `jacs_document` table, keyed by `(jacs_id, jacs_version)`.
///
/// `raw_contents` holds the document as text and `file_contents` holds the
/// same document as JSONB; `tombstoned` marks soft-deleted rows.
const CREATE_TABLE_SQL: &str = r#"
    CREATE TABLE IF NOT EXISTS jacs_document (
        jacs_id TEXT NOT NULL,
        jacs_version TEXT NOT NULL,
        agent_id TEXT,
        jacs_type TEXT NOT NULL,
        raw_contents TEXT NOT NULL,
        file_contents JSONB NOT NULL,
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        tombstoned BOOLEAN NOT NULL DEFAULT false,
        PRIMARY KEY (jacs_id, jacs_version)
    )
"#;

/// Secondary indexes on document type, agent id and creation time.
const CREATE_INDEXES_SQL: &[&str] = &[
    "CREATE INDEX IF NOT EXISTS idx_jacs_document_type ON jacs_document (jacs_type)",
    "CREATE INDEX IF NOT EXISTS idx_jacs_document_agent ON jacs_document (agent_id)",
    "CREATE INDEX IF NOT EXISTS idx_jacs_document_created ON jacs_document (created_at DESC)",
];

/// GIN index over `to_tsvector('english', raw_contents)`, the same
/// expression the full-text search queries in this file match against.
const CREATE_FTS_INDEX_SQL: &str = r#"
    CREATE INDEX IF NOT EXISTS idx_jacs_document_fts
    ON jacs_document
    USING GIN (to_tsvector('english', raw_contents))
"#;
191}
192
193impl StorageDocumentTraits for PostgresStorage {
194 fn store_document(&self, doc: &JACSDocument) -> Result<(), JacsError> {
195 let raw_json = serde_json::to_string_pretty(&doc.value)?;
196 let jsonb_value = &doc.value;
197 let agent_id = doc
198 .value
199 .get("jacsSignature")
200 .and_then(|s| s.get("jacsSignatureAgentId"))
201 .and_then(|v| v.as_str())
202 .map(|s| s.to_string());
203
204 self.block_on(async {
205 sqlx::query(
206 r#"INSERT INTO jacs_document (jacs_id, jacs_version, agent_id, jacs_type, raw_contents, file_contents)
207 VALUES ($1, $2, $3, $4, $5, $6)
208 ON CONFLICT (jacs_id, jacs_version) DO NOTHING"#,
209 )
210 .bind(&doc.id)
211 .bind(&doc.version)
212 .bind(&agent_id)
213 .bind(&doc.jacs_type)
214 .bind(&raw_json)
215 .bind(jsonb_value)
216 .execute(&self.pool)
217 .await
218 })
219 .map_err(|e| {
220 JacsError::DatabaseError {
221 operation: "store_document".to_string(),
222 reason: e.to_string(),
223 }
224 })?;
225
226 Ok(())
227 }
228
229 fn get_document(&self, key: &str) -> Result<JACSDocument, JacsError> {
230 let (id, version) = Self::parse_key(key)?;
231
232 let row = self.block_on(async {
233 sqlx::query(
234 "SELECT jacs_id, jacs_version, agent_id, jacs_type, raw_contents, file_contents \
235 FROM jacs_document WHERE jacs_id = $1 AND jacs_version = $2 AND tombstoned = false",
236 )
237 .bind(id)
238 .bind(version)
239 .fetch_one(&self.pool)
240 .await
241 })
242 .map_err(|e| {
243 JacsError::DatabaseError {
244 operation: "get_document".to_string(),
245 reason: e.to_string(),
246 }
247 })?;
248
249 Self::row_to_document(&row)
250 }
251
252 fn remove_document(&self, key: &str) -> Result<JACSDocument, JacsError> {
253 let doc = self.get_document(key)?;
254 let (id, version) = Self::parse_key(key)?;
255
256 self.block_on(async {
257 sqlx::query("UPDATE jacs_document SET tombstoned = true WHERE jacs_id = $1 AND jacs_version = $2")
258 .bind(id)
259 .bind(version)
260 .execute(&self.pool)
261 .await
262 })
263 .map_err(|e| {
264 JacsError::DatabaseError {
265 operation: "remove_document".to_string(),
266 reason: e.to_string(),
267 }
268 })?;
269
270 Ok(doc)
271 }
272
273 fn list_documents(&self, prefix: &str) -> Result<Vec<String>, JacsError> {
274 let rows = self
275 .block_on(async {
276 sqlx::query(
277 "SELECT jacs_id, jacs_version FROM jacs_document \
278 WHERE jacs_type = $1 AND tombstoned = false ORDER BY created_at DESC",
279 )
280 .bind(prefix)
281 .fetch_all(&self.pool)
282 .await
283 })
284 .map_err(|e| JacsError::DatabaseError {
285 operation: "list_documents".to_string(),
286 reason: e.to_string(),
287 })?;
288
289 Ok(rows
290 .iter()
291 .map(|row| {
292 let id: String = row.get("jacs_id");
293 let version: String = row.get("jacs_version");
294 format!("{}:{}", id, version)
295 })
296 .collect())
297 }
298
299 fn document_exists(&self, key: &str) -> Result<bool, JacsError> {
300 let (id, version) = Self::parse_key(key)?;
301
302 let exists: bool = self
303 .block_on(async {
304 sqlx::query_scalar::<_, bool>(
305 "SELECT EXISTS(SELECT 1 FROM jacs_document \
306 WHERE jacs_id = $1 AND jacs_version = $2 AND tombstoned = false)",
307 )
308 .bind(id)
309 .bind(version)
310 .fetch_one(&self.pool)
311 .await
312 })
313 .map_err(|e| JacsError::DatabaseError {
314 operation: "document_exists".to_string(),
315 reason: e.to_string(),
316 })?;
317
318 Ok(exists)
319 }
320
321 fn get_documents_by_agent(&self, agent_id: &str) -> Result<Vec<String>, JacsError> {
322 let rows = self
323 .block_on(async {
324 sqlx::query(
325 "SELECT jacs_id, jacs_version FROM jacs_document \
326 WHERE agent_id = $1 AND tombstoned = false ORDER BY created_at DESC",
327 )
328 .bind(agent_id)
329 .fetch_all(&self.pool)
330 .await
331 })
332 .map_err(|e| JacsError::DatabaseError {
333 operation: "get_documents_by_agent".to_string(),
334 reason: e.to_string(),
335 })?;
336
337 Ok(rows
338 .iter()
339 .map(|row| {
340 let id: String = row.get("jacs_id");
341 let version: String = row.get("jacs_version");
342 format!("{}:{}", id, version)
343 })
344 .collect())
345 }
346
347 fn get_document_versions(&self, document_id: &str) -> Result<Vec<String>, JacsError> {
348 let rows = self
349 .block_on(async {
350 sqlx::query(
351 "SELECT jacs_id, jacs_version FROM jacs_document \
352 WHERE jacs_id = $1 AND tombstoned = false ORDER BY created_at ASC",
353 )
354 .bind(document_id)
355 .fetch_all(&self.pool)
356 .await
357 })
358 .map_err(|e| JacsError::DatabaseError {
359 operation: "get_document_versions".to_string(),
360 reason: e.to_string(),
361 })?;
362
363 Ok(rows
364 .iter()
365 .map(|row| {
366 let id: String = row.get("jacs_id");
367 let version: String = row.get("jacs_version");
368 format!("{}:{}", id, version)
369 })
370 .collect())
371 }
372
373 fn get_latest_document(&self, document_id: &str) -> Result<JACSDocument, JacsError> {
374 let row = self.block_on(async {
375 sqlx::query(
376 "SELECT jacs_id, jacs_version, agent_id, jacs_type, raw_contents, file_contents \
377 FROM jacs_document WHERE jacs_id = $1 AND tombstoned = false ORDER BY created_at DESC LIMIT 1",
378 )
379 .bind(document_id)
380 .fetch_one(&self.pool)
381 .await
382 })
383 .map_err(|e| {
384 JacsError::DatabaseError {
385 operation: "get_latest_document".to_string(),
386 reason: e.to_string(),
387 }
388 })?;
389
390 Self::row_to_document(&row)
391 }
392
393 fn merge_documents(
394 &self,
395 _doc_id: &str,
396 _v1: &str,
397 _v2: &str,
398 ) -> Result<JACSDocument, JacsError> {
399 Err(JacsError::DatabaseError {
400 operation: "merge_documents".to_string(),
401 reason: "Not implemented for database backend".to_string(),
402 })
403 }
404
405 fn store_documents(&self, docs: Vec<JACSDocument>) -> Result<Vec<String>, Vec<JacsError>> {
406 let mut errors = Vec::new();
407 let mut keys = Vec::new();
408 for doc in &docs {
409 match self.store_document(doc) {
410 Ok(_) => keys.push(doc.getkey()),
411 Err(e) => errors.push(e),
412 }
413 }
414 if errors.is_empty() {
415 Ok(keys)
416 } else {
417 Err(errors)
418 }
419 }
420
421 fn get_documents(&self, keys: Vec<String>) -> Result<Vec<JACSDocument>, Vec<JacsError>> {
422 let mut docs = Vec::new();
423 let mut errors = Vec::new();
424 for key in &keys {
425 match self.get_document(key) {
426 Ok(doc) => docs.push(doc),
427 Err(e) => errors.push(e),
428 }
429 }
430 if errors.is_empty() {
431 Ok(docs)
432 } else {
433 Err(errors)
434 }
435 }
436}
437
438impl DatabaseDocumentTraits for PostgresStorage {
439 fn query_by_type(
440 &self,
441 jacs_type: &str,
442 limit: usize,
443 offset: usize,
444 ) -> Result<Vec<JACSDocument>, JacsError> {
445 let rows = self
446 .block_on(async {
447 sqlx::query(
448 "SELECT jacs_id, jacs_version, agent_id, jacs_type, raw_contents, file_contents \
449 FROM jacs_document WHERE jacs_type = $1 AND tombstoned = false \
450 ORDER BY created_at DESC LIMIT $2 OFFSET $3",
451 )
452 .bind(jacs_type)
453 .bind(limit as i64)
454 .bind(offset as i64)
455 .fetch_all(&self.pool)
456 .await
457 })
458 .map_err(|e| JacsError::DatabaseError {
459 operation: "query_by_type".to_string(),
460 reason: e.to_string(),
461 })?;
462
463 rows.iter().map(Self::row_to_document).collect()
464 }
465
466 fn query_by_field(
467 &self,
468 field_path: &str,
469 value: &str,
470 jacs_type: Option<&str>,
471 limit: usize,
472 offset: usize,
473 ) -> Result<Vec<JACSDocument>, JacsError> {
474 let rows = if let Some(doc_type) = jacs_type {
475 self.block_on(async {
476 sqlx::query(
477 "SELECT jacs_id, jacs_version, agent_id, jacs_type, raw_contents, file_contents \
478 FROM jacs_document WHERE file_contents->>$1 = $2 AND jacs_type = $3 AND tombstoned = false \
479 ORDER BY created_at DESC LIMIT $4 OFFSET $5",
480 )
481 .bind(field_path)
482 .bind(value)
483 .bind(doc_type)
484 .bind(limit as i64)
485 .bind(offset as i64)
486 .fetch_all(&self.pool)
487 .await
488 })
489 } else {
490 self.block_on(async {
491 sqlx::query(
492 "SELECT jacs_id, jacs_version, agent_id, jacs_type, raw_contents, file_contents \
493 FROM jacs_document WHERE file_contents->>$1 = $2 AND tombstoned = false \
494 ORDER BY created_at DESC LIMIT $3 OFFSET $4",
495 )
496 .bind(field_path)
497 .bind(value)
498 .bind(limit as i64)
499 .bind(offset as i64)
500 .fetch_all(&self.pool)
501 .await
502 })
503 }
504 .map_err(|e| {
505 JacsError::DatabaseError {
506 operation: "query_by_field".to_string(),
507 reason: e.to_string(),
508 }
509 })?;
510
511 rows.iter().map(Self::row_to_document).collect()
512 }
513
514 fn count_by_type(&self, jacs_type: &str) -> Result<usize, JacsError> {
515 let count: i64 = self
516 .block_on(async {
517 sqlx::query_scalar::<_, i64>(
518 "SELECT COUNT(*) FROM jacs_document WHERE jacs_type = $1 AND tombstoned = false",
519 )
520 .bind(jacs_type)
521 .fetch_one(&self.pool)
522 .await
523 })
524 .map_err(|e| {
525 JacsError::DatabaseError {
526 operation: "count_by_type".to_string(),
527 reason: e.to_string(),
528 }
529 })?;
530
531 Ok(count as usize)
532 }
533
534 fn get_versions(&self, jacs_id: &str) -> Result<Vec<JACSDocument>, JacsError> {
535 let rows = self.block_on(async {
536 sqlx::query(
537 "SELECT jacs_id, jacs_version, agent_id, jacs_type, raw_contents, file_contents \
538 FROM jacs_document WHERE jacs_id = $1 AND tombstoned = false ORDER BY created_at ASC",
539 )
540 .bind(jacs_id)
541 .fetch_all(&self.pool)
542 .await
543 })
544 .map_err(|e| {
545 JacsError::DatabaseError {
546 operation: "get_versions".to_string(),
547 reason: e.to_string(),
548 }
549 })?;
550
551 rows.iter().map(Self::row_to_document).collect()
552 }
553
554 fn get_latest(&self, jacs_id: &str) -> Result<JACSDocument, JacsError> {
555 self.get_latest_document(jacs_id)
556 }
557
558 fn query_by_agent(
559 &self,
560 agent_id: &str,
561 jacs_type: Option<&str>,
562 limit: usize,
563 offset: usize,
564 ) -> Result<Vec<JACSDocument>, JacsError> {
565 let rows = if let Some(doc_type) = jacs_type {
566 self.block_on(async {
567 sqlx::query(
568 "SELECT jacs_id, jacs_version, agent_id, jacs_type, raw_contents, file_contents \
569 FROM jacs_document WHERE agent_id = $1 AND jacs_type = $2 AND tombstoned = false \
570 ORDER BY created_at DESC LIMIT $3 OFFSET $4",
571 )
572 .bind(agent_id)
573 .bind(doc_type)
574 .bind(limit as i64)
575 .bind(offset as i64)
576 .fetch_all(&self.pool)
577 .await
578 })
579 } else {
580 self.block_on(async {
581 sqlx::query(
582 "SELECT jacs_id, jacs_version, agent_id, jacs_type, raw_contents, file_contents \
583 FROM jacs_document WHERE agent_id = $1 AND tombstoned = false \
584 ORDER BY created_at DESC LIMIT $2 OFFSET $3",
585 )
586 .bind(agent_id)
587 .bind(limit as i64)
588 .bind(offset as i64)
589 .fetch_all(&self.pool)
590 .await
591 })
592 }
593 .map_err(|e| {
594 JacsError::DatabaseError {
595 operation: "query_by_agent".to_string(),
596 reason: e.to_string(),
597 }
598 })?;
599
600 rows.iter().map(Self::row_to_document).collect()
601 }
602
603 fn run_migrations(&self) -> Result<(), JacsError> {
604 self.block_on(async {
605 sqlx::query(Self::CREATE_TABLE_SQL)
606 .execute(&self.pool)
607 .await
608 })
609 .map_err(|e| JacsError::DatabaseError {
610 operation: "run_migrations".to_string(),
611 reason: e.to_string(),
612 })?;
613
614 for index_sql in Self::CREATE_INDEXES_SQL {
615 self.block_on(async { sqlx::query(index_sql).execute(&self.pool).await })
616 .map_err(|e| JacsError::DatabaseError {
617 operation: "run_migrations".to_string(),
618 reason: format!("Failed to create index: {}", e),
619 })?;
620 }
621
622 self.block_on(async {
624 sqlx::query(Self::CREATE_FTS_INDEX_SQL)
625 .execute(&self.pool)
626 .await
627 })
628 .map_err(|e| JacsError::DatabaseError {
629 operation: "run_migrations".to_string(),
630 reason: format!("Failed to create FTS index: {}", e),
631 })?;
632
633 let _ = self.block_on(async {
636 sqlx::query("ALTER TABLE jacs_document ADD COLUMN IF NOT EXISTS tombstoned BOOLEAN NOT NULL DEFAULT false")
637 .execute(&self.pool)
638 .await
639 });
640
641 Ok(())
642 }
643}
644
645impl SearchProvider for PostgresStorage {
646 fn search(&self, query: SearchQuery) -> Result<SearchResults, JacsError> {
647 if let Some(FieldFilter {
649 ref field_path,
650 ref value,
651 }) = query.field_filter
652 {
653 let docs = self
654 .query_by_field(
655 field_path,
656 value,
657 query.jacs_type.as_deref(),
658 query.limit,
659 query.offset,
660 )
661 .map_err(|e| {
662 JacsError::StorageError(format!("field_filter search failed: {}", e))
663 })?;
664
665 let total_count = docs.len();
666 let results = docs
667 .into_iter()
668 .map(|doc| SearchHit {
669 document: doc,
670 score: 1.0,
671 matched_fields: vec![field_path.clone()],
672 })
673 .collect();
674
675 return Ok(SearchResults {
676 results,
677 total_count,
678 method: SearchMethod::FieldMatch,
679 });
680 }
681
682 if query.query.is_empty() {
683 return Ok(SearchResults {
684 results: vec![],
685 total_count: 0,
686 method: SearchMethod::FullText,
687 });
688 }
689
690 let has_type = query.jacs_type.is_some();
693 let has_agent = query.agent_id.is_some();
694
695 let (count_sql, results_sql) = match (has_type, has_agent) {
697 (true, true) => (
698 "SELECT COUNT(*) FROM jacs_document \
699 WHERE to_tsvector('english', raw_contents) @@ plainto_tsquery('english', $1) \
700 AND jacs_type = $2 AND agent_id = $3 AND tombstoned = false"
701 .to_string(),
702 "SELECT jacs_id, jacs_version, agent_id, jacs_type, raw_contents, file_contents, \
703 ts_rank(to_tsvector('english', raw_contents), plainto_tsquery('english', $1)) AS rank \
704 FROM jacs_document \
705 WHERE to_tsvector('english', raw_contents) @@ plainto_tsquery('english', $1) \
706 AND jacs_type = $2 AND agent_id = $3 AND tombstoned = false \
707 ORDER BY rank DESC LIMIT $4 OFFSET $5"
708 .to_string(),
709 ),
710 (true, false) => (
711 "SELECT COUNT(*) FROM jacs_document \
712 WHERE to_tsvector('english', raw_contents) @@ plainto_tsquery('english', $1) \
713 AND jacs_type = $2 AND tombstoned = false"
714 .to_string(),
715 "SELECT jacs_id, jacs_version, agent_id, jacs_type, raw_contents, file_contents, \
716 ts_rank(to_tsvector('english', raw_contents), plainto_tsquery('english', $1)) AS rank \
717 FROM jacs_document \
718 WHERE to_tsvector('english', raw_contents) @@ plainto_tsquery('english', $1) \
719 AND jacs_type = $2 AND tombstoned = false \
720 ORDER BY rank DESC LIMIT $3 OFFSET $4"
721 .to_string(),
722 ),
723 (false, true) => (
724 "SELECT COUNT(*) FROM jacs_document \
725 WHERE to_tsvector('english', raw_contents) @@ plainto_tsquery('english', $1) \
726 AND agent_id = $2 AND tombstoned = false"
727 .to_string(),
728 "SELECT jacs_id, jacs_version, agent_id, jacs_type, raw_contents, file_contents, \
729 ts_rank(to_tsvector('english', raw_contents), plainto_tsquery('english', $1)) AS rank \
730 FROM jacs_document \
731 WHERE to_tsvector('english', raw_contents) @@ plainto_tsquery('english', $1) \
732 AND agent_id = $2 AND tombstoned = false \
733 ORDER BY rank DESC LIMIT $3 OFFSET $4"
734 .to_string(),
735 ),
736 (false, false) => (
737 "SELECT COUNT(*) FROM jacs_document \
738 WHERE to_tsvector('english', raw_contents) @@ plainto_tsquery('english', $1) \
739 AND tombstoned = false"
740 .to_string(),
741 "SELECT jacs_id, jacs_version, agent_id, jacs_type, raw_contents, file_contents, \
742 ts_rank(to_tsvector('english', raw_contents), plainto_tsquery('english', $1)) AS rank \
743 FROM jacs_document \
744 WHERE to_tsvector('english', raw_contents) @@ plainto_tsquery('english', $1) \
745 AND tombstoned = false \
746 ORDER BY rank DESC LIMIT $2 OFFSET $3"
747 .to_string(),
748 ),
749 };
750
751 let mut count_q = sqlx::query_scalar::<_, i64>(&count_sql).bind(&query.query);
753 if let Some(ref jt) = query.jacs_type {
754 count_q = count_q.bind(jt);
755 }
756 if let Some(ref ai) = query.agent_id {
757 count_q = count_q.bind(ai);
758 }
759 let total_count: i64 = self
760 .block_on(async { count_q.fetch_one(&self.pool).await })
761 .map_err(|e| JacsError::StorageError(format!("FTS count query failed: {}", e)))?;
762
763 let mut results_q = sqlx::query(&results_sql).bind(&query.query);
765 if let Some(ref jt) = query.jacs_type {
766 results_q = results_q.bind(jt);
767 }
768 if let Some(ref ai) = query.agent_id {
769 results_q = results_q.bind(ai);
770 }
771 results_q = results_q.bind(query.limit as i64).bind(query.offset as i64);
772
773 let rows = self
774 .block_on(async { results_q.fetch_all(&self.pool).await })
775 .map_err(|e| JacsError::StorageError(format!("FTS search failed: {}", e)))?;
776
777 let ranks: Vec<f32> = rows
779 .iter()
780 .map(|row| row.try_get::<f32, _>("rank").unwrap_or(0.0))
781 .collect();
782 let max_rank = ranks.iter().cloned().fold(f32::MIN, f32::max);
783
784 let mut results = Vec::new();
785 for (row, &rank) in rows.iter().zip(ranks.iter()) {
786 let doc = Self::row_to_document(row)
787 .map_err(|e| JacsError::StorageError(format!("Failed to parse row: {}", e)))?;
788
789 let score = if max_rank > 0.0 {
791 (rank / max_rank) as f64
792 } else {
793 0.0
794 };
795
796 if let Some(min_score) = query.min_score {
797 if score < min_score {
798 continue;
799 }
800 }
801
802 results.push(SearchHit {
803 document: doc,
804 score,
805 matched_fields: vec!["raw_contents".to_string()],
806 });
807 }
808
809 Ok(SearchResults {
810 results,
811 total_count: total_count as usize,
812 method: SearchMethod::FullText,
813 })
814 }
815
816 fn capabilities(&self) -> SearchCapabilities {
817 SearchCapabilities {
818 fulltext: true,
819 vector: false,
820 hybrid: false,
821 field_filter: true,
822 }
823 }
824}
825
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the expected capability flags. The struct is built directly
    /// here rather than through a `PostgresStorage` instance.
    #[test]
    fn capabilities_reports_fulltext_true_vector_false() {
        let caps = SearchCapabilities {
            fulltext: true,
            vector: false,
            hybrid: false,
            field_filter: true,
        };
        assert_eq!(
            (caps.fulltext, caps.vector, caps.hybrid, caps.field_filter),
            (true, false, false, true)
        );
    }

    /// A well-formed key splits into id and version.
    #[test]
    fn parse_key_valid() {
        let parsed = PostgresStorage::parse_key("doc-1:v1").unwrap();
        assert_eq!(parsed, ("doc-1", "v1"));
    }

    /// A key without any colon is rejected.
    #[test]
    fn parse_key_invalid() {
        assert!(PostgresStorage::parse_key("invalid-key-no-colon").is_err());
    }

    /// Only the first colon separates id from version.
    #[test]
    fn parse_key_with_colons_in_version() {
        let parsed = PostgresStorage::parse_key("doc-1:v1:extra").unwrap();
        assert_eq!(parsed, ("doc-1", "v1:extra"));
    }
}