Skip to main content

docx_core/store/
surreal.rs

1use std::{collections::HashSet, error::Error, fmt, str::FromStr, sync::Arc};
2
3use docx_store::models::{DocBlock, DocChunk, DocSource, Ingest, Project, RelationRecord, Symbol};
4use docx_store::schema::{
5    SCHEMA_BOOTSTRAP_SURQL, TABLE_DOC_BLOCK, TABLE_DOC_SOURCE, TABLE_INGEST, TABLE_PROJECT,
6    TABLE_SYMBOL,
7};
8use serde::Serialize;
9use serde_json::Value;
10use surrealdb::types::{RecordId, RecordIdKey, Regex, SurrealValue, Table, ToSql};
11use surrealdb::{Connection, Surreal};
12use tracing::warn;
13use uuid::Uuid;
14
15/// Errors returned by the `SurrealDB` store implementation.
16#[derive(Debug)]
17pub enum StoreError {
18    Surreal(Box<surrealdb::Error>),
19    InvalidInput(String),
20}
21
22impl fmt::Display for StoreError {
23    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24        match self {
25            Self::Surreal(err) => write!(f, "SurrealDB error: {err}"),
26            Self::InvalidInput(message) => write!(f, "Invalid input: {message}"),
27        }
28    }
29}
30
31impl Error for StoreError {}
32
33impl From<surrealdb::Error> for StoreError {
34    fn from(err: surrealdb::Error) -> Self {
35        Self::Surreal(Box::new(err))
36    }
37}
38
39pub type StoreResult<T> = Result<T, StoreError>;
40
41const OPTIONAL_DOC_BLOCK_FTS_START: &str = "-- OPTIONAL_DOC_BLOCK_FTS_START";
42const OPTIONAL_DOC_BLOCK_FTS_END: &str = "-- OPTIONAL_DOC_BLOCK_FTS_END";
43
44/// Store implementation backed by `SurrealDB`.
45pub struct SurrealDocStore<C: Connection> {
46    db: Arc<Surreal<C>>,
47    schema_ready: Arc<tokio::sync::OnceCell<()>>,
48}
49
50impl<C: Connection> Clone for SurrealDocStore<C> {
51    fn clone(&self) -> Self {
52        Self {
53            db: self.db.clone(),
54            schema_ready: self.schema_ready.clone(),
55        }
56    }
57}
58
59impl<C: Connection> SurrealDocStore<C> {
60    #[must_use]
61    pub fn new(db: Surreal<C>) -> Self {
62        Self {
63            db: Arc::new(db),
64            schema_ready: Arc::new(tokio::sync::OnceCell::new()),
65        }
66    }
67
68    #[must_use]
69    pub fn from_arc(db: Arc<Surreal<C>>) -> Self {
70        Self {
71            db,
72            schema_ready: Arc::new(tokio::sync::OnceCell::new()),
73        }
74    }
75
76    #[must_use]
77    pub fn db(&self) -> &Surreal<C> {
78        &self.db
79    }
80
81    async fn ensure_schema(&self) -> StoreResult<()> {
82        self.schema_ready
83            .get_or_try_init(|| async {
84                let (required_schema, optional_doc_block_fts) =
85                    split_optional_doc_block_fts_schema(SCHEMA_BOOTSTRAP_SURQL)?;
86                apply_schema(self.db.as_ref(), required_schema.as_str()).await?;
87                if let Some(optional_doc_block_fts) = optional_doc_block_fts
88                    && let Err(error) =
89                        apply_schema(self.db.as_ref(), optional_doc_block_fts.as_str()).await
90                {
91                    warn!(
92                        error = %error,
93                        "optional doc_block full-text schema was skipped"
94                    );
95                }
96                Ok::<(), StoreError>(())
97            })
98            .await?;
99        Ok(())
100    }
101
102    /// Upserts a project record by id.
103    ///
104    /// # Errors
105    /// Returns `StoreError` if validation fails or the database write fails.
106    pub async fn upsert_project(&self, mut project: Project) -> StoreResult<Project> {
107        self.ensure_schema().await?;
108        ensure_non_empty(&project.project_id, "project_id")?;
109        let id = project
110            .id
111            .clone()
112            .unwrap_or_else(|| project.project_id.clone());
113        project.id = Some(id.clone());
114        let record = RecordId::new(TABLE_PROJECT, id.as_str());
115        self.db
116            .query("UPSERT $record CONTENT $data RETURN NONE;")
117            .bind(("record", record))
118            .bind(("data", project.clone()))
119            .await?
120            .check()?;
121        Ok(project)
122    }
123
124    /// Fetches a project by id.
125    ///
126    /// # Errors
127    /// Returns `StoreError` if the database query fails.
128    pub async fn get_project(&self, project_id: &str) -> StoreResult<Option<Project>> {
129        self.ensure_schema().await?;
130        let record = RecordId::new(TABLE_PROJECT, project_id);
131        let mut response = self
132            .db
133            .query("SELECT *, record::id(id) AS id FROM $record;")
134            .bind(("record", record))
135            .await?;
136        let mut records: Vec<Project> = response.take(0)?;
137        Ok(records.pop())
138    }
139
140    /// Fetches an ingest record by id.
141    ///
142    /// # Errors
143    /// Returns `StoreError` if the database query fails.
144    pub async fn get_ingest(&self, ingest_id: &str) -> StoreResult<Option<Ingest>> {
145        self.ensure_schema().await?;
146        let record = RecordId::new(TABLE_INGEST, ingest_id);
147        let mut response = self
148            .db
149            .query("SELECT * FROM $record;")
150            .bind(("record", record))
151            .await?;
152        let records: Vec<IngestRow> = response.take(0)?;
153        if let Some(ingest) = records.into_iter().next().map(Ingest::from) {
154            return Ok(Some(ingest));
155        }
156        if ingest_id.contains("::") {
157            return Ok(None);
158        }
159
160        let mut response = self
161            .db
162            .query("SELECT * FROM ingest WHERE extra.requested_ingest_id = $requested_id;")
163            .bind(("requested_id", ingest_id.to_string()))
164            .await?;
165        let records: Vec<IngestRow> = response.take(0)?;
166        let mut rows = records.into_iter();
167        let first = rows.next();
168        if rows.next().is_some() {
169            return Err(StoreError::InvalidInput(format!(
170                "ingest_id '{ingest_id}' is ambiguous; use project-scoped ingest id"
171            )));
172        }
173        Ok(first.map(Ingest::from))
174    }
175
176    /// Lists projects up to the provided limit.
177    ///
178    /// # Errors
179    /// Returns `StoreError` if the limit is invalid or the database query fails.
180    pub async fn list_projects(&self, limit: usize) -> StoreResult<Vec<Project>> {
181        self.ensure_schema().await?;
182        let limit = limit_to_i64(limit)?;
183        let query = "SELECT *, record::id(id) AS id FROM project LIMIT $limit;";
184        let mut response = self.db.query(query).bind(("limit", limit)).await?;
185        let records: Vec<Project> = response.take(0)?;
186        Ok(records)
187    }
188
189    /// Searches projects by name or alias pattern.
190    ///
191    /// # Errors
192    /// Returns `StoreError` if the limit or pattern is invalid or the database query fails.
193    pub async fn search_projects(&self, pattern: &str, limit: usize) -> StoreResult<Vec<Project>> {
194        self.ensure_schema().await?;
195        let Some(pattern) = normalize_pattern(pattern) else {
196            return self.list_projects(limit).await;
197        };
198        let limit = limit_to_i64(limit)?;
199        let regex = build_project_regex(&pattern)?;
200        let query = format!(
201            "SELECT *, record::id(id) AS id FROM project WHERE search_text != NONE AND string::matches(search_text, {}) LIMIT $limit;",
202            regex.to_sql()
203        );
204        let mut response = self.db.query(query).bind(("limit", limit)).await?;
205        let records: Vec<Project> = response.take(0)?;
206        Ok(records)
207    }
208
209    /// Lists ingest records for a project.
210    ///
211    /// # Errors
212    /// Returns `StoreError` if the limit is invalid or the database query fails.
213    pub async fn list_ingests(&self, project_id: &str, limit: usize) -> StoreResult<Vec<Ingest>> {
214        self.ensure_schema().await?;
215        let project_id = project_id.to_string();
216        let limit = limit_to_i64(limit)?;
217        let query = "SELECT * FROM ingest WHERE project_id = $project_id ORDER BY ingested_at DESC LIMIT $limit;";
218        let mut response = self
219            .db
220            .query(query)
221            .bind(("project_id", project_id))
222            .bind(("limit", limit))
223            .await?;
224        let records: Vec<IngestRow> = response.take(0)?;
225        Ok(records.into_iter().map(Ingest::from).collect())
226    }
227
228    /// Creates an ingest record.
229    ///
230    /// # Errors
231    /// Returns `StoreError` if the database write fails.
232    pub async fn create_ingest(&self, mut ingest: Ingest) -> StoreResult<Ingest> {
233        self.ensure_schema().await?;
234        let provided_id = ingest.id.clone();
235        let id = provided_id.as_ref().map_or_else(
236            || Uuid::new_v4().to_string(),
237            |value| make_scoped_ingest_id(&ingest.project_id, value),
238        );
239        if let Some(provided_id) = provided_id
240            && provided_id != id
241        {
242            ingest.extra = Some(merge_ingest_extra(ingest.extra.take(), &provided_id));
243        }
244        ingest.id = Some(id.clone());
245        let record = RecordId::new(TABLE_INGEST, id.as_str());
246        self.db
247            .query("UPSERT $record CONTENT $data RETURN NONE;")
248            .bind(("record", record))
249            .bind(("data", ingest.clone()))
250            .await?
251            .check()?;
252        Ok(ingest)
253    }
254
255    /// Creates a document source record.
256    ///
257    /// # Errors
258    /// Returns `StoreError` if the database write fails.
259    pub async fn create_doc_source(&self, mut source: DocSource) -> StoreResult<DocSource> {
260        self.ensure_schema().await?;
261        let id = source
262            .id
263            .clone()
264            .unwrap_or_else(|| Uuid::new_v4().to_string());
265        source.id = Some(id.clone());
266        self.db
267            .query("CREATE doc_source CONTENT $data RETURN NONE;")
268            .bind(("data", source.clone()))
269            .await?
270            .check()?;
271        Ok(source)
272    }
273
274    /// Upserts a symbol record by symbol key.
275    ///
276    /// # Errors
277    /// Returns `StoreError` if validation fails or the database write fails.
278    pub async fn upsert_symbol(&self, mut symbol: Symbol) -> StoreResult<Symbol> {
279        self.ensure_schema().await?;
280        ensure_non_empty(&symbol.symbol_key, "symbol_key")?;
281        let id = symbol
282            .id
283            .clone()
284            .unwrap_or_else(|| symbol.symbol_key.clone());
285        symbol.id = Some(id.clone());
286        let record = RecordId::new(TABLE_SYMBOL, id.as_str());
287        self.db
288            .query("UPSERT $record CONTENT $data RETURN NONE;")
289            .bind(("record", record))
290            .bind(("data", symbol.clone()))
291            .await?
292            .check()?;
293        Ok(symbol)
294    }
295
296    /// Creates a document block record.
297    ///
298    /// # Errors
299    /// Returns `StoreError` if the database write fails.
300    pub async fn create_doc_block(&self, mut block: DocBlock) -> StoreResult<DocBlock> {
301        self.ensure_schema().await?;
302        let id = block
303            .id
304            .clone()
305            .unwrap_or_else(|| Uuid::new_v4().to_string());
306        block.id = Some(id.clone());
307        self.db
308            .query("CREATE doc_block CONTENT $data RETURN NONE;")
309            .bind(("data", block.clone()))
310            .await?
311            .check()?;
312        Ok(block)
313    }
314
315    /// Creates document block records concurrently.
316    ///
317    /// # Errors
318    /// Returns `StoreError` if the database write fails.
319    pub async fn create_doc_blocks(&self, blocks: Vec<DocBlock>) -> StoreResult<Vec<DocBlock>> {
320        self.ensure_schema().await?;
321        if blocks.is_empty() {
322            return Ok(Vec::new());
323        }
324        let futs: Vec<_> = blocks
325            .into_iter()
326            .map(|block| self.create_doc_block(block))
327            .collect();
328        let results = futures::future::join_all(futs).await;
329        results.into_iter().collect()
330    }
331
332    /// Creates document chunk records.
333    ///
334    /// # Errors
335    /// Returns `StoreError` if the database write fails.
336    pub async fn create_doc_chunks(&self, chunks: Vec<DocChunk>) -> StoreResult<Vec<DocChunk>> {
337        self.ensure_schema().await?;
338        if chunks.is_empty() {
339            return Ok(Vec::new());
340        }
341        let mut stored = Vec::with_capacity(chunks.len());
342        for mut chunk in chunks {
343            let id = chunk
344                .id
345                .clone()
346                .unwrap_or_else(|| Uuid::new_v4().to_string());
347            chunk.id = Some(id.clone());
348            self.db
349                .query("CREATE doc_chunk CONTENT $data RETURN NONE;")
350                .bind(("data", chunk.clone()))
351                .await?
352                .check()?;
353            stored.push(chunk);
354        }
355        Ok(stored)
356    }
357
358    /// Creates a relation record in the specified table.
359    ///
360    /// # Errors
361    /// Returns `StoreError` if the database write fails.
362    pub async fn create_relation(
363        &self,
364        table: &str,
365        relation: RelationRecord,
366    ) -> StoreResult<RelationRecord> {
367        self.ensure_schema().await?;
368        ensure_identifier(table, "table")?;
369        let in_id = parse_record_id(&relation.in_id, "in_id")?;
370        let out_id = parse_record_id(&relation.out_id, "out_id")?;
371        let payload = RelationPayload::from(&relation);
372        let statement = format!("RELATE $in->{table}->$out CONTENT $data RETURN NONE;");
373        self.db
374            .query(statement)
375            .bind(("in", in_id))
376            .bind(("out", out_id))
377            .bind(("data", payload))
378            .await?
379            .check()?;
380        Ok(relation)
381    }
382
383    /// Creates relation records in the specified table concurrently.
384    ///
385    /// # Errors
386    /// Returns `StoreError` if the database write fails.
387    pub async fn create_relations(
388        &self,
389        table: &str,
390        relations: Vec<RelationRecord>,
391    ) -> StoreResult<Vec<RelationRecord>> {
392        if relations.is_empty() {
393            return Ok(Vec::new());
394        }
395        let futs: Vec<_> = relations
396            .into_iter()
397            .map(|r| self.create_relation(table, r))
398            .collect();
399        let results = futures::future::join_all(futs).await;
400        results.into_iter().collect()
401    }
402
403    /// Lists all database names in the current namespace.
404    ///
405    /// # Errors
406    /// Returns `StoreError` if the query fails.
407    pub async fn list_databases(&self) -> StoreResult<Vec<String>> {
408        let mut response = self.db.query("INFO FOR NS;").await?;
409        let info: Option<Value> = response.take(0)?;
410        let names = info
411            .and_then(|v| v.get("databases").cloned())
412            .and_then(|v| {
413                v.as_object()
414                    .map(|obj| obj.keys().cloned().collect::<Vec<_>>())
415            })
416            .unwrap_or_default();
417        Ok(names)
418    }
419
420    /// Removes a database in the current namespace.
421    ///
422    /// # Errors
423    /// Returns `StoreError` if the input is invalid or the query fails.
424    pub async fn remove_database(&self, db_name: &str) -> StoreResult<()> {
425        ensure_non_empty(db_name, "db_name")?;
426        let identifier = Table::from(db_name).to_sql();
427        let statement = format!("REMOVE DATABASE {identifier};");
428        self.db.query(statement).await?.check()?;
429        Ok(())
430    }
431
432    /// Fetches a symbol by key.
433    ///
434    /// # Errors
435    /// Returns `StoreError` if the database query fails.
436    pub async fn get_symbol(&self, symbol_key: &str) -> StoreResult<Option<Symbol>> {
437        self.ensure_schema().await?;
438        let record = RecordId::new(TABLE_SYMBOL, symbol_key);
439        let mut response = self
440            .db
441            .query("SELECT *, record::id(id) AS id FROM $record;")
442            .bind(("record", record))
443            .await?;
444        let mut records: Vec<Symbol> = response.take(0)?;
445        Ok(records.pop())
446    }
447
448    /// Fetches a symbol by project id and key.
449    ///
450    /// # Errors
451    /// Returns `StoreError` if the database query fails.
452    pub async fn get_symbol_by_project(
453        &self,
454        project_id: &str,
455        symbol_key: &str,
456    ) -> StoreResult<Option<Symbol>> {
457        self.ensure_schema().await?;
458        let project_id = project_id.to_string();
459        let symbol_key = symbol_key.to_string();
460        let query = "SELECT *, record::id(id) AS id FROM symbol WHERE project_id = $project_id AND symbol_key = $symbol_key LIMIT 1;";
461        let mut response = self
462            .db
463            .query(query)
464            .bind(("project_id", project_id))
465            .bind(("symbol_key", symbol_key))
466            .await?;
467        let mut records: Vec<Symbol> = response.take(0)?;
468        Ok(records.pop())
469    }
470
471    /// Lists symbols by name match within a project.
472    ///
473    /// # Errors
474    /// Returns `StoreError` if the limit is invalid or the database query fails.
475    pub async fn list_symbols_by_name(
476        &self,
477        project_id: &str,
478        name: &str,
479        limit: usize,
480    ) -> StoreResult<Vec<Symbol>> {
481        self.ensure_schema().await?;
482        let project_id = project_id.to_string();
483        let name = name.to_string();
484        let limit = limit_to_i64(limit)?;
485        let query = "SELECT *, record::id(id) AS id FROM symbol WHERE project_id = $project_id AND name CONTAINS $name LIMIT $limit;";
486        let mut response = self
487            .db
488            .query(query)
489            .bind(("project_id", project_id))
490            .bind(("name", name))
491            .bind(("limit", limit))
492            .await?;
493        let records: Vec<Symbol> = response.take(0)?;
494        Ok(records)
495    }
496
497    /// Searches symbols with multiple optional filters.
498    ///
499    /// # Errors
500    /// Returns `StoreError` if the limit is invalid or the database query fails.
501    pub async fn search_symbols_advanced(
502        &self,
503        project_id: &str,
504        name: Option<&str>,
505        qualified_name: Option<&str>,
506        symbol_key: Option<&str>,
507        signature: Option<&str>,
508        limit: usize,
509    ) -> StoreResult<Vec<Symbol>> {
510        self.ensure_schema().await?;
511        let project_id = project_id.to_string();
512        let limit = limit_to_i64(limit)?;
513
514        let mut clauses = vec!["project_id = $project_id".to_string()];
515        if symbol_key.is_some() {
516            clauses.push("symbol_key = $symbol_key".to_string());
517        }
518        if name.is_some() {
519            clauses.push(
520                "name != NONE AND string::contains(string::lowercase(name), string::lowercase($name))"
521                    .to_string(),
522            );
523        }
524        if qualified_name.is_some() {
525            clauses.push(
526                "qualified_name != NONE AND string::contains(string::lowercase(qualified_name), string::lowercase($qualified_name))"
527                    .to_string(),
528            );
529        }
530        if signature.is_some() {
531            clauses.push(
532                "signature != NONE AND string::contains(string::lowercase(signature), string::lowercase($signature))"
533                    .to_string(),
534            );
535        }
536
537        let query = format!(
538            "SELECT *, record::id(id) AS id FROM symbol WHERE {} LIMIT $limit;",
539            clauses.join(" AND ")
540        );
541
542        let mut request = self
543            .db
544            .query(query)
545            .bind(("project_id", project_id))
546            .bind(("limit", limit));
547        if let Some(value) = symbol_key {
548            request = request.bind(("symbol_key", value.to_string()));
549        }
550        if let Some(value) = name {
551            request = request.bind(("name", value.to_string()));
552        }
553        if let Some(value) = qualified_name {
554            request = request.bind(("qualified_name", value.to_string()));
555        }
556        if let Some(value) = signature {
557            request = request.bind(("signature", value.to_string()));
558        }
559
560        let mut response = request.await?;
561        let records: Vec<Symbol> = response.take(0)?;
562        Ok(records)
563    }
564
565    /// Lists distinct symbol kinds for a project.
566    ///
567    /// # Errors
568    /// Returns `StoreError` if the database query fails.
569    pub async fn list_symbol_kinds(&self, project_id: &str) -> StoreResult<Vec<String>> {
570        self.ensure_schema().await?;
571        let project_id = project_id.to_string();
572        let query = "SELECT kind FROM symbol WHERE project_id = $project_id GROUP BY kind;";
573        let mut response = self
574            .db
575            .query(query)
576            .bind(("project_id", project_id))
577            .await?;
578        let records: Vec<SymbolKindRow> = response.take(0)?;
579        let mut kinds: Vec<String> = records
580            .into_iter()
581            .filter_map(|row| row.kind)
582            .filter(|value| !value.trim().is_empty())
583            .collect();
584        kinds.sort();
585        kinds.dedup();
586        Ok(kinds)
587    }
588
589    /// Lists members by scope prefix or glob pattern.
590    ///
591    /// # Errors
592    /// Returns `StoreError` if the scope or limit is invalid or the database query fails.
593    pub async fn list_members_by_scope(
594        &self,
595        project_id: &str,
596        scope: &str,
597        limit: usize,
598    ) -> StoreResult<Vec<Symbol>> {
599        self.ensure_schema().await?;
600        let Some(scope) = normalize_pattern(scope) else {
601            return Ok(Vec::new());
602        };
603        let project_id = project_id.to_string();
604        let limit = limit_to_i64(limit)?;
605        let mut response = if scope.contains('*') {
606            let regex = build_scope_regex(&scope)?;
607            let query = format!(
608                "SELECT *, record::id(id) AS id FROM symbol WHERE project_id = $project_id AND qualified_name != NONE AND string::matches(string::lowercase(qualified_name), {}) LIMIT $limit;",
609                regex.to_sql()
610            );
611            self.db
612                .query(query)
613                .bind(("project_id", project_id))
614                .bind(("limit", limit))
615                .await?
616        } else {
617            let query = "SELECT *, record::id(id) AS id FROM symbol WHERE project_id = $project_id AND qualified_name != NONE AND string::starts_with(string::lowercase(qualified_name), $scope) LIMIT $limit;";
618            self.db
619                .query(query)
620                .bind(("project_id", project_id))
621                .bind(("scope", scope))
622                .bind(("limit", limit))
623                .await?
624        };
625        let records: Vec<Symbol> = response.take(0)?;
626        Ok(records)
627    }
628
629    /// Lists document blocks for a symbol, optionally filtering by ingest id.
630    ///
631    /// # Errors
632    /// Returns `StoreError` if the database query fails.
633    pub async fn list_doc_blocks(
634        &self,
635        project_id: &str,
636        symbol_key: &str,
637        ingest_id: Option<&str>,
638    ) -> StoreResult<Vec<DocBlock>> {
639        self.ensure_schema().await?;
640        let project_id = project_id.to_string();
641        let symbol_key = symbol_key.to_string();
642        let (query, binds) = ingest_id.map_or(
643            (
644                "SELECT *, record::id(id) AS id FROM doc_block WHERE project_id = $project_id AND symbol_key = $symbol_key;",
645                None,
646            ),
647            |ingest_id| (
648                "SELECT *, record::id(id) AS id FROM doc_block WHERE project_id = $project_id AND symbol_key = $symbol_key AND ingest_id = $ingest_id;",
649                Some(ingest_id.to_string()),
650            ),
651        );
652        let response = self
653            .db
654            .query(query)
655            .bind(("project_id", project_id))
656            .bind(("symbol_key", symbol_key));
657        let mut response = if let Some(ingest_id) = binds {
658            response.bind(("ingest_id", ingest_id)).await?
659        } else {
660            response.await?
661        };
662        let records: Vec<DocBlock> = response.take(0)?;
663        Ok(records)
664    }
665
666    /// Searches document blocks by text within a project.
667    ///
668    /// # Errors
669    /// Returns `StoreError` if the limit is invalid or the database query fails.
670    pub async fn search_doc_blocks(
671        &self,
672        project_id: &str,
673        text: &str,
674        limit: usize,
675    ) -> StoreResult<Vec<DocBlock>> {
676        self.ensure_schema().await?;
677        let project_id = project_id.to_string();
678        let text = text.to_string();
679        let limit = limit_to_i64(limit)?;
680        let query = "\
681            SELECT *, record::id(id) AS id FROM doc_block \
682            WHERE project_id = $project_id \
683              AND (string::contains(string::lowercase(summary ?? ''), string::lowercase($text)) \
684                OR string::contains(string::lowercase(remarks ?? ''), string::lowercase($text)) \
685                OR string::contains(string::lowercase(returns ?? ''), string::lowercase($text)) \
686                OR string::contains(string::lowercase(errors ?? ''), string::lowercase($text)) \
687                OR string::contains(string::lowercase(panics ?? ''), string::lowercase($text)) \
688                OR string::contains(string::lowercase(safety ?? ''), string::lowercase($text))) \
689            LIMIT $limit;";
690        let mut response = self
691            .db
692            .query(query)
693            .bind(("project_id", project_id))
694            .bind(("text", text))
695            .bind(("limit", limit))
696            .await?;
697        let records: Vec<DocBlock> = response.take(0)?;
698        Ok(records)
699    }
700
701    /// Lists document sources by project and ingest ids.
702    ///
703    /// # Errors
704    /// Returns `StoreError` if the database query fails.
705    pub async fn list_doc_sources(
706        &self,
707        project_id: &str,
708        ingest_ids: &[String],
709    ) -> StoreResult<Vec<DocSource>> {
710        self.ensure_schema().await?;
711        if ingest_ids.is_empty() {
712            return Ok(Vec::new());
713        }
714        let project_id = project_id.to_string();
715        let ingest_ids = normalize_ingest_filter_ids(project_id.as_str(), ingest_ids);
716        if ingest_ids.is_empty() {
717            return Ok(Vec::new());
718        }
719        let query =
720            "SELECT * FROM doc_source WHERE project_id = $project_id AND ingest_id IN $ingest_ids;";
721        let mut response = self
722            .db
723            .query(query)
724            .bind(("project_id", project_id))
725            .bind(("ingest_ids", ingest_ids))
726            .await?;
727        let records: Vec<DocSourceRow> = response.take(0)?;
728        Ok(records.into_iter().map(DocSource::from).collect())
729    }
730
731    /// Lists document sources by explicit source ids.
732    ///
733    /// # Errors
734    /// Returns `StoreError` if the database query fails.
735    pub async fn list_doc_sources_by_ids(
736        &self,
737        project_id: &str,
738        doc_source_ids: &[String],
739    ) -> StoreResult<Vec<DocSource>> {
740        self.ensure_schema().await?;
741        if doc_source_ids.is_empty() {
742            return Ok(Vec::new());
743        }
744        let project_id = project_id.to_string();
745        let mut unique_ids = HashSet::new();
746        let records: Vec<RecordId> = doc_source_ids
747            .iter()
748            .filter(|value| !value.is_empty())
749            .filter(|value| unique_ids.insert((*value).clone()))
750            .map(|value| RecordId::new(TABLE_DOC_SOURCE, value.as_str()))
751            .collect();
752        if records.is_empty() {
753            return Ok(Vec::new());
754        }
755        let query = "SELECT * FROM doc_source WHERE project_id = $project_id AND id IN $records;";
756        let mut response = self
757            .db
758            .query(query)
759            .bind(("project_id", project_id))
760            .bind(("records", records))
761            .await?;
762        let records: Vec<DocSourceRow> = response.take(0)?;
763        Ok(records.into_iter().map(DocSource::from).collect())
764    }
765
766    /// Counts table rows for a project.
767    ///
768    /// # Errors
769    /// Returns `StoreError` if the input is invalid or the database query fails.
770    pub async fn count_rows_for_project(
771        &self,
772        table: &str,
773        project_id: &str,
774    ) -> StoreResult<usize> {
775        self.ensure_schema().await?;
776        ensure_identifier(table, "table")?;
777        let query = format!(
778            "SELECT count() AS count FROM {table} WHERE project_id = $project_id GROUP ALL;"
779        );
780        let mut response = self
781            .db
782            .query(query)
783            .bind(("project_id", project_id.to_string()))
784            .await?;
785        let rows: Vec<CountRow> = response.take(0)?;
786        Ok(rows
787            .first()
788            .and_then(|row| usize::try_from(row.count).ok())
789            .unwrap_or(0))
790    }
791
792    /// Counts symbols in a project where a given field is missing (`NONE`).
793    ///
794    /// # Errors
795    /// Returns `StoreError` if the input is invalid or the database query fails.
796    pub async fn count_symbols_missing_field(
797        &self,
798        project_id: &str,
799        field: &str,
800    ) -> StoreResult<usize> {
801        self.ensure_schema().await?;
802        ensure_identifier(field, "field")?;
803        let query = format!(
804            "SELECT count() AS count FROM symbol WHERE project_id = $project_id AND {field} = NONE GROUP ALL;"
805        );
806        let mut response = self
807            .db
808            .query(query)
809            .bind(("project_id", project_id.to_string()))
810            .await?;
811        let rows: Vec<CountRow> = response.take(0)?;
812        Ok(rows
813            .first()
814            .and_then(|row| usize::try_from(row.count).ok())
815            .unwrap_or(0))
816    }
817
818    /// Lists non-null symbol keys attached to doc blocks for a project.
819    ///
820    /// # Errors
821    /// Returns `StoreError` if the database query fails.
822    pub async fn list_doc_block_symbol_keys(&self, project_id: &str) -> StoreResult<Vec<String>> {
823        self.ensure_schema().await?;
824        let mut response = self
825            .db
826            .query(
827                "SELECT symbol_key FROM doc_block WHERE project_id = $project_id AND symbol_key != NONE;",
828            )
829            .bind(("project_id", project_id.to_string()))
830            .await?;
831        let rows: Vec<DocBlockSymbolKeyRow> = response.take(0)?;
832        Ok(rows.into_iter().map(|row| row.symbol_key).collect())
833    }
834
835    /// Lists observed-in source relation endpoint ids (`symbol:<id>`) for a project.
836    ///
837    /// # Errors
838    /// Returns `StoreError` if the database query fails.
839    pub async fn list_observed_in_symbol_refs(&self, project_id: &str) -> StoreResult<Vec<String>> {
840        self.ensure_schema().await?;
841        let mut response = self
842            .db
843            .query("SELECT in AS symbol_id FROM observed_in WHERE project_id = $project_id;")
844            .bind(("project_id", project_id.to_string()))
845            .await?;
846        let rows: Vec<ObservedInSymbolRow> = response.take(0)?;
847        Ok(rows
848            .into_iter()
849            .map(|row| record_id_to_record_ref(row.symbol_id))
850            .collect())
851    }
852
853    /// Fetches a document source by id.
854    ///
855    /// # Errors
856    /// Returns `StoreError` if the database query fails.
857    pub async fn get_doc_source(&self, doc_source_id: &str) -> StoreResult<Option<DocSource>> {
858        self.ensure_schema().await?;
859        let record = RecordId::new(TABLE_DOC_SOURCE, doc_source_id);
860        let mut response = self
861            .db
862            .query("SELECT * FROM $record;")
863            .bind(("record", record))
864            .await?;
865        let records: Vec<DocSourceRow> = response.take(0)?;
866        Ok(records.into_iter().next().map(DocSource::from))
867    }
868
869    /// Lists document sources for a project, optionally filtered by ingest id.
870    ///
871    /// # Errors
872    /// Returns `StoreError` if the limit is invalid or the database query fails.
873    pub async fn list_doc_sources_by_project(
874        &self,
875        project_id: &str,
876        ingest_id: Option<&str>,
877        limit: usize,
878    ) -> StoreResult<Vec<DocSource>> {
879        self.ensure_schema().await?;
880        let project_id = project_id.to_string();
881        let limit = limit_to_i64(limit)?;
882        let (query, ingest_ids) = ingest_id.map_or(
883            (
884                "SELECT * FROM doc_source WHERE project_id = $project_id ORDER BY source_modified_at DESC LIMIT $limit;",
885                None,
886            ),
887            |ingest_id| {
888                (
889                    "SELECT * FROM doc_source WHERE project_id = $project_id AND ingest_id IN $ingest_ids ORDER BY source_modified_at DESC LIMIT $limit;",
890                    Some(normalize_ingest_filter_ids(
891                        project_id.as_str(),
892                        &[ingest_id.to_string()],
893                    )),
894                )
895            },
896        );
897        if ingest_ids.as_ref().is_some_and(Vec::is_empty) {
898            return Ok(Vec::new());
899        }
900        let response = self
901            .db
902            .query(query)
903            .bind(("project_id", project_id))
904            .bind(("limit", limit));
905        let mut response = if let Some(ingest_ids) = ingest_ids {
906            response.bind(("ingest_ids", ingest_ids)).await?
907        } else {
908            response.await?
909        };
910        let records: Vec<DocSourceRow> = response.take(0)?;
911        Ok(records.into_iter().map(DocSource::from).collect())
912    }
913
914    /// Lists relation records in a table where the symbol is the source (outgoing).
915    ///
916    /// # Errors
917    /// Returns `StoreError` if the database query fails.
918    pub async fn list_relations_from_symbol(
919        &self,
920        table: &str,
921        project_id: &str,
922        symbol_id: &str,
923        limit: usize,
924    ) -> StoreResult<Vec<RelationRecord>> {
925        self.ensure_schema().await?;
926        ensure_identifier(table, "table")?;
927        let limit = limit_to_i64(limit)?;
928        let record_id = RecordId::new(TABLE_SYMBOL, symbol_id);
929        let query = format!(
930            "SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $record->{table} WHERE project_id = $project_id LIMIT $limit;"
931        );
932        let mut response = self
933            .db
934            .query(query)
935            .bind(("project_id", project_id.to_string()))
936            .bind(("record", record_id))
937            .bind(("limit", limit))
938            .await?;
939        let records: Vec<RelationRow> = response.take(0)?;
940        Ok(records.into_iter().map(RelationRecord::from).collect())
941    }
942
943    /// Lists relation records in a table where the symbol is the target (incoming).
944    ///
945    /// # Errors
946    /// Returns `StoreError` if the database query fails.
947    pub async fn list_relations_to_symbol(
948        &self,
949        table: &str,
950        project_id: &str,
951        symbol_id: &str,
952        limit: usize,
953    ) -> StoreResult<Vec<RelationRecord>> {
954        self.ensure_schema().await?;
955        ensure_identifier(table, "table")?;
956        let limit = limit_to_i64(limit)?;
957        let record_id = RecordId::new(TABLE_SYMBOL, symbol_id);
958        let query = format!(
959            "SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $record<-{table} WHERE project_id = $project_id LIMIT $limit;"
960        );
961        let mut response = self
962            .db
963            .query(query)
964            .bind(("project_id", project_id.to_string()))
965            .bind(("record", record_id))
966            .bind(("limit", limit))
967            .await?;
968        let records: Vec<RelationRow> = response.take(0)?;
969        Ok(records.into_iter().map(RelationRecord::from).collect())
970    }
971
972    /// Fetches all adjacency relations for a symbol in a single multi-statement query.
973    ///
974    /// # Errors
975    /// Returns `StoreError` if the database query fails.
976    #[allow(clippy::too_many_lines)]
977    pub async fn fetch_symbol_adjacency(
978        &self,
979        symbol_id: &str,
980        project_id: &str,
981        limit: usize,
982    ) -> StoreResult<AdjacencyRaw> {
983        self.ensure_schema().await?;
984        let limit = limit_to_i64(limit)?;
985        let record = RecordId::new(TABLE_SYMBOL, symbol_id);
986        let query = r"
987            LET $sym = $record;
988            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym->member_of   WHERE project_id = $project_id LIMIT $limit;
989            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym<-member_of   WHERE project_id = $project_id LIMIT $limit;
990            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym->contains    WHERE project_id = $project_id LIMIT $limit;
991            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym<-contains    WHERE project_id = $project_id LIMIT $limit;
992            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym->returns     WHERE project_id = $project_id LIMIT $limit;
993            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym<-returns     WHERE project_id = $project_id LIMIT $limit;
994            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym->param_type  WHERE project_id = $project_id LIMIT $limit;
995            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym<-param_type  WHERE project_id = $project_id LIMIT $limit;
996            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym->see_also    WHERE project_id = $project_id LIMIT $limit;
997            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym<-see_also    WHERE project_id = $project_id LIMIT $limit;
998            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym->inherits    WHERE project_id = $project_id LIMIT $limit;
999            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym<-inherits    WHERE project_id = $project_id LIMIT $limit;
1000            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym->references  WHERE project_id = $project_id LIMIT $limit;
1001            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym<-references  WHERE project_id = $project_id LIMIT $limit;
1002            SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM $sym->observed_in WHERE project_id = $project_id LIMIT $limit;
1003        ";
1004        let mut response = self
1005            .db
1006            .query(query)
1007            .bind(("record", record))
1008            .bind(("project_id", project_id.to_string()))
1009            .bind(("limit", limit))
1010            .await?;
1011
1012        // Statement 0 is LET, statements 1..=15 are SELECTs
1013        let member_of_out: Vec<RelationRow> = response.take(1)?;
1014        let member_of_in: Vec<RelationRow> = response.take(2)?;
1015        let contains_out: Vec<RelationRow> = response.take(3)?;
1016        let contains_in: Vec<RelationRow> = response.take(4)?;
1017        let returns_out: Vec<RelationRow> = response.take(5)?;
1018        let returns_in: Vec<RelationRow> = response.take(6)?;
1019        let param_types_out: Vec<RelationRow> = response.take(7)?;
1020        let param_types_in: Vec<RelationRow> = response.take(8)?;
1021        let see_also_out: Vec<RelationRow> = response.take(9)?;
1022        let see_also_in: Vec<RelationRow> = response.take(10)?;
1023        let inherits_out: Vec<RelationRow> = response.take(11)?;
1024        let inherits_in: Vec<RelationRow> = response.take(12)?;
1025        let references_out: Vec<RelationRow> = response.take(13)?;
1026        let references_in: Vec<RelationRow> = response.take(14)?;
1027        let observed_in_out: Vec<RelationRow> = response.take(15)?;
1028
1029        let to_records = |rows: Vec<RelationRow>| -> Vec<RelationRecord> {
1030            rows.into_iter().map(RelationRecord::from).collect()
1031        };
1032
1033        Ok(AdjacencyRaw {
1034            member_of: merge_relation_rows(to_records(member_of_out), to_records(member_of_in)),
1035            contains: merge_relation_rows(to_records(contains_out), to_records(contains_in)),
1036            returns: merge_relation_rows(to_records(returns_out), to_records(returns_in)),
1037            param_types: merge_relation_rows(
1038                to_records(param_types_out),
1039                to_records(param_types_in),
1040            ),
1041            see_also: merge_relation_rows(to_records(see_also_out), to_records(see_also_in)),
1042            inherits: merge_relation_rows(to_records(inherits_out), to_records(inherits_in)),
1043            references: merge_relation_rows(to_records(references_out), to_records(references_in)),
1044            observed_in: to_records(observed_in_out),
1045        })
1046    }
1047
1048    /// Lists relation records for a document block id.
1049    ///
1050    /// # Errors
1051    /// Returns `StoreError` if the database query fails.
1052    pub async fn list_relations_from_doc_block(
1053        &self,
1054        table: &str,
1055        project_id: &str,
1056        doc_block_id: &str,
1057        limit: usize,
1058    ) -> StoreResult<Vec<RelationRecord>> {
1059        self.ensure_schema().await?;
1060        ensure_identifier(table, "table")?;
1061        let project_id = project_id.to_string();
1062        let limit = limit_to_i64(limit)?;
1063        let record_id = RecordId::new(TABLE_DOC_BLOCK, doc_block_id);
1064        let query = format!(
1065            "SELECT id, in AS in_id, out AS out_id, project_id, ingest_id, kind, extra FROM {table} WHERE project_id = $project_id AND in = $record_id LIMIT $limit;"
1066        );
1067        let mut response = self
1068            .db
1069            .query(query)
1070            .bind(("project_id", project_id))
1071            .bind(("record_id", record_id))
1072            .bind(("limit", limit))
1073            .await?;
1074        let records: Vec<RelationRow> = response.take(0)?;
1075        Ok(records.into_iter().map(RelationRecord::from).collect())
1076    }
1077}
1078
/// Raw adjacency data returned from a single multi-statement query.
///
/// Built by `SurrealDocStore::fetch_symbol_adjacency`. Each two-way list holds the symbol's
/// outgoing edges, followed by those incoming edges whose `(in_id, out_id, kind)` triple was
/// not already present (see `merge_relation_rows`). `observed_in` holds outgoing edges only.
#[derive(Debug, Default)]
pub struct AdjacencyRaw {
    /// `member_of` edges, outgoing then incoming.
    pub member_of: Vec<RelationRecord>,
    /// `contains` edges, outgoing then incoming.
    pub contains: Vec<RelationRecord>,
    /// `returns` edges, outgoing then incoming.
    pub returns: Vec<RelationRecord>,
    /// Edges of the `param_type` relation table, outgoing then incoming.
    pub param_types: Vec<RelationRecord>,
    /// `see_also` edges, outgoing then incoming.
    pub see_also: Vec<RelationRecord>,
    /// `inherits` edges, outgoing then incoming.
    pub inherits: Vec<RelationRecord>,
    /// `references` edges, outgoing then incoming.
    pub references: Vec<RelationRecord>,
    /// Outgoing `observed_in` edges only.
    pub observed_in: Vec<RelationRecord>,
}
1091
1092fn merge_relation_rows(
1093    mut left: Vec<RelationRecord>,
1094    right: Vec<RelationRecord>,
1095) -> Vec<RelationRecord> {
1096    let mut seen = std::collections::HashSet::new();
1097    for r in &left {
1098        seen.insert((r.in_id.clone(), r.out_id.clone(), r.kind.clone()));
1099    }
1100    for r in right {
1101        let key = (r.in_id.clone(), r.out_id.clone(), r.kind.clone());
1102        if seen.insert(key) {
1103            left.push(r);
1104        }
1105    }
1106    left
1107}
1108
1109fn ensure_non_empty(value: &str, field: &str) -> StoreResult<()> {
1110    if value.is_empty() {
1111        return Err(StoreError::InvalidInput(format!("{field} is required")));
1112    }
1113    Ok(())
1114}
1115
1116fn ensure_identifier(value: &str, field: &str) -> StoreResult<()> {
1117    if value.is_empty()
1118        || !value
1119            .chars()
1120            .all(|ch| ch.is_ascii_alphanumeric() || ch == '_')
1121    {
1122        return Err(StoreError::InvalidInput(format!(
1123            "{field} must be a valid identifier"
1124        )));
1125    }
1126    Ok(())
1127}
1128
1129fn parse_record_id(value: &str, field: &str) -> StoreResult<RecordId> {
1130    ensure_non_empty(value, field)?;
1131    RecordId::parse_simple(value).map_err(|err| {
1132        StoreError::InvalidInput(format!(
1133            "{field} must be a record id in 'table:key' format: {err}"
1134        ))
1135    })
1136}
1137
1138#[derive(Debug, Clone, Serialize, SurrealValue)]
1139struct RelationPayload {
1140    project_id: String,
1141    #[serde(skip_serializing_if = "Option::is_none")]
1142    ingest_id: Option<String>,
1143    #[serde(skip_serializing_if = "Option::is_none")]
1144    kind: Option<String>,
1145    #[serde(skip_serializing_if = "Option::is_none")]
1146    extra: Option<Value>,
1147}
1148
1149impl From<&RelationRecord> for RelationPayload {
1150    fn from(value: &RelationRecord) -> Self {
1151        Self {
1152            project_id: value.project_id.clone(),
1153            ingest_id: value.ingest_id.clone(),
1154            kind: value.kind.clone(),
1155            extra: value.extra.clone(),
1156        }
1157    }
1158}
1159
1160#[derive(serde::Deserialize, SurrealValue)]
1161struct RelationRow {
1162    id: RecordId,
1163    in_id: RecordId,
1164    out_id: RecordId,
1165    project_id: String,
1166    ingest_id: Option<String>,
1167    kind: Option<String>,
1168    extra: Option<Value>,
1169}
1170
1171impl From<RelationRow> for RelationRecord {
1172    fn from(row: RelationRow) -> Self {
1173        Self {
1174            id: Some(record_id_to_string(row.id)),
1175            in_id: record_id_to_record_ref(row.in_id),
1176            out_id: record_id_to_record_ref(row.out_id),
1177            project_id: row.project_id,
1178            ingest_id: row.ingest_id,
1179            kind: row.kind,
1180            extra: row.extra,
1181        }
1182    }
1183}
1184
1185#[derive(serde::Deserialize, SurrealValue)]
1186struct IngestRow {
1187    id: RecordId,
1188    project_id: String,
1189    git_commit: Option<String>,
1190    git_branch: Option<String>,
1191    git_tag: Option<String>,
1192    project_version: Option<String>,
1193    source_modified_at: Option<String>,
1194    ingested_at: Option<String>,
1195    extra: Option<serde_json::Value>,
1196}
1197
1198impl From<IngestRow> for Ingest {
1199    fn from(row: IngestRow) -> Self {
1200        Self {
1201            id: Some(record_id_to_string(row.id)),
1202            project_id: row.project_id,
1203            git_commit: row.git_commit,
1204            git_branch: row.git_branch,
1205            git_tag: row.git_tag,
1206            project_version: row.project_version,
1207            source_modified_at: row.source_modified_at,
1208            ingested_at: row.ingested_at,
1209            extra: row.extra,
1210        }
1211    }
1212}
1213
1214#[derive(serde::Deserialize, SurrealValue)]
1215struct DocSourceRow {
1216    id: RecordId,
1217    project_id: String,
1218    ingest_id: Option<String>,
1219    language: Option<String>,
1220    source_kind: Option<String>,
1221    path: Option<String>,
1222    tool_version: Option<String>,
1223    hash: Option<String>,
1224    source_modified_at: Option<String>,
1225    extra: Option<serde_json::Value>,
1226}
1227
1228impl From<DocSourceRow> for DocSource {
1229    fn from(row: DocSourceRow) -> Self {
1230        Self {
1231            id: Some(record_id_to_string(row.id)),
1232            project_id: row.project_id,
1233            ingest_id: row.ingest_id,
1234            language: row.language,
1235            source_kind: row.source_kind,
1236            path: row.path,
1237            tool_version: row.tool_version,
1238            hash: row.hash,
1239            source_modified_at: row.source_modified_at,
1240            extra: row.extra,
1241        }
1242    }
1243}
1244
/// Single-column row holding an optional `kind` value.
/// NOTE(review): its query is not in this section; presumably a `SELECT kind ...` over symbols.
#[derive(serde::Deserialize, SurrealValue)]
struct SymbolKindRow {
    kind: Option<String>,
}
1249
/// Row returned by `SELECT count() AS count ... GROUP ALL` queries.
#[derive(serde::Deserialize, SurrealValue)]
struct CountRow {
    /// The `count` alias. Callers convert it with `usize::try_from`, treating failure as zero.
    count: i64,
}
1254
/// Row returned by `SELECT symbol_key FROM doc_block ...`. The query excludes `NONE` keys.
#[derive(serde::Deserialize, SurrealValue)]
struct DocBlockSymbolKeyRow {
    symbol_key: String,
}
1259
/// Row returned by `SELECT in AS symbol_id FROM observed_in ...`.
#[derive(serde::Deserialize, SurrealValue)]
struct ObservedInSymbolRow {
    /// The `in` endpoint of the `observed_in` edge.
    symbol_id: RecordId,
}
1264
1265async fn apply_schema<C: Connection>(db: &Surreal<C>, schema: &str) -> StoreResult<()> {
1266    db.query(schema).await?.check()?;
1267    Ok(())
1268}
1269
1270fn split_optional_doc_block_fts_schema(schema: &str) -> StoreResult<(String, Option<String>)> {
1271    let mut required_schema = String::new();
1272    let mut optional_schema = String::new();
1273    let mut in_optional_block = false;
1274    let mut found_optional_start = false;
1275    let mut found_optional_end = false;
1276
1277    for line in schema.lines() {
1278        let trimmed = line.trim();
1279        if trimmed == OPTIONAL_DOC_BLOCK_FTS_START {
1280            if in_optional_block || found_optional_start {
1281                return Err(StoreError::InvalidInput(
1282                    "schema optional FTS block start marker appears multiple times".to_string(),
1283                ));
1284            }
1285            found_optional_start = true;
1286            in_optional_block = true;
1287            continue;
1288        }
1289        if trimmed == OPTIONAL_DOC_BLOCK_FTS_END {
1290            if !in_optional_block {
1291                return Err(StoreError::InvalidInput(
1292                    "schema optional FTS block end marker appears before start".to_string(),
1293                ));
1294            }
1295            found_optional_end = true;
1296            in_optional_block = false;
1297            continue;
1298        }
1299        if in_optional_block {
1300            optional_schema.push_str(line);
1301            optional_schema.push('\n');
1302        } else {
1303            required_schema.push_str(line);
1304            required_schema.push('\n');
1305        }
1306    }
1307
1308    if in_optional_block {
1309        return Err(StoreError::InvalidInput(
1310            "schema optional FTS block start marker is missing a matching end marker".to_string(),
1311        ));
1312    }
1313    if found_optional_start != found_optional_end {
1314        return Err(StoreError::InvalidInput(
1315            "schema optional FTS block markers are unbalanced".to_string(),
1316        ));
1317    }
1318    if !found_optional_start {
1319        return Ok((required_schema, None));
1320    }
1321
1322    let optional_schema = optional_schema.trim();
1323    if optional_schema.is_empty() {
1324        return Err(StoreError::InvalidInput(
1325            "schema optional FTS block is empty".to_string(),
1326        ));
1327    }
1328    Ok((required_schema, Some(optional_schema.to_string())))
1329}
1330
/// Trims surrounding whitespace from a search pattern and lowercases it.
///
/// Returns `None` when nothing remains after trimming.
fn normalize_pattern(pattern: &str) -> Option<String> {
    let trimmed = pattern.trim();
    (!trimmed.is_empty()).then(|| trimmed.to_lowercase())
}
1339
1340fn limit_to_i64(limit: usize) -> StoreResult<i64> {
1341    i64::try_from(limit)
1342        .map_err(|_| StoreError::InvalidInput("limit exceeds supported range".to_string()))
1343}
1344
1345fn record_id_to_string(record_id: RecordId) -> String {
1346    record_id_key_to_string(record_id.key)
1347}
1348
1349fn record_id_to_record_ref(record_id: RecordId) -> String {
1350    let table = record_id.table.into_string();
1351    let key = record_id_key_to_string(record_id.key);
1352    format!("{table}:{key}")
1353}
1354
1355fn record_id_key_to_string(key: RecordIdKey) -> String {
1356    match key {
1357        RecordIdKey::String(value) => value,
1358        other => other.to_sql(),
1359    }
1360}
1361
1362fn build_project_regex(pattern: &str) -> StoreResult<Regex> {
1363    let body = glob_to_regex_body(pattern);
1364    let regex = format!(r"(^|\|){body}(\||$)");
1365    Regex::from_str(&regex)
1366        .map_err(|err| StoreError::InvalidInput(format!("Invalid project search pattern: {err}")))
1367}
1368
1369fn build_scope_regex(pattern: &str) -> StoreResult<Regex> {
1370    let body = glob_to_regex_body(pattern);
1371    let regex = format!(r"^{body}$");
1372    Regex::from_str(&regex)
1373        .map_err(|err| StoreError::InvalidInput(format!("Invalid scope search pattern: {err}")))
1374}
1375
/// Translates a glob into an unanchored regex body.
///
/// `*` becomes `.*`, regex metacharacters are backslash-escaped, and every other character
/// is copied unchanged. `?` is escaped as a literal, not treated as a single-character
/// wildcard.
fn glob_to_regex_body(pattern: &str) -> String {
    const META: &[char] = &[
        '.', '+', '?', '(', ')', '[', ']', '{', '}', '|', '^', '$', '\\',
    ];
    let mut body = String::with_capacity(pattern.len());
    for ch in pattern.chars() {
        if ch == '*' {
            body.push_str(".*");
            continue;
        }
        if META.contains(&ch) {
            body.push('\\');
        }
        body.push(ch);
    }
    body
}
1390
/// Prefixes `ingest_id` with `<project_id>::` unless it already starts with exactly that
/// prefix.
fn make_scoped_ingest_id(project_id: &str, ingest_id: &str) -> String {
    let already_scoped = ingest_id
        .strip_prefix(project_id)
        .is_some_and(|rest| rest.starts_with("::"));
    if already_scoped {
        ingest_id.to_owned()
    } else {
        format!("{project_id}::{ingest_id}")
    }
}
1399
/// Expands ingest-id filter values into the list of ids to match against stored records.
///
/// Each value is trimmed; blank values are skipped. The trimmed value is kept. If it starts
/// with `<project_id>::`, the non-empty remainder after that prefix is kept as well. The
/// output preserves first-seen order and contains no duplicates.
fn normalize_ingest_filter_ids(project_id: &str, ingest_ids: &[String]) -> Vec<String> {
    let scoped_prefix = format!("{project_id}::");
    let mut seen = HashSet::new();
    let mut expanded = Vec::new();
    let mut keep = |candidate: &str| {
        if seen.insert(candidate.to_string()) {
            expanded.push(candidate.to_string());
        }
    };
    for raw in ingest_ids {
        let id = raw.trim();
        if id.is_empty() {
            continue;
        }
        keep(id);
        match id.strip_prefix(scoped_prefix.as_str()) {
            Some(unscoped) if !unscoped.is_empty() => keep(unscoped),
            _ => {}
        }
    }
    expanded
}
1421
1422fn merge_ingest_extra(existing: Option<Value>, requested_ingest_id: &str) -> Value {
1423    let mut object = match existing {
1424        Some(Value::Object(map)) => map,
1425        Some(value) => {
1426            let mut map = serde_json::Map::new();
1427            map.insert("value".to_string(), value);
1428            map
1429        }
1430        None => serde_json::Map::new(),
1431    };
1432    object.insert(
1433        "requested_ingest_id".to_string(),
1434        Value::String(requested_ingest_id.to_string()),
1435    );
1436    Value::Object(object)
1437}
1438
1439#[cfg(test)]
1440mod tests {
1441    use super::*;
1442    use docx_store::models::{DocSource, Ingest, Project, RelationRecord, Symbol};
1443    use docx_store::schema::REL_MEMBER_OF;
1444    use serde::Deserialize;
1445    use surrealdb::Surreal;
1446    use surrealdb::engine::local::{Db, Mem};
1447
1448    async fn build_store() -> SurrealDocStore<Db> {
1449        let db = Surreal::new::<Mem>(())
1450            .await
1451            .expect("failed to create in-memory SurrealDB");
1452        db.use_ns("docx")
1453            .use_db("test")
1454            .await
1455            .expect("failed to set namespace/db");
1456        SurrealDocStore::new(db)
1457    }
1458
1459    #[test]
1460    fn split_optional_doc_block_fts_schema_extracts_optional_block() {
1461        let schema = "\
1462DEFINE TABLE test SCHEMAFULL;\n\
1463-- OPTIONAL_DOC_BLOCK_FTS_START\n\
1464DEFINE ANALYZER IF NOT EXISTS docx_search TOKENIZERS blank,class FILTERS lowercase,snowball(english);\n\
1465-- OPTIONAL_DOC_BLOCK_FTS_END\n\
1466DEFINE INDEX test_idx ON TABLE test COLUMNS id;\n";
1467        let (required, optional) =
1468            split_optional_doc_block_fts_schema(schema).expect("split should succeed");
1469        let optional = optional.expect("optional block should be extracted");
1470        assert!(required.contains("DEFINE TABLE test SCHEMAFULL;"));
1471        assert!(required.contains("DEFINE INDEX test_idx ON TABLE test COLUMNS id;"));
1472        assert!(!required.contains("DEFINE ANALYZER"));
1473        assert!(optional.contains("DEFINE ANALYZER IF NOT EXISTS docx_search"));
1474    }
1475
1476    #[test]
1477    fn split_optional_doc_block_fts_schema_rejects_unclosed_optional_block() {
1478        let schema = "\
1479DEFINE TABLE test SCHEMAFULL;\n\
1480-- OPTIONAL_DOC_BLOCK_FTS_START\n\
1481DEFINE ANALYZER IF NOT EXISTS docx_search TOKENIZERS blank,class FILTERS lowercase,snowball(english);\n";
1482        let error = split_optional_doc_block_fts_schema(schema)
1483            .expect_err("unclosed optional block should fail");
1484        assert!(
1485            error
1486                .to_string()
1487                .contains("start marker is missing a matching end marker"),
1488            "error should describe unmatched optional block markers"
1489        );
1490    }
1491
1492    #[tokio::test]
1493    async fn list_ingests_includes_ids() {
1494        let store = build_store().await;
1495        let ingest = Ingest {
1496            id: Some("ingest-1".to_string()),
1497            project_id: "project".to_string(),
1498            git_commit: None,
1499            git_branch: None,
1500            git_tag: None,
1501            project_version: None,
1502            source_modified_at: None,
1503            ingested_at: None,
1504            extra: None,
1505        };
1506
1507        store
1508            .create_ingest(ingest)
1509            .await
1510            .expect("failed to create ingest");
1511        let ingests = store
1512            .list_ingests("project", 10)
1513            .await
1514            .expect("failed to list ingests");
1515
1516        assert_eq!(ingests.len(), 1);
1517        assert_eq!(ingests[0].id.as_deref(), Some("project::ingest-1"));
1518        assert_eq!(
1519            ingests[0]
1520                .extra
1521                .as_ref()
1522                .and_then(|value| value.get("requested_ingest_id"))
1523                .and_then(serde_json::Value::as_str),
1524            Some("ingest-1")
1525        );
1526    }
1527
1528    #[tokio::test]
1529    async fn list_ingests_scopes_same_ingest_id_per_project() {
1530        let store = build_store().await;
1531        let left = Ingest {
1532            id: Some("shared".to_string()),
1533            project_id: "project-left".to_string(),
1534            git_commit: None,
1535            git_branch: None,
1536            git_tag: None,
1537            project_version: None,
1538            source_modified_at: None,
1539            ingested_at: None,
1540            extra: None,
1541        };
1542        let right = Ingest {
1543            id: Some("shared".to_string()),
1544            project_id: "project-right".to_string(),
1545            git_commit: None,
1546            git_branch: None,
1547            git_tag: None,
1548            project_version: None,
1549            source_modified_at: None,
1550            ingested_at: None,
1551            extra: None,
1552        };
1553
1554        store
1555            .create_ingest(left)
1556            .await
1557            .expect("failed to create left ingest");
1558        store
1559            .create_ingest(right)
1560            .await
1561            .expect("failed to create right ingest");
1562
1563        let left_ingests = store
1564            .list_ingests("project-left", 10)
1565            .await
1566            .expect("failed to list left ingests");
1567        let right_ingests = store
1568            .list_ingests("project-right", 10)
1569            .await
1570            .expect("failed to list right ingests");
1571
1572        assert_eq!(left_ingests.len(), 1);
1573        assert_eq!(right_ingests.len(), 1);
1574        assert_eq!(left_ingests[0].id.as_deref(), Some("project-left::shared"));
1575        assert_eq!(
1576            right_ingests[0].id.as_deref(),
1577            Some("project-right::shared")
1578        );
1579    }
1580
1581    #[tokio::test]
1582    async fn get_ingest_supports_requested_id_when_unique() {
1583        let store = build_store().await;
1584        let ingest = Ingest {
1585            id: Some("requested".to_string()),
1586            project_id: "project".to_string(),
1587            git_commit: None,
1588            git_branch: None,
1589            git_tag: None,
1590            project_version: None,
1591            source_modified_at: None,
1592            ingested_at: None,
1593            extra: None,
1594        };
1595
1596        store
1597            .create_ingest(ingest)
1598            .await
1599            .expect("failed to create ingest");
1600        let found = store
1601            .get_ingest("requested")
1602            .await
1603            .expect("failed to lookup ingest by requested id");
1604        assert_eq!(
1605            found.and_then(|row| row.id),
1606            Some("project::requested".to_string())
1607        );
1608    }
1609
1610    #[tokio::test]
1611    async fn get_ingest_rejects_ambiguous_requested_id() {
1612        let store = build_store().await;
1613        let left = Ingest {
1614            id: Some("requested".to_string()),
1615            project_id: "project-left".to_string(),
1616            git_commit: None,
1617            git_branch: None,
1618            git_tag: None,
1619            project_version: None,
1620            source_modified_at: None,
1621            ingested_at: None,
1622            extra: None,
1623        };
1624        let right = Ingest {
1625            id: Some("requested".to_string()),
1626            project_id: "project-right".to_string(),
1627            git_commit: None,
1628            git_branch: None,
1629            git_tag: None,
1630            project_version: None,
1631            source_modified_at: None,
1632            ingested_at: None,
1633            extra: None,
1634        };
1635
1636        store
1637            .create_ingest(left)
1638            .await
1639            .expect("failed to create left ingest");
1640        store
1641            .create_ingest(right)
1642            .await
1643            .expect("failed to create right ingest");
1644
1645        let error = store
1646            .get_ingest("requested")
1647            .await
1648            .expect_err("ambiguous requested id should error");
1649        assert!(
1650            error
1651                .to_string()
1652                .contains("ambiguous; use project-scoped ingest id"),
1653            "error should explain scoped ingest id requirement"
1654        );
1655    }
1656
1657    #[tokio::test]
1658    async fn list_doc_sources_includes_ids() {
1659        let store = build_store().await;
1660        let source = DocSource {
1661            id: Some("source-1".to_string()),
1662            project_id: "project".to_string(),
1663            ingest_id: Some("ingest-1".to_string()),
1664            language: None,
1665            source_kind: None,
1666            path: None,
1667            tool_version: None,
1668            hash: None,
1669            source_modified_at: None,
1670            extra: None,
1671        };
1672
1673        store
1674            .create_doc_source(source)
1675            .await
1676            .expect("failed to create doc source");
1677        let sources = store
1678            .list_doc_sources_by_project("project", None, 10)
1679            .await
1680            .expect("failed to list doc sources");
1681
1682        assert_eq!(sources.len(), 1);
1683        assert_eq!(sources[0].id.as_deref(), Some("source-1"));
1684    }
1685
1686    #[tokio::test]
1687    async fn list_doc_sources_accepts_scoped_ingest_filter() {
1688        let store = build_store().await;
1689        let source = DocSource {
1690            id: Some("source-1".to_string()),
1691            project_id: "project".to_string(),
1692            ingest_id: Some("shared".to_string()),
1693            language: None,
1694            source_kind: None,
1695            path: None,
1696            tool_version: None,
1697            hash: None,
1698            source_modified_at: None,
1699            extra: None,
1700        };
1701
1702        store
1703            .create_doc_source(source)
1704            .await
1705            .expect("failed to create doc source");
1706        let sources = store
1707            .list_doc_sources("project", &["project::shared".to_string()])
1708            .await
1709            .expect("failed to list doc sources with scoped ingest id");
1710
1711        assert_eq!(sources.len(), 1);
1712        assert_eq!(sources[0].id.as_deref(), Some("source-1"));
1713    }
1714
1715    #[tokio::test]
1716    async fn list_doc_sources_by_project_accepts_scoped_ingest_filter() {
1717        let store = build_store().await;
1718        let source = DocSource {
1719            id: Some("source-1".to_string()),
1720            project_id: "project".to_string(),
1721            ingest_id: Some("shared".to_string()),
1722            language: None,
1723            source_kind: None,
1724            path: None,
1725            tool_version: None,
1726            hash: None,
1727            source_modified_at: None,
1728            extra: None,
1729        };
1730
1731        store
1732            .create_doc_source(source)
1733            .await
1734            .expect("failed to create doc source");
1735        let sources = store
1736            .list_doc_sources_by_project("project", Some("project::shared"), 10)
1737            .await
1738            .expect("failed to list doc sources by project with scoped ingest id");
1739
1740        assert_eq!(sources.len(), 1);
1741        assert_eq!(sources[0].id.as_deref(), Some("source-1"));
1742    }
1743
1744    #[tokio::test]
1745    async fn list_doc_sources_by_ids_filters_requested_ids() {
1746        let store = build_store().await;
1747        let source_left = DocSource {
1748            id: Some("source-left".to_string()),
1749            project_id: "project".to_string(),
1750            ingest_id: Some("ingest-1".to_string()),
1751            language: None,
1752            source_kind: None,
1753            path: None,
1754            tool_version: None,
1755            hash: None,
1756            source_modified_at: None,
1757            extra: None,
1758        };
1759        let source_right = DocSource {
1760            id: Some("source-right".to_string()),
1761            project_id: "project".to_string(),
1762            ingest_id: Some("ingest-1".to_string()),
1763            language: None,
1764            source_kind: None,
1765            path: None,
1766            tool_version: None,
1767            hash: None,
1768            source_modified_at: None,
1769            extra: None,
1770        };
1771
1772        store
1773            .create_doc_source(source_left)
1774            .await
1775            .expect("failed to create left doc source");
1776        store
1777            .create_doc_source(source_right)
1778            .await
1779            .expect("failed to create right doc source");
1780
1781        let results = store
1782            .list_doc_sources_by_ids("project", &["source-right".to_string()])
1783            .await
1784            .expect("failed to list doc sources by ids");
1785        assert_eq!(results.len(), 1);
1786        assert_eq!(results[0].id.as_deref(), Some("source-right"));
1787    }
1788
1789    fn build_symbol(project_id: &str, id: &str) -> Symbol {
1790        Symbol {
1791            id: Some(id.to_string()),
1792            project_id: project_id.to_string(),
1793            language: Some("rust".to_string()),
1794            symbol_key: id.to_string(),
1795            kind: None,
1796            name: None,
1797            qualified_name: None,
1798            display_name: None,
1799            signature: None,
1800            signature_hash: None,
1801            visibility: None,
1802            is_static: None,
1803            is_async: None,
1804            is_const: None,
1805            is_deprecated: None,
1806            since: None,
1807            stability: None,
1808            source_path: None,
1809            line: None,
1810            col: None,
1811            return_type: None,
1812            params: Vec::new(),
1813            type_params: Vec::new(),
1814            attributes: Vec::new(),
1815            source_ids: Vec::new(),
1816            doc_summary: None,
1817            extra: None,
1818        }
1819    }
1820
    /// Row decoded from the relation type-check query used by
    /// `create_relation_stores_record_links`: whether each endpoint of a stored
    /// relation is a record link.
    #[derive(Deserialize, SurrealValue)]
    struct RelationTypeFlags {
        // Value of `type::is_record(in)`; the field name must match the query alias.
        in_is_record: bool,
        // Value of `type::is_record(out)`; the field name must match the query alias.
        out_is_record: bool,
    }
1826
1827    #[tokio::test]
1828    async fn create_relation_stores_record_links() {
1829        let store = build_store().await;
1830        let _ = store
1831            .upsert_symbol(build_symbol("project", "left"))
1832            .await
1833            .expect("failed to create left symbol");
1834        let _ = store
1835            .upsert_symbol(build_symbol("project", "right"))
1836            .await
1837            .expect("failed to create right symbol");
1838
1839        let relation = RelationRecord {
1840            id: None,
1841            in_id: "symbol:left".to_string(),
1842            out_id: "symbol:right".to_string(),
1843            project_id: "project".to_string(),
1844            ingest_id: None,
1845            kind: Some("test".to_string()),
1846            extra: None,
1847        };
1848        let _ = store
1849            .create_relation(REL_MEMBER_OF, relation)
1850            .await
1851            .expect("failed to create relation");
1852
1853        let mut response = store
1854            .db()
1855            .query(
1856                "SELECT type::is_record(in) AS in_is_record, type::is_record(out) AS out_is_record FROM member_of LIMIT 1;",
1857            )
1858            .await
1859            .expect("failed to query relation record types");
1860        let rows: Vec<RelationTypeFlags> = response.take(0).expect("failed to decode relation row");
1861        assert_eq!(rows.len(), 1);
1862        assert!(rows[0].in_is_record);
1863        assert!(rows[0].out_is_record);
1864    }
1865
1866    #[tokio::test]
1867    async fn search_symbols_advanced_supports_exact_symbol_key() {
1868        let store = build_store().await;
1869        let mut alpha = build_symbol("project", "rust|project|alpha");
1870        alpha.name = Some("alpha".to_string());
1871        alpha.qualified_name = Some("crate::alpha".to_string());
1872        let mut beta = build_symbol("project", "rust|project|beta");
1873        beta.name = Some("beta".to_string());
1874        beta.qualified_name = Some("crate::beta".to_string());
1875
1876        store
1877            .upsert_symbol(alpha.clone())
1878            .await
1879            .expect("failed to create alpha");
1880        store
1881            .upsert_symbol(beta)
1882            .await
1883            .expect("failed to create beta");
1884
1885        let results = store
1886            .search_symbols_advanced(
1887                "project",
1888                None,
1889                None,
1890                Some(alpha.symbol_key.as_str()),
1891                None,
1892                10,
1893            )
1894            .await
1895            .expect("advanced search by symbol key should succeed");
1896        assert_eq!(results.len(), 1);
1897        assert_eq!(results[0].symbol_key, alpha.symbol_key);
1898    }
1899
1900    #[tokio::test]
1901    async fn remove_database_makes_current_db_unavailable() {
1902        let store = build_store().await;
1903        let _ = store
1904            .upsert_project(Project {
1905                id: Some("project".to_string()),
1906                project_id: "project".to_string(),
1907                name: Some("project".to_string()),
1908                language: Some("rust".to_string()),
1909                root_path: None,
1910                description: None,
1911                aliases: Vec::new(),
1912                search_text: Some("project".to_string()),
1913                extra: None,
1914            })
1915            .await
1916            .expect("failed to upsert project");
1917
1918        store
1919            .remove_database("test")
1920            .await
1921            .expect("failed to remove database");
1922        let result = store.list_projects(10).await;
1923        assert!(result.is_err());
1924    }
1925}