1use std::{collections::HashSet, error::Error, fmt, str::FromStr, sync::Arc};
2
3use docx_store::models::{DocBlock, DocChunk, DocSource, Ingest, Project, RelationRecord, Symbol};
4use docx_store::schema::{
5 SCHEMA_BOOTSTRAP_SURQL, TABLE_DOC_BLOCK, TABLE_DOC_SOURCE, TABLE_INGEST, TABLE_PROJECT,
6 TABLE_SYMBOL,
7};
8use serde::Serialize;
9use serde_json::Value;
10use surrealdb::types::{RecordId, RecordIdKey, Regex, SurrealValue, Table, ToSql};
11use surrealdb::{Connection, Surreal};
12use tracing::warn;
13use uuid::Uuid;
14
/// Error type returned by every fallible operation in this module.
#[derive(Debug)]
pub enum StoreError {
    /// An error reported by the SurrealDB client, stored behind a `Box`.
    Surreal(Box<surrealdb::Error>),
    /// Input was rejected by this module; the string is the human-readable reason.
    InvalidInput(String),
}
21
22impl fmt::Display for StoreError {
23 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24 match self {
25 Self::Surreal(err) => write!(f, "SurrealDB error: {err}"),
26 Self::InvalidInput(message) => write!(f, "Invalid input: {message}"),
27 }
28 }
29}
30
// Marker impl: relies on the `Debug` and `Display` impls above and uses the
// trait's default methods (no custom `source`).
impl Error for StoreError {}
32
33impl From<surrealdb::Error> for StoreError {
34 fn from(err: surrealdb::Error) -> Self {
35 Self::Surreal(Box::new(err))
36 }
37}
38
/// Shorthand for results whose error type is [`StoreError`].
pub type StoreResult<T> = Result<T, StoreError>;

// Marker strings written as SurrealQL line comments (`-- ...`).
// NOTE(review): presumably these delimit the optional doc_block full-text
// section inside the bootstrap schema so it can be split out and applied
// separately; the splitting helper is not visible in this part of the file.
const OPTIONAL_DOC_BLOCK_FTS_START: &str = "-- OPTIONAL_DOC_BLOCK_FTS_START";
const OPTIONAL_DOC_BLOCK_FTS_END: &str = "-- OPTIONAL_DOC_BLOCK_FTS_END";
43
/// Documentation store backed by a SurrealDB connection.
///
/// Both fields are reference-counted, so clones share the same client and the
/// same schema-initialisation cell.
pub struct SurrealDocStore<C: Connection> {
    /// Shared SurrealDB client used for every query.
    db: Arc<Surreal<C>>,
    /// Set once the schema bootstrap has completed successfully.
    schema_ready: Arc<tokio::sync::OnceCell<()>>,
}
49
50impl<C: Connection> Clone for SurrealDocStore<C> {
51 fn clone(&self) -> Self {
52 Self {
53 db: self.db.clone(),
54 schema_ready: self.schema_ready.clone(),
55 }
56 }
57}
58
59impl<C: Connection> SurrealDocStore<C> {
60 #[must_use]
61 pub fn new(db: Surreal<C>) -> Self {
62 Self {
63 db: Arc::new(db),
64 schema_ready: Arc::new(tokio::sync::OnceCell::new()),
65 }
66 }
67
68 #[must_use]
69 pub fn from_arc(db: Arc<Surreal<C>>) -> Self {
70 Self {
71 db,
72 schema_ready: Arc::new(tokio::sync::OnceCell::new()),
73 }
74 }
75
76 #[must_use]
77 pub fn db(&self) -> &Surreal<C> {
78 &self.db
79 }
80
    /// Applies the bootstrap schema through the shared `OnceCell`.
    ///
    /// The bootstrap script is split into a required part and an optional
    /// doc_block full-text part. The required part must apply successfully;
    /// a failure of the optional part is only logged and does not fail
    /// initialisation.
    ///
    /// # Errors
    /// Returns the error from splitting the script or from applying the
    /// required schema.
    async fn ensure_schema(&self) -> StoreResult<()> {
        self.schema_ready
            .get_or_try_init(|| async {
                let (required_schema, optional_doc_block_fts) =
                    split_optional_doc_block_fts_schema(SCHEMA_BOOTSTRAP_SURQL)?;
                apply_schema(self.db.as_ref(), required_schema.as_str()).await?;
                // Best effort: the full-text section is allowed to fail; warn and continue.
                if let Some(optional_doc_block_fts) = optional_doc_block_fts
                    && let Err(error) =
                        apply_schema(self.db.as_ref(), optional_doc_block_fts.as_str()).await
                {
                    warn!(
                        error = %error,
                        "optional doc_block full-text schema was skipped"
                    );
                }
                Ok::<(), StoreError>(())
            })
            .await?;
        Ok(())
    }
101
102 pub async fn upsert_project(&self, mut project: Project) -> StoreResult<Project> {
107 self.ensure_schema().await?;
108 ensure_non_empty(&project.project_id, "project_id")?;
109 let id = project
110 .id
111 .clone()
112 .unwrap_or_else(|| project.project_id.clone());
113 project.id = Some(id.clone());
114 let record = RecordId::new(TABLE_PROJECT, id.as_str());
115 self.db
116 .query("UPSERT $record CONTENT $data RETURN NONE;")
117 .bind(("record", record))
118 .bind(("data", project.clone()))
119 .await?
120 .check()?;
121 Ok(project)
122 }
123
124 pub async fn get_project(&self, project_id: &str) -> StoreResult<Option<Project>> {
129 self.ensure_schema().await?;
130 let record = RecordId::new(TABLE_PROJECT, project_id);
131 let mut response = self
132 .db
133 .query("SELECT *, record::id(id) AS id FROM $record;")
134 .bind(("record", record))
135 .await?;
136 let mut records: Vec<Project> = response.take(0)?;
137 Ok(records.pop())
138 }
139
140 pub async fn get_ingest(&self, ingest_id: &str) -> StoreResult<Option<Ingest>> {
145 self.ensure_schema().await?;
146 let record = RecordId::new(TABLE_INGEST, ingest_id);
147 let mut response = self
148 .db
149 .query("SELECT * FROM $record;")
150 .bind(("record", record))
151 .await?;
152 let records: Vec<IngestRow> = response.take(0)?;
153 if let Some(ingest) = records.into_iter().next().map(Ingest::from) {
154 return Ok(Some(ingest));
155 }
156 if ingest_id.contains("::") {
157 return Ok(None);
158 }
159
160 let mut response = self
161 .db
162 .query("SELECT * FROM ingest WHERE extra.requested_ingest_id = $requested_id;")
163 .bind(("requested_id", ingest_id.to_string()))
164 .await?;
165 let records: Vec<IngestRow> = response.take(0)?;
166 let mut rows = records.into_iter();
167 let first = rows.next();
168 if rows.next().is_some() {
169 return Err(StoreError::InvalidInput(format!(
170 "ingest_id '{ingest_id}' is ambiguous; use project-scoped ingest id"
171 )));
172 }
173 Ok(first.map(Ingest::from))
174 }
175
176 pub async fn list_projects(&self, limit: usize) -> StoreResult<Vec<Project>> {
181 self.ensure_schema().await?;
182 let limit = limit_to_i64(limit)?;
183 let query = "SELECT *, record::id(id) AS id FROM project LIMIT $limit;";
184 let mut response = self.db.query(query).bind(("limit", limit)).await?;
185 let records: Vec<Project> = response.take(0)?;
186 Ok(records)
187 }
188
189 pub async fn search_projects(&self, pattern: &str, limit: usize) -> StoreResult<Vec<Project>> {
194 self.ensure_schema().await?;
195 let Some(pattern) = normalize_pattern(pattern) else {
196 return self.list_projects(limit).await;
197 };
198 let limit = limit_to_i64(limit)?;
199 let regex = build_project_regex(&pattern)?;
200 let query = format!(
201 "SELECT *, record::id(id) AS id FROM project WHERE search_text != NONE AND string::matches(search_text, {}) LIMIT $limit;",
202 regex.to_sql()
203 );
204 let mut response = self.db.query(query).bind(("limit", limit)).await?;
205 let records: Vec<Project> = response.take(0)?;
206 Ok(records)
207 }
208
209 pub async fn list_ingests(&self, project_id: &str, limit: usize) -> StoreResult<Vec<Ingest>> {
214 self.ensure_schema().await?;
215 let project_id = project_id.to_string();
216 let limit = limit_to_i64(limit)?;
217 let query = "SELECT * FROM ingest WHERE project_id = $project_id ORDER BY ingested_at DESC LIMIT $limit;";
218 let mut response = self
219 .db
220 .query(query)
221 .bind(("project_id", project_id))
222 .bind(("limit", limit))
223 .await?;
224 let records: Vec<IngestRow> = response.take(0)?;
225 Ok(records.into_iter().map(Ingest::from).collect())
226 }
227
228 pub async fn create_ingest(&self, mut ingest: Ingest) -> StoreResult<Ingest> {
233 self.ensure_schema().await?;
234 let provided_id = ingest.id.clone();
235 let id = provided_id.as_ref().map_or_else(
236 || Uuid::new_v4().to_string(),
237 |value| make_scoped_ingest_id(&ingest.project_id, value),
238 );
239 if let Some(provided_id) = provided_id
240 && provided_id != id
241 {
242 ingest.extra = Some(merge_ingest_extra(ingest.extra.take(), &provided_id));
243 }
244 ingest.id = Some(id.clone());
245 let record = RecordId::new(TABLE_INGEST, id.as_str());
246 self.db
247 .query("UPSERT $record CONTENT $data RETURN NONE;")
248 .bind(("record", record))
249 .bind(("data", ingest.clone()))
250 .await?
251 .check()?;
252 Ok(ingest)
253 }
254
255 pub async fn create_doc_source(&self, mut source: DocSource) -> StoreResult<DocSource> {
260 self.ensure_schema().await?;
261 let id = source
262 .id
263 .clone()
264 .unwrap_or_else(|| Uuid::new_v4().to_string());
265 source.id = Some(id.clone());
266 self.db
267 .query("CREATE doc_source CONTENT $data RETURN NONE;")
268 .bind(("data", source.clone()))
269 .await?
270 .check()?;
271 Ok(source)
272 }
273
274 pub async fn upsert_symbol(&self, mut symbol: Symbol) -> StoreResult<Symbol> {
279 self.ensure_schema().await?;
280 ensure_non_empty(&symbol.symbol_key, "symbol_key")?;
281 let id = symbol
282 .id
283 .clone()
284 .unwrap_or_else(|| symbol.symbol_key.clone());
285 symbol.id = Some(id.clone());
286 let record = RecordId::new(TABLE_SYMBOL, id.as_str());
287 self.db
288 .query("UPSERT $record CONTENT $data RETURN NONE;")
289 .bind(("record", record))
290 .bind(("data", symbol.clone()))
291 .await?
292 .check()?;
293 Ok(symbol)
294 }
295
296 pub async fn create_doc_block(&self, mut block: DocBlock) -> StoreResult<DocBlock> {
301 self.ensure_schema().await?;
302 let id = block
303 .id
304 .clone()
305 .unwrap_or_else(|| Uuid::new_v4().to_string());
306 block.id = Some(id.clone());
307 self.db
308 .query("CREATE doc_block CONTENT $data RETURN NONE;")
309 .bind(("data", block.clone()))
310 .await?
311 .check()?;
312 Ok(block)
313 }
314
315 pub async fn create_doc_blocks(&self, blocks: Vec<DocBlock>) -> StoreResult<Vec<DocBlock>> {
320 self.ensure_schema().await?;
321 if blocks.is_empty() {
322 return Ok(Vec::new());
323 }
324 let futs: Vec<_> = blocks
325 .into_iter()
326 .map(|block| self.create_doc_block(block))
327 .collect();
328 let results = futures::future::join_all(futs).await;
329 results.into_iter().collect()
330 }
331
332 pub async fn create_doc_chunks(&self, chunks: Vec<DocChunk>) -> StoreResult<Vec<DocChunk>> {
337 self.ensure_schema().await?;
338 if chunks.is_empty() {
339 return Ok(Vec::new());
340 }
341 let mut stored = Vec::with_capacity(chunks.len());
342 for mut chunk in chunks {
343 let id = chunk
344 .id
345 .clone()
346 .unwrap_or_else(|| Uuid::new_v4().to_string());
347 chunk.id = Some(id.clone());
348 self.db
349 .query("CREATE doc_chunk CONTENT $data RETURN NONE;")
350 .bind(("data", chunk.clone()))
351 .await?
352 .check()?;
353 stored.push(chunk);
354 }
355 Ok(stored)
356 }
357
358 pub async fn create_relation(
363 &self,
364 table: &str,
365 relation: RelationRecord,
366 ) -> StoreResult<RelationRecord> {
367 self.ensure_schema().await?;
368 ensure_identifier(table, "table")?;
369 let in_id = parse_record_id(&relation.in_id, "in_id")?;
370 let out_id = parse_record_id(&relation.out_id, "out_id")?;
371 let payload = RelationPayload::from(&relation);
372 let statement = format!("RELATE $in->{table}->$out CONTENT $data RETURN NONE;");
373 self.db
374 .query(statement)
375 .bind(("in", in_id))
376 .bind(("out", out_id))
377 .bind(("data", payload))
378 .await?
379 .check()?;
380 Ok(relation)
381 }
382
383 pub async fn create_relations(
388 &self,
389 table: &str,
390 relations: Vec<RelationRecord>,
391 ) -> StoreResult<Vec<RelationRecord>> {
392 if relations.is_empty() {
393 return Ok(Vec::new());
394 }
395 let futs: Vec<_> = relations
396 .into_iter()
397 .map(|r| self.create_relation(table, r))
398 .collect();
399 let results = futures::future::join_all(futs).await;
400 results.into_iter().collect()
401 }
402
403 pub async fn list_databases(&self) -> StoreResult<Vec<String>> {
408 let mut response = self.db.query("INFO FOR NS;").await?;
409 let info: Option<Value> = response.take(0)?;
410 let names = info
411 .and_then(|v| v.get("databases").cloned())
412 .and_then(|v| {
413 v.as_object()
414 .map(|obj| obj.keys().cloned().collect::<Vec<_>>())
415 })
416 .unwrap_or_default();
417 Ok(names)
418 }
419
420 pub async fn remove_database(&self, db_name: &str) -> StoreResult<()> {
425 ensure_non_empty(db_name, "db_name")?;
426 let identifier = Table::from(db_name).to_sql();
427 let statement = format!("REMOVE DATABASE {identifier};");
428 self.db.query(statement).await?.check()?;
429 Ok(())
430 }
431
432 pub async fn get_symbol(&self, symbol_key: &str) -> StoreResult<Option<Symbol>> {
437 self.ensure_schema().await?;
438 let record = RecordId::new(TABLE_SYMBOL, symbol_key);
439 let mut response = self
440 .db
441 .query("SELECT *, record::id(id) AS id FROM $record;")
442 .bind(("record", record))
443 .await?;
444 let mut records: Vec<Symbol> = response.take(0)?;
445 Ok(records.pop())
446 }
447
448 pub async fn get_symbol_by_project(
453 &self,
454 project_id: &str,
455 symbol_key: &str,
456 ) -> StoreResult<Option<Symbol>> {
457 self.ensure_schema().await?;
458 let project_id = project_id.to_string();
459 let symbol_key = symbol_key.to_string();
460 let query = "SELECT *, record::id(id) AS id FROM symbol WHERE project_id = $project_id AND symbol_key = $symbol_key LIMIT 1;";
461 let mut response = self
462 .db
463 .query(query)
464 .bind(("project_id", project_id))
465 .bind(("symbol_key", symbol_key))
466 .await?;
467 let mut records: Vec<Symbol> = response.take(0)?;
468 Ok(records.pop())
469 }
470
471 pub async fn list_symbols_by_name(
476 &self,
477 project_id: &str,
478 name: &str,
479 limit: usize,
480 ) -> StoreResult<Vec<Symbol>> {
481 self.ensure_schema().await?;
482 let project_id = project_id.to_string();
483 let name = name.to_string();
484 let limit = limit_to_i64(limit)?;
485 let query = "SELECT *, record::id(id) AS id FROM symbol WHERE project_id = $project_id AND name CONTAINS $name LIMIT $limit;";
486 let mut response = self
487 .db
488 .query(query)
489 .bind(("project_id", project_id))
490 .bind(("name", name))
491 .bind(("limit", limit))
492 .await?;
493 let records: Vec<Symbol> = response.take(0)?;
494 Ok(records)
495 }
496
497 pub async fn search_symbols_advanced(
502 &self,
503 project_id: &str,
504 name: Option<&str>,
505 qualified_name: Option<&str>,
506 symbol_key: Option<&str>,
507 signature: Option<&str>,
508 limit: usize,
509 ) -> StoreResult<Vec<Symbol>> {
510 self.ensure_schema().await?;
511 let project_id = project_id.to_string();
512 let limit = limit_to_i64(limit)?;
513
514 let mut clauses = vec!["project_id = $project_id".to_string()];
515 if symbol_key.is_some() {
516 clauses.push("symbol_key = $symbol_key".to_string());
517 }
518 if name.is_some() {
519 clauses.push(
520 "name != NONE AND string::contains(string::lowercase(name), string::lowercase($name))"
521 .to_string(),
522 );
523 }
524 if qualified_name.is_some() {
525 clauses.push(
526 "qualified_name != NONE AND string::contains(string::lowercase(qualified_name), string::lowercase($qualified_name))"
527 .to_string(),
528 );
529 }
530 if signature.is_some() {
531 clauses.push(
532 "signature != NONE AND string::contains(string::lowercase(signature), string::lowercase($signature))"
533 .to_string(),
534 );
535 }
536
537 let query = format!(
538 "SELECT *, record::id(id) AS id FROM symbol WHERE {} LIMIT $limit;",
539 clauses.join(" AND ")
540 );
541
542 let mut request = self
543 .db
544 .query(query)
545 .bind(("project_id", project_id))
546 .bind(("limit", limit));
547 if let Some(value) = symbol_key {
548 request = request.bind(("symbol_key", value.to_string()));
549 }
550 if let Some(value) = name {
551 request = request.bind(("name", value.to_string()));
552 }
553 if let Some(value) = qualified_name {
554 request = request.bind(("qualified_name", value.to_string()));
555 }
556 if let Some(value) = signature {
557 request = request.bind(("signature", value.to_string()));
558 }
559
560 let mut response = request.await?;
561 let records: Vec<Symbol> = response.take(0)?;
562 Ok(records)
563 }
564
565 pub async fn list_symbol_kinds(&self, project_id: &str) -> StoreResult<Vec<String>> {
570 self.ensure_schema().await?;
571 let project_id = project_id.to_string();
572 let query = "SELECT kind FROM symbol WHERE project_id = $project_id GROUP BY kind;";
573 let mut response = self
574 .db
575 .query(query)
576 .bind(("project_id", project_id))
577 .await?;
578 let records: Vec<SymbolKindRow> = response.take(0)?;
579 let mut kinds: Vec<String> = records
580 .into_iter()
581 .filter_map(|row| row.kind)
582 .filter(|value| !value.trim().is_empty())
583 .collect();
584 kinds.sort();
585 kinds.dedup();
586 Ok(kinds)
587 }
588
589 pub async fn list_members_by_scope(
594 &self,
595 project_id: &str,
596 scope: &str,
597 limit: usize,
598 ) -> StoreResult<Vec<Symbol>> {
599 self.ensure_schema().await?;
600 let Some(scope) = normalize_pattern(scope) else {
601 return Ok(Vec::new());
602 };
603 let project_id = project_id.to_string();
604 let limit = limit_to_i64(limit)?;
605 let mut response = if scope.contains('*') {
606 let regex = build_scope_regex(&scope)?;
607 let query = format!(
608 "SELECT *, record::id(id) AS id FROM symbol WHERE project_id = $project_id AND qualified_name != NONE AND string::matches(string::lowercase(qualified_name), {}) LIMIT $limit;",
609 regex.to_sql()
610 );
611 self.db
612 .query(query)
613 .bind(("project_id", project_id))
614 .bind(("limit", limit))
615 .await?
616 } else {
617 let query = "SELECT *, record::id(id) AS id FROM symbol WHERE project_id = $project_id AND qualified_name != NONE AND string::starts_with(string::lowercase(qualified_name), $scope) LIMIT $limit;";
618 self.db
619 .query(query)
620 .bind(("project_id", project_id))
621 .bind(("scope", scope))
622 .bind(("limit", limit))
623 .await?
624 };
625 let records: Vec<Symbol> = response.take(0)?;
626 Ok(records)
627 }
628
629 pub async fn list_doc_blocks(
634 &self,
635 project_id: &str,
636 symbol_key: &str,
637 ingest_id: Option<&str>,
638 ) -> StoreResult<Vec<DocBlock>> {
639 self.ensure_schema().await?;
640 let project_id = project_id.to_string();
641 let symbol_key = symbol_key.to_string();
642 let (query, binds) = ingest_id.map_or(
643 (
644 "SELECT *, record::id(id) AS id FROM doc_block WHERE project_id = $project_id AND symbol_key = $symbol_key;",
645 None,
646 ),
647 |ingest_id| (
648 "SELECT *, record::id(id) AS id FROM doc_block WHERE project_id = $project_id AND symbol_key = $symbol_key AND ingest_id = $ingest_id;",
649 Some(ingest_id.to_string()),
650 ),
651 );
652 let response = self
653 .db
654 .query(query)
655 .bind(("project_id", project_id))
656 .bind(("symbol_key", symbol_key));
657 let mut response = if let Some(ingest_id) = binds {
658 response.bind(("ingest_id", ingest_id)).await?
659 } else {
660 response.await?
661 };
662 let records: Vec<DocBlock> = response.take(0)?;
663 Ok(records)
664 }
665
666 pub async fn search_doc_blocks(
671 &self,
672 project_id: &str,
673 text: &str,
674 limit: usize,
675 ) -> StoreResult<Vec<DocBlock>> {
676 self.ensure_schema().await?;
677 let project_id = project_id.to_string();
678 let text = text.to_string();
679 let limit = limit_to_i64(limit)?;
680 let query = "\
681 SELECT *, record::id(id) AS id FROM doc_block \
682 WHERE project_id = $project_id \
683 AND (string::contains(string::lowercase(summary ?? ''), string::lowercase($text)) \
684 OR string::contains(string::lowercase(remarks ?? ''), string::lowercase($text)) \
685 OR string::contains(string::lowercase(returns ?? ''), string::lowercase($text)) \
686 OR string::contains(string::lowercase(errors ?? ''), string::lowercase($text)) \
687 OR string::contains(string::lowercase(panics ?? ''), string::lowercase($text)) \
688 OR string::contains(string::lowercase(safety ?? ''), string::lowercase($text))) \
689 LIMIT $limit;";
690 let mut response = self
691 .db
692 .query(query)
693 .bind(("project_id", project_id))
694 .bind(("text", text))
695 .bind(("limit", limit))
696 .await?;
697 let records: Vec<DocBlock> = response.take(0)?;
698 Ok(records)
699 }
700
701 pub async fn list_doc_sources(
706 &self,
707 project_id: &str,
708 ingest_ids: &[String],
709 ) -> StoreResult<Vec<DocSource>> {
710 self.ensure_schema().await?;
711 if ingest_ids.is_empty() {
712 return Ok(Vec::new());
713 }
714 let project_id = project_id.to_string();
715 let ingest_ids = normalize_ingest_filter_ids(project_id.as_str(), ingest_ids);
716 if ingest_ids.is_empty() {
717 return Ok(Vec::new());
718 }
719 let query =
720 "SELECT * FROM doc_source WHERE project_id = $project_id AND ingest_id IN $ingest_ids;";
721 let mut response = self
722 .db
723 .query(query)
724 .bind(("project_id", project_id))
725 .bind(("ingest_ids", ingest_ids))
726 .await?;
727 let records: Vec<DocSourceRow> = response.take(0)?;
728 Ok(records.into_iter().map(DocSource::from).collect())
729 }
730
731 pub async fn list_doc_sources_by_ids(
736 &self,
737 project_id: &str,
738 doc_source_ids: &[String],
739 ) -> StoreResult<Vec<DocSource>> {
740 self.ensure_schema().await?;
741 if doc_source_ids.is_empty() {
742 return Ok(Vec::new());
743 }
744 let project_id = project_id.to_string();
745 let mut unique_ids = HashSet::new();
746 let records: Vec<RecordId> = doc_source_ids
747 .iter()
748 .filter(|value| !value.is_empty())
749 .filter(|value| unique_ids.insert((*value).clone()))
750 .map(|value| RecordId::new(TABLE_DOC_SOURCE, value.as_str()))
751 .collect();
752 if records.is_empty() {
753 return Ok(Vec::new());
754 }
755 let query = "SELECT * FROM doc_source WHERE project_id = $project_id AND id IN $records;";
756 let mut response = self
757 .db
758 .query(query)
759 .bind(("project_id", project_id))
760 .bind(("records", records))
761 .await?;
762 let records: Vec<DocSourceRow> = response.take(0)?;
763 Ok(records.into_iter().map(DocSource::from).collect())
764 }
765
766 pub async fn count_rows_for_project(
771 &self,
772 table: &str,
773 project_id: &str,
774 ) -> StoreResult<usize> {
775 self.ensure_schema().await?;
776 ensure_identifier(table, "table")?;
777 let query = format!(
778 "SELECT count() AS count FROM {table} WHERE project_id = $project_id GROUP ALL;"
779 );
780 let mut response = self
781 .db
782 .query(query)
783 .bind(("project_id", project_id.to_string()))
784 .await?;
785 let rows: Vec<CountRow> = response.take(0)?;
786 Ok(rows
787 .first()
788 .and_then(|row| usize::try_from(row.count).ok())
789 .unwrap_or(0))
790 }
791
792 pub async fn count_symbols_missing_field(
797 &self,
798 project_id: &str,
799 field: &str,
800 ) -> StoreResult<usize> {
801 self.ensure_schema().await?;
802 ensure_identifier(field, "field")?;
803 let query = format!(
804 "SELECT count() AS count FROM symbol WHERE project_id = $project_id AND {field} = NONE GROUP ALL;"
805 );
806 let mut response = self
807 .db
808 .query(query)
809 .bind(("project_id", project_id.to_string()))
810 .await?;
811 let rows: Vec<CountRow> = response.take(0)?;
812 Ok(rows
813 .first()
814 .and_then(|row| usize::try_from(row.count).ok())
815 .unwrap_or(0))
816 }
817
818 pub async fn list_doc_block_symbol_keys(&self, project_id: &str) -> StoreResult<Vec<String>> {
823 self.ensure_schema().await?;
824 let mut response = self
825 .db
826 .query(
827 "SELECT symbol_key FROM doc_block WHERE project_id = $project_id AND symbol_key != NONE;",
828 )
829 .bind(("project_id", project_id.to_string()))
830 .await?;
831 let rows: Vec<DocBlockSymbolKeyRow> = response.take(0)?;
832 Ok(rows.into_iter().map(|row| row.symbol_key).collect())
833 }
834
835 pub async fn list_observed_in_symbol_refs(&self, project_id: &str) -> StoreResult<Vec<String>> {
840 self.ensure_schema().await?;
841 let mut response = self
842 .db
843 .query("SELECT in AS symbol_id FROM observed_in WHERE project_id = $project_id;")
844 .bind(("project_id", project_id.to_string()))
845 .await?;
846 let rows: Vec<ObservedInSymbolRow> = response.take(0)?;
847 Ok(rows
848 .into_iter()
849 .map(|row| record_id_to_record_ref(row.symbol_id))
850 .collect())
851 }
852
853 pub async fn get_doc_source(&self, doc_source_id: &str) -> StoreResult<Option<DocSource>> {
858 self.ensure_schema().await?;
859 let record = RecordId::new(TABLE_DOC_SOURCE, doc_source_id);
860 let mut response = self
861 .db
862 .query("SELECT * FROM $record;")
863 .bind(("record", record))
864 .await?;
865 let records: Vec<DocSourceRow> = response.take(0)?;
866 Ok(records.into_iter().next().map(DocSource::from))
867 }
868
869 pub async fn list_doc_sources_by_project(
874 &self,
875 project_id: &str,
876 ingest_id: Option<&str>,
877 limit: usize,
878 ) -> StoreResult<Vec<DocSource>> {
879 self.ensure_schema().await?;
880 let project_id = project_id.to_string();
881 let limit = limit_to_i64(limit)?;
882 let (query, ingest_ids) = ingest_id.map_or(
883 (
884 "SELECT * FROM doc_source WHERE project_id = $project_id ORDER BY source_modified_at DESC LIMIT $limit;",
885 None,
886 ),
887 |ingest_id| {
888 (
889 "SELECT * FROM doc_source WHERE project_id = $project_id AND ingest_id IN $ingest_ids ORDER BY source_modified_at DESC LIMIT $limit;",
890 Some(normalize_ingest_filter_ids(
891 project_id.as_str(),
892 &[ingest_id.to_string()],
893 )),
894 )
895 },
896 );
897 if ingest_ids.as_ref().is_some_and(Vec::is_empty) {
898 return Ok(Vec::new());
899 }
900 let response = self
901 .db
902 .query(query)
903 .bind(("project_id", project_id))
904 .bind(("limit", limit));
905 let mut response = if let Some(ingest_ids) = ingest_ids {
906 response.bind(("ingest_ids", ingest_ids)).await?
907 } else {
908 response.await?
909 };
910 let records: Vec<DocSourceRow> = response.take(0)?;
911 Ok(records.into_iter().map(DocSource::from).collect())
912 }
913
914 pub async fn list_relations_from_symbol(
919 &self,
920 table: &str,
921 project_id: &str,
922 symbol_id: &str,
923 limit: usize,
924 ) -> StoreResult<Vec<RelationRecord>> {
925 self.ensure_schema().await?;
926 ensure_identifier(table, "table")?;
927 let limit = limit_to_i64(limit)?;
928 let record_id = RecordId::new(TABLE_SYMBOL, symbol_id);
929 let query = format!(
930 "SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $record->{table} WHERE project_id = $project_id LIMIT $limit;"
931 );
932 let mut response = self
933 .db
934 .query(query)
935 .bind(("project_id", project_id.to_string()))
936 .bind(("record", record_id))
937 .bind(("limit", limit))
938 .await?;
939 let records: Vec<RelationRow> = response.take(0)?;
940 Ok(records.into_iter().map(RelationRecord::from).collect())
941 }
942
943 pub async fn list_relations_to_symbol(
948 &self,
949 table: &str,
950 project_id: &str,
951 symbol_id: &str,
952 limit: usize,
953 ) -> StoreResult<Vec<RelationRecord>> {
954 self.ensure_schema().await?;
955 ensure_identifier(table, "table")?;
956 let limit = limit_to_i64(limit)?;
957 let record_id = RecordId::new(TABLE_SYMBOL, symbol_id);
958 let query = format!(
959 "SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $record<-{table} WHERE project_id = $project_id LIMIT $limit;"
960 );
961 let mut response = self
962 .db
963 .query(query)
964 .bind(("project_id", project_id.to_string()))
965 .bind(("record", record_id))
966 .bind(("limit", limit))
967 .await?;
968 let records: Vec<RelationRow> = response.take(0)?;
969 Ok(records.into_iter().map(RelationRecord::from).collect())
970 }
971
    /// Fetches all relation edges adjacent to one symbol in a single request.
    ///
    /// For `member_of`, `contains`, `returns`, `param_type`, `see_also`,
    /// `inherits` and `references`, both the outgoing (`->`) and incoming
    /// (`<-`) edges are selected and merged; `observed_in` is selected in the
    /// outgoing direction only. `limit` applies to each SELECT separately, so
    /// a merged list can hold up to twice `limit` entries.
    ///
    /// # Errors
    /// Fails if schema initialisation fails, if `limit` is rejected, or if
    /// the query or decoding of any result set fails.
    // The length comes from the 15 near-identical SELECT statements and their
    // matching `take` calls.
    #[allow(clippy::too_many_lines)]
    pub async fn fetch_symbol_adjacency(
        &self,
        symbol_id: &str,
        project_id: &str,
        limit: usize,
    ) -> StoreResult<AdjacencyRaw> {
        self.ensure_schema().await?;
        let limit = limit_to_i64(limit)?;
        let record = RecordId::new(TABLE_SYMBOL, symbol_id);
        let query = r"
            LET $sym = $record;
            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym->member_of WHERE project_id = $project_id LIMIT $limit;
            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym<-member_of WHERE project_id = $project_id LIMIT $limit;
            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym->contains WHERE project_id = $project_id LIMIT $limit;
            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym<-contains WHERE project_id = $project_id LIMIT $limit;
            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym->returns WHERE project_id = $project_id LIMIT $limit;
            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym<-returns WHERE project_id = $project_id LIMIT $limit;
            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym->param_type WHERE project_id = $project_id LIMIT $limit;
            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym<-param_type WHERE project_id = $project_id LIMIT $limit;
            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym->see_also WHERE project_id = $project_id LIMIT $limit;
            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym<-see_also WHERE project_id = $project_id LIMIT $limit;
            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym->inherits WHERE project_id = $project_id LIMIT $limit;
            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym<-inherits WHERE project_id = $project_id LIMIT $limit;
            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym->references WHERE project_id = $project_id LIMIT $limit;
            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym<-references WHERE project_id = $project_id LIMIT $limit;
            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym->observed_in WHERE project_id = $project_id LIMIT $limit;
        ";
        let mut response = self
            .db
            .query(query)
            .bind(("record", record))
            .bind(("project_id", project_id.to_string()))
            .bind(("limit", limit))
            .await?;

        // Statement 0 is the `LET`, so the SELECT results start at index 1
        // and follow the order of the statements above.
        let member_of_out: Vec<RelationRow> = response.take(1)?;
        let member_of_in: Vec<RelationRow> = response.take(2)?;
        let contains_out: Vec<RelationRow> = response.take(3)?;
        let contains_in: Vec<RelationRow> = response.take(4)?;
        let returns_out: Vec<RelationRow> = response.take(5)?;
        let returns_in: Vec<RelationRow> = response.take(6)?;
        let param_types_out: Vec<RelationRow> = response.take(7)?;
        let param_types_in: Vec<RelationRow> = response.take(8)?;
        let see_also_out: Vec<RelationRow> = response.take(9)?;
        let see_also_in: Vec<RelationRow> = response.take(10)?;
        let inherits_out: Vec<RelationRow> = response.take(11)?;
        let inherits_in: Vec<RelationRow> = response.take(12)?;
        let references_out: Vec<RelationRow> = response.take(13)?;
        let references_in: Vec<RelationRow> = response.take(14)?;
        let observed_in_out: Vec<RelationRow> = response.take(15)?;

        let to_records = |rows: Vec<RelationRow>| -> Vec<RelationRecord> {
            rows.into_iter().map(RelationRecord::from).collect()
        };

        // Outgoing rows come first; incoming rows repeating an
        // (in_id, out_id, kind) triple already present are dropped by the merge.
        Ok(AdjacencyRaw {
            member_of: merge_relation_rows(to_records(member_of_out), to_records(member_of_in)),
            contains: merge_relation_rows(to_records(contains_out), to_records(contains_in)),
            returns: merge_relation_rows(to_records(returns_out), to_records(returns_in)),
            param_types: merge_relation_rows(
                to_records(param_types_out),
                to_records(param_types_in),
            ),
            see_also: merge_relation_rows(to_records(see_also_out), to_records(see_also_in)),
            inherits: merge_relation_rows(to_records(inherits_out), to_records(inherits_in)),
            references: merge_relation_rows(to_records(references_out), to_records(references_in)),
            observed_in: to_records(observed_in_out),
        })
    }
1047
1048 pub async fn list_relations_from_doc_block(
1053 &self,
1054 table: &str,
1055 project_id: &str,
1056 doc_block_id: &str,
1057 limit: usize,
1058 ) -> StoreResult<Vec<RelationRecord>> {
1059 self.ensure_schema().await?;
1060 ensure_identifier(table, "table")?;
1061 let project_id = project_id.to_string();
1062 let limit = limit_to_i64(limit)?;
1063 let record_id = RecordId::new(TABLE_DOC_BLOCK, doc_block_id);
1064 let query = format!(
1065 "SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM {table} WHERE project_id = $project_id AND in = $record_id LIMIT $limit;"
1066 );
1067 let mut response = self
1068 .db
1069 .query(query)
1070 .bind(("project_id", project_id))
1071 .bind(("record_id", record_id))
1072 .bind(("limit", limit))
1073 .await?;
1074 let records: Vec<RelationRow> = response.take(0)?;
1075 Ok(records.into_iter().map(RelationRecord::from).collect())
1076 }
1077}
1078
/// Relation edges adjacent to one symbol, grouped by relation table.
///
/// Produced by `SurrealDocStore::fetch_symbol_adjacency`. Every field except
/// `observed_in` combines the outgoing and incoming edges of the symbol.
#[derive(Debug, Default)]
pub struct AdjacencyRaw {
    /// Edges from the `member_of` table.
    pub member_of: Vec<RelationRecord>,
    /// Edges from the `contains` table.
    pub contains: Vec<RelationRecord>,
    /// Edges from the `returns` table.
    pub returns: Vec<RelationRecord>,
    /// Edges from the `param_type` table.
    pub param_types: Vec<RelationRecord>,
    /// Edges from the `see_also` table.
    pub see_also: Vec<RelationRecord>,
    /// Edges from the `inherits` table.
    pub inherits: Vec<RelationRecord>,
    /// Edges from the `references` table.
    pub references: Vec<RelationRecord>,
    /// Outgoing edges from the `observed_in` table.
    pub observed_in: Vec<RelationRecord>,
}
1091
1092fn merge_relation_rows(
1093 mut left: Vec<RelationRecord>,
1094 right: Vec<RelationRecord>,
1095) -> Vec<RelationRecord> {
1096 let mut seen = std::collections::HashSet::new();
1097 for r in &left {
1098 seen.insert((r.in_id.clone(), r.out_id.clone(), r.kind.clone()));
1099 }
1100 for r in right {
1101 let key = (r.in_id.clone(), r.out_id.clone(), r.kind.clone());
1102 if seen.insert(key) {
1103 left.push(r);
1104 }
1105 }
1106 left
1107}
1108
1109fn ensure_non_empty(value: &str, field: &str) -> StoreResult<()> {
1110 if value.is_empty() {
1111 return Err(StoreError::InvalidInput(format!("{field} is required")));
1112 }
1113 Ok(())
1114}
1115
1116fn ensure_identifier(value: &str, field: &str) -> StoreResult<()> {
1117 if value.is_empty()
1118 || !value
1119 .chars()
1120 .all(|ch| ch.is_ascii_alphanumeric() || ch == '_')
1121 {
1122 return Err(StoreError::InvalidInput(format!(
1123 "{field} must be a valid identifier"
1124 )));
1125 }
1126 Ok(())
1127}
1128
1129fn parse_record_id(value: &str, field: &str) -> StoreResult<RecordId> {
1130 ensure_non_empty(value, field)?;
1131 RecordId::parse_simple(value).map_err(|err| {
1132 StoreError::InvalidInput(format!(
1133 "{field} must be a record id in 'table:key' format: {err}"
1134 ))
1135 })
1136}
1137
/// The non-endpoint fields of a relation, copied out of a `RelationRecord`.
///
/// The record's `id`, `in_id` and `out_id` are not part of the payload. Optional fields
/// that are `None` are omitted when the payload is serialized through serde.
#[derive(Debug, Clone, Serialize, SurrealValue)]
struct RelationPayload {
    /// Project identifier stored on the relation.
    project_id: String,
    /// Ingest identifier stored on the relation, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    ingest_id: Option<String>,
    /// Free-form relation kind, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    kind: Option<String>,
    /// Arbitrary extra JSON attached to the relation, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    extra: Option<Value>,
}
1148
1149impl From<&RelationRecord> for RelationPayload {
1150 fn from(value: &RelationRecord) -> Self {
1151 Self {
1152 project_id: value.project_id.clone(),
1153 ingest_id: value.ingest_id.clone(),
1154 kind: value.kind.clone(),
1155 extra: value.extra.clone(),
1156 }
1157 }
1158}
1159
/// Row shape decoded from relation-table queries before conversion to `RelationRecord`.
///
/// Queries feeding this type select the edge endpoints as `in AS in_id, out AS out_id`.
#[derive(serde::Deserialize, SurrealValue)]
struct RelationRow {
    /// Record id of the relation row itself.
    id: RecordId,
    /// Record link stored in the relation's `in` field.
    in_id: RecordId,
    /// Record link stored in the relation's `out` field.
    out_id: RecordId,
    /// Project identifier stored on the relation.
    project_id: String,
    /// Ingest identifier stored on the relation, if any.
    ingest_id: Option<String>,
    /// Free-form relation kind, if any.
    kind: Option<String>,
    /// Arbitrary extra JSON attached to the relation, if any.
    extra: Option<Value>,
}
1170
1171impl From<RelationRow> for RelationRecord {
1172 fn from(row: RelationRow) -> Self {
1173 Self {
1174 id: Some(record_id_to_string(row.id)),
1175 in_id: record_id_to_record_ref(row.in_id),
1176 out_id: record_id_to_record_ref(row.out_id),
1177 project_id: row.project_id,
1178 ingest_id: row.ingest_id,
1179 kind: row.kind,
1180 extra: row.extra,
1181 }
1182 }
1183}
1184
/// Row shape decoded for ingest records before conversion to `Ingest`.
///
/// Mirrors the fields of `Ingest`, except that `id` is a typed `RecordId` rather than an
/// optional string.
#[derive(serde::Deserialize, SurrealValue)]
struct IngestRow {
    /// Record id of the ingest row.
    id: RecordId,
    /// Project identifier stored on the ingest.
    project_id: String,
    /// Git commit recorded for the ingest, if any.
    git_commit: Option<String>,
    /// Git branch recorded for the ingest, if any.
    git_branch: Option<String>,
    /// Git tag recorded for the ingest, if any.
    git_tag: Option<String>,
    /// Project version recorded for the ingest, if any.
    project_version: Option<String>,
    /// Source modification timestamp, kept as the stored string.
    source_modified_at: Option<String>,
    /// Ingestion timestamp, kept as the stored string.
    ingested_at: Option<String>,
    /// Arbitrary extra JSON attached to the ingest, if any.
    extra: Option<serde_json::Value>,
}
1197
1198impl From<IngestRow> for Ingest {
1199 fn from(row: IngestRow) -> Self {
1200 Self {
1201 id: Some(record_id_to_string(row.id)),
1202 project_id: row.project_id,
1203 git_commit: row.git_commit,
1204 git_branch: row.git_branch,
1205 git_tag: row.git_tag,
1206 project_version: row.project_version,
1207 source_modified_at: row.source_modified_at,
1208 ingested_at: row.ingested_at,
1209 extra: row.extra,
1210 }
1211 }
1212}
1213
/// Row shape decoded for doc source records before conversion to `DocSource`.
///
/// Mirrors the fields of `DocSource`, except that `id` is a typed `RecordId` rather than
/// an optional string.
#[derive(serde::Deserialize, SurrealValue)]
struct DocSourceRow {
    /// Record id of the doc source row.
    id: RecordId,
    /// Project identifier stored on the doc source.
    project_id: String,
    /// Ingest identifier stored on the doc source, if any.
    ingest_id: Option<String>,
    /// Language recorded for the source, if any.
    language: Option<String>,
    /// Source kind recorded for the source, if any.
    source_kind: Option<String>,
    /// Path recorded for the source, if any.
    path: Option<String>,
    /// Tool version recorded for the source, if any.
    tool_version: Option<String>,
    /// Hash recorded for the source, if any.
    hash: Option<String>,
    /// Source modification timestamp, kept as the stored string.
    source_modified_at: Option<String>,
    /// Arbitrary extra JSON attached to the doc source, if any.
    extra: Option<serde_json::Value>,
}
1227
1228impl From<DocSourceRow> for DocSource {
1229 fn from(row: DocSourceRow) -> Self {
1230 Self {
1231 id: Some(record_id_to_string(row.id)),
1232 project_id: row.project_id,
1233 ingest_id: row.ingest_id,
1234 language: row.language,
1235 source_kind: row.source_kind,
1236 path: row.path,
1237 tool_version: row.tool_version,
1238 hash: row.hash,
1239 source_modified_at: row.source_modified_at,
1240 extra: row.extra,
1241 }
1242 }
1243}
1244
/// Row carrying only an optional `kind` column.
///
/// NOTE(review): presumably decoded from queries that project just the symbol kind —
/// confirm against the call sites.
#[derive(serde::Deserialize, SurrealValue)]
struct SymbolKindRow {
    /// The selected kind, absent when the row has none.
    kind: Option<String>,
}
1249
/// Row carrying a single integer `count` column.
///
/// NOTE(review): presumably the result shape of counting queries — confirm against the
/// call sites.
#[derive(serde::Deserialize, SurrealValue)]
struct CountRow {
    /// The selected count value.
    count: i64,
}
1254
/// Row carrying only a `symbol_key` column.
///
/// NOTE(review): the name suggests it is decoded from doc block queries — confirm against
/// the call sites.
#[derive(serde::Deserialize, SurrealValue)]
struct DocBlockSymbolKeyRow {
    /// The selected symbol key.
    symbol_key: String,
}
1259
/// Row carrying only a `symbol_id` record link.
///
/// NOTE(review): the name suggests it is decoded from `observed_in` relation queries —
/// confirm against the call sites.
#[derive(serde::Deserialize, SurrealValue)]
struct ObservedInSymbolRow {
    /// Record id of the referenced symbol.
    symbol_id: RecordId,
}
1264
1265async fn apply_schema<C: Connection>(db: &Surreal<C>, schema: &str) -> StoreResult<()> {
1266 db.query(schema).await?.check()?;
1267 Ok(())
1268}
1269
1270fn split_optional_doc_block_fts_schema(schema: &str) -> StoreResult<(String, Option<String>)> {
1271 let mut required_schema = String::new();
1272 let mut optional_schema = String::new();
1273 let mut in_optional_block = false;
1274 let mut found_optional_start = false;
1275 let mut found_optional_end = false;
1276
1277 for line in schema.lines() {
1278 let trimmed = line.trim();
1279 if trimmed == OPTIONAL_DOC_BLOCK_FTS_START {
1280 if in_optional_block || found_optional_start {
1281 return Err(StoreError::InvalidInput(
1282 "schema optional FTS block start marker appears multiple times".to_string(),
1283 ));
1284 }
1285 found_optional_start = true;
1286 in_optional_block = true;
1287 continue;
1288 }
1289 if trimmed == OPTIONAL_DOC_BLOCK_FTS_END {
1290 if !in_optional_block {
1291 return Err(StoreError::InvalidInput(
1292 "schema optional FTS block end marker appears before start".to_string(),
1293 ));
1294 }
1295 found_optional_end = true;
1296 in_optional_block = false;
1297 continue;
1298 }
1299 if in_optional_block {
1300 optional_schema.push_str(line);
1301 optional_schema.push('\n');
1302 } else {
1303 required_schema.push_str(line);
1304 required_schema.push('\n');
1305 }
1306 }
1307
1308 if in_optional_block {
1309 return Err(StoreError::InvalidInput(
1310 "schema optional FTS block start marker is missing a matching end marker".to_string(),
1311 ));
1312 }
1313 if found_optional_start != found_optional_end {
1314 return Err(StoreError::InvalidInput(
1315 "schema optional FTS block markers are unbalanced".to_string(),
1316 ));
1317 }
1318 if !found_optional_start {
1319 return Ok((required_schema, None));
1320 }
1321
1322 let optional_schema = optional_schema.trim();
1323 if optional_schema.is_empty() {
1324 return Err(StoreError::InvalidInput(
1325 "schema optional FTS block is empty".to_string(),
1326 ));
1327 }
1328 Ok((required_schema, Some(optional_schema.to_string())))
1329}
1330
/// Trims and lowercases a search pattern.
///
/// Returns `None` when nothing is left after trimming, otherwise the lowercased text.
fn normalize_pattern(pattern: &str) -> Option<String> {
    Some(pattern.trim())
        .filter(|trimmed| !trimmed.is_empty())
        .map(str::to_lowercase)
}
1339
1340fn limit_to_i64(limit: usize) -> StoreResult<i64> {
1341 i64::try_from(limit)
1342 .map_err(|_| StoreError::InvalidInput("limit exceeds supported range".to_string()))
1343}
1344
1345fn record_id_to_string(record_id: RecordId) -> String {
1346 record_id_key_to_string(record_id.key)
1347}
1348
1349fn record_id_to_record_ref(record_id: RecordId) -> String {
1350 let table = record_id.table.into_string();
1351 let key = record_id_key_to_string(record_id.key);
1352 format!("{table}:{key}")
1353}
1354
1355fn record_id_key_to_string(key: RecordIdKey) -> String {
1356 match key {
1357 RecordIdKey::String(value) => value,
1358 other => other.to_sql(),
1359 }
1360}
1361
1362fn build_project_regex(pattern: &str) -> StoreResult<Regex> {
1363 let body = glob_to_regex_body(pattern);
1364 let regex = format!(r"(^|\|){body}(\||$)");
1365 Regex::from_str(®ex)
1366 .map_err(|err| StoreError::InvalidInput(format!("Invalid project search pattern: {err}")))
1367}
1368
1369fn build_scope_regex(pattern: &str) -> StoreResult<Regex> {
1370 let body = glob_to_regex_body(pattern);
1371 let regex = format!(r"^{body}$");
1372 Regex::from_str(®ex)
1373 .map_err(|err| StoreError::InvalidInput(format!("Invalid scope search pattern: {err}")))
1374}
1375
/// Translates a `*`-only glob into the body of a regular expression.
///
/// Each `*` becomes `.*`. The characters `. + ? ( ) [ ] { } | ^ $ \` are prefixed with a
/// backslash so they match literally. Everything else is copied through unchanged. No
/// anchors are added.
fn glob_to_regex_body(pattern: &str) -> String {
    /// Characters that must be backslash-escaped to be matched literally.
    const ESCAPED: [char; 13] = [
        '.', '+', '?', '(', ')', '[', ']', '{', '}', '|', '^', '$', '\\',
    ];

    let mut body = String::with_capacity(pattern.len());
    for ch in pattern.chars() {
        if ch == '*' {
            body.push_str(".*");
            continue;
        }
        if ESCAPED.contains(&ch) {
            body.push('\\');
        }
        body.push(ch);
    }
    body
}
1390
/// Returns `ingest_id` in its project-scoped form, `"{project_id}::{ingest_id}"`.
///
/// An id that already begins with `"{project_id}::"` is returned unchanged rather than
/// being prefixed a second time.
fn make_scoped_ingest_id(project_id: &str, ingest_id: &str) -> String {
    let already_scoped = ingest_id
        .strip_prefix(project_id)
        .is_some_and(|rest| rest.starts_with("::"));

    if already_scoped {
        ingest_id.to_owned()
    } else {
        format!("{project_id}::{ingest_id}")
    }
}
1399
/// Expands a list of ingest ids into the distinct forms to filter on.
///
/// Each entry is trimmed and blank entries are skipped. An entry of the form
/// `"{project_id}::rest"` with a non-empty `rest` contributes both itself and `rest`.
/// The output contains every id once, in first-seen order.
fn normalize_ingest_filter_ids(project_id: &str, ingest_ids: &[String]) -> Vec<String> {
    let scoped_prefix = format!("{project_id}::");
    let mut seen: HashSet<&str> = HashSet::new();
    let mut normalized = Vec::new();

    let candidates = ingest_ids
        .iter()
        .map(|raw| raw.trim())
        .filter(|trimmed| !trimmed.is_empty());
    for candidate in candidates {
        // The unscoped tail, when the candidate carries this project's prefix.
        let unscoped = candidate
            .strip_prefix(scoped_prefix.as_str())
            .filter(|rest| !rest.is_empty());
        for id in std::iter::once(candidate).chain(unscoped) {
            if seen.insert(id) {
                normalized.push(id.to_owned());
            }
        }
    }
    normalized
}
1421
1422fn merge_ingest_extra(existing: Option<Value>, requested_ingest_id: &str) -> Value {
1423 let mut object = match existing {
1424 Some(Value::Object(map)) => map,
1425 Some(value) => {
1426 let mut map = serde_json::Map::new();
1427 map.insert("value".to_string(), value);
1428 map
1429 }
1430 None => serde_json::Map::new(),
1431 };
1432 object.insert(
1433 "requested_ingest_id".to_string(),
1434 Value::String(requested_ingest_id.to_string()),
1435 );
1436 Value::Object(object)
1437}
1438
#[cfg(test)]
mod tests {
    use super::*;
    use docx_store::models::{DocSource, Ingest, Project, RelationRecord, Symbol};
    use docx_store::schema::REL_MEMBER_OF;
    use serde::Deserialize;
    use surrealdb::Surreal;
    use surrealdb::engine::local::{Db, Mem};

    /// Opens a fresh in-memory SurrealDB (namespace `docx`, database `test`) and wraps it
    /// in a store.
    async fn build_store() -> SurrealDocStore<Db> {
        let db = Surreal::new::<Mem>(())
            .await
            .expect("failed to create in-memory SurrealDB");
        db.use_ns("docx")
            .use_db("test")
            .await
            .expect("failed to set namespace/db");
        SurrealDocStore::new(db)
    }

    /// Ingest fixture with only the id and owning project set.
    fn build_ingest(project_id: &str, id: &str) -> Ingest {
        Ingest {
            id: Some(id.to_string()),
            project_id: project_id.to_string(),
            git_commit: None,
            git_branch: None,
            git_tag: None,
            project_version: None,
            source_modified_at: None,
            ingested_at: None,
            extra: None,
        }
    }

    /// Doc source fixture with only the id, owning project and ingest id set.
    fn build_doc_source(project_id: &str, id: &str, ingest_id: &str) -> DocSource {
        DocSource {
            id: Some(id.to_string()),
            project_id: project_id.to_string(),
            ingest_id: Some(ingest_id.to_string()),
            language: None,
            source_kind: None,
            path: None,
            tool_version: None,
            hash: None,
            source_modified_at: None,
            extra: None,
        }
    }

    /// Symbol fixture whose id doubles as its symbol key; language is `rust`.
    fn build_symbol(project_id: &str, id: &str) -> Symbol {
        Symbol {
            id: Some(id.to_string()),
            project_id: project_id.to_string(),
            language: Some("rust".to_string()),
            symbol_key: id.to_string(),
            kind: None,
            name: None,
            qualified_name: None,
            display_name: None,
            signature: None,
            signature_hash: None,
            visibility: None,
            is_static: None,
            is_async: None,
            is_const: None,
            is_deprecated: None,
            since: None,
            stability: None,
            source_path: None,
            line: None,
            col: None,
            return_type: None,
            params: Vec::new(),
            type_params: Vec::new(),
            attributes: Vec::new(),
            source_ids: Vec::new(),
            doc_summary: None,
            extra: None,
        }
    }

    /// Result row of the `type::is_record` probe used by the relation test.
    #[derive(Deserialize, SurrealValue)]
    struct RelationTypeFlags {
        in_is_record: bool,
        out_is_record: bool,
    }

    #[test]
    fn split_optional_doc_block_fts_schema_extracts_optional_block() {
        let schema = "\
DEFINE TABLE test SCHEMAFULL;\n\
-- OPTIONAL_DOC_BLOCK_FTS_START\n\
DEFINE ANALYZER IF NOT EXISTS docx_search TOKENIZERS blank,class FILTERS lowercase,snowball(english);\n\
-- OPTIONAL_DOC_BLOCK_FTS_END\n\
DEFINE INDEX test_idx ON TABLE test COLUMNS id;\n";
        let (required, optional) =
            split_optional_doc_block_fts_schema(schema).expect("split should succeed");
        let optional = optional.expect("optional block should be extracted");
        assert!(required.contains("DEFINE TABLE test SCHEMAFULL;"));
        assert!(required.contains("DEFINE INDEX test_idx ON TABLE test COLUMNS id;"));
        assert!(!required.contains("DEFINE ANALYZER"));
        assert!(optional.contains("DEFINE ANALYZER IF NOT EXISTS docx_search"));
    }

    #[test]
    fn split_optional_doc_block_fts_schema_rejects_unclosed_optional_block() {
        let schema = "\
DEFINE TABLE test SCHEMAFULL;\n\
-- OPTIONAL_DOC_BLOCK_FTS_START\n\
DEFINE ANALYZER IF NOT EXISTS docx_search TOKENIZERS blank,class FILTERS lowercase,snowball(english);\n";
        let error = split_optional_doc_block_fts_schema(schema)
            .expect_err("unclosed optional block should fail");
        assert!(
            error
                .to_string()
                .contains("start marker is missing a matching end marker"),
            "error should describe unmatched optional block markers"
        );
    }

    #[tokio::test]
    async fn list_ingests_includes_ids() {
        let store = build_store().await;

        store
            .create_ingest(build_ingest("project", "ingest-1"))
            .await
            .expect("failed to create ingest");
        let ingests = store
            .list_ingests("project", 10)
            .await
            .expect("failed to list ingests");

        assert_eq!(ingests.len(), 1);
        assert_eq!(ingests[0].id.as_deref(), Some("project::ingest-1"));
        assert_eq!(
            ingests[0]
                .extra
                .as_ref()
                .and_then(|value| value.get("requested_ingest_id"))
                .and_then(serde_json::Value::as_str),
            Some("ingest-1")
        );
    }

    #[tokio::test]
    async fn list_ingests_scopes_same_ingest_id_per_project() {
        let store = build_store().await;

        store
            .create_ingest(build_ingest("project-left", "shared"))
            .await
            .expect("failed to create left ingest");
        store
            .create_ingest(build_ingest("project-right", "shared"))
            .await
            .expect("failed to create right ingest");

        let left_ingests = store
            .list_ingests("project-left", 10)
            .await
            .expect("failed to list left ingests");
        let right_ingests = store
            .list_ingests("project-right", 10)
            .await
            .expect("failed to list right ingests");

        assert_eq!(left_ingests.len(), 1);
        assert_eq!(right_ingests.len(), 1);
        assert_eq!(left_ingests[0].id.as_deref(), Some("project-left::shared"));
        assert_eq!(
            right_ingests[0].id.as_deref(),
            Some("project-right::shared")
        );
    }

    #[tokio::test]
    async fn get_ingest_supports_requested_id_when_unique() {
        let store = build_store().await;

        store
            .create_ingest(build_ingest("project", "requested"))
            .await
            .expect("failed to create ingest");
        let found = store
            .get_ingest("requested")
            .await
            .expect("failed to lookup ingest by requested id");
        assert_eq!(
            found.and_then(|row| row.id),
            Some("project::requested".to_string())
        );
    }

    #[tokio::test]
    async fn get_ingest_rejects_ambiguous_requested_id() {
        let store = build_store().await;

        store
            .create_ingest(build_ingest("project-left", "requested"))
            .await
            .expect("failed to create left ingest");
        store
            .create_ingest(build_ingest("project-right", "requested"))
            .await
            .expect("failed to create right ingest");

        let error = store
            .get_ingest("requested")
            .await
            .expect_err("ambiguous requested id should error");
        assert!(
            error
                .to_string()
                .contains("ambiguous; use project-scoped ingest id"),
            "error should explain scoped ingest id requirement"
        );
    }

    #[tokio::test]
    async fn list_doc_sources_includes_ids() {
        let store = build_store().await;

        store
            .create_doc_source(build_doc_source("project", "source-1", "ingest-1"))
            .await
            .expect("failed to create doc source");
        let sources = store
            .list_doc_sources_by_project("project", None, 10)
            .await
            .expect("failed to list doc sources");

        assert_eq!(sources.len(), 1);
        assert_eq!(sources[0].id.as_deref(), Some("source-1"));
    }

    #[tokio::test]
    async fn list_doc_sources_accepts_scoped_ingest_filter() {
        let store = build_store().await;

        store
            .create_doc_source(build_doc_source("project", "source-1", "shared"))
            .await
            .expect("failed to create doc source");
        let sources = store
            .list_doc_sources("project", &["project::shared".to_string()])
            .await
            .expect("failed to list doc sources with scoped ingest id");

        assert_eq!(sources.len(), 1);
        assert_eq!(sources[0].id.as_deref(), Some("source-1"));
    }

    #[tokio::test]
    async fn list_doc_sources_by_project_accepts_scoped_ingest_filter() {
        let store = build_store().await;

        store
            .create_doc_source(build_doc_source("project", "source-1", "shared"))
            .await
            .expect("failed to create doc source");
        let sources = store
            .list_doc_sources_by_project("project", Some("project::shared"), 10)
            .await
            .expect("failed to list doc sources by project with scoped ingest id");

        assert_eq!(sources.len(), 1);
        assert_eq!(sources[0].id.as_deref(), Some("source-1"));
    }

    #[tokio::test]
    async fn list_doc_sources_by_ids_filters_requested_ids() {
        let store = build_store().await;

        store
            .create_doc_source(build_doc_source("project", "source-left", "ingest-1"))
            .await
            .expect("failed to create left doc source");
        store
            .create_doc_source(build_doc_source("project", "source-right", "ingest-1"))
            .await
            .expect("failed to create right doc source");

        let results = store
            .list_doc_sources_by_ids("project", &["source-right".to_string()])
            .await
            .expect("failed to list doc sources by ids");
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id.as_deref(), Some("source-right"));
    }

    #[tokio::test]
    async fn create_relation_stores_record_links() {
        let store = build_store().await;
        let _ = store
            .upsert_symbol(build_symbol("project", "left"))
            .await
            .expect("failed to create left symbol");
        let _ = store
            .upsert_symbol(build_symbol("project", "right"))
            .await
            .expect("failed to create right symbol");

        let relation = RelationRecord {
            id: None,
            in_id: "symbol:left".to_string(),
            out_id: "symbol:right".to_string(),
            project_id: "project".to_string(),
            ingest_id: None,
            kind: Some("test".to_string()),
            extra: None,
        };
        let _ = store
            .create_relation(REL_MEMBER_OF, relation)
            .await
            .expect("failed to create relation");

        // Both endpoints must be stored as record links, not as plain strings.
        let mut response = store
            .db()
            .query(
                "SELECT type::is_record(in) AS in_is_record, type::is_record(out) AS out_is_record FROM member_of LIMIT 1;",
            )
            .await
            .expect("failed to query relation record types");
        let rows: Vec<RelationTypeFlags> = response.take(0).expect("failed to decode relation row");
        assert_eq!(rows.len(), 1);
        assert!(rows[0].in_is_record);
        assert!(rows[0].out_is_record);
    }

    #[tokio::test]
    async fn search_symbols_advanced_supports_exact_symbol_key() {
        let store = build_store().await;
        let mut alpha = build_symbol("project", "rust|project|alpha");
        alpha.name = Some("alpha".to_string());
        alpha.qualified_name = Some("crate::alpha".to_string());
        let mut beta = build_symbol("project", "rust|project|beta");
        beta.name = Some("beta".to_string());
        beta.qualified_name = Some("crate::beta".to_string());

        store
            .upsert_symbol(alpha.clone())
            .await
            .expect("failed to create alpha");
        store
            .upsert_symbol(beta)
            .await
            .expect("failed to create beta");

        let results = store
            .search_symbols_advanced(
                "project",
                None,
                None,
                Some(alpha.symbol_key.as_str()),
                None,
                10,
            )
            .await
            .expect("advanced search by symbol key should succeed");
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].symbol_key, alpha.symbol_key);
    }

    #[tokio::test]
    async fn remove_database_makes_current_db_unavailable() {
        let store = build_store().await;
        let _ = store
            .upsert_project(Project {
                id: Some("project".to_string()),
                project_id: "project".to_string(),
                name: Some("project".to_string()),
                language: Some("rust".to_string()),
                root_path: None,
                description: None,
                aliases: Vec::new(),
                search_text: Some("project".to_string()),
                extra: None,
            })
            .await
            .expect("failed to upsert project");

        store
            .remove_database("test")
            .await
            .expect("failed to remove database");
        let result = store.list_projects(10).await;
        assert!(result.is_err());
    }
}