Skip to main content

brainwires_knowledge/knowledge/
brain_client.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use anyhow::{Context, Result};
5use chrono::Utc;
6use tracing;
7
8use crate::knowledge::bks_pks::{
9    BehavioralKnowledgeCache, PersonalFactCollector, PersonalKnowledgeCache,
10};
11use brainwires_storage::{
12    EmbeddingProvider, FieldDef, FieldType, FieldValue, Filter, Record, StorageBackend, record_get,
13};
14
15#[cfg(feature = "knowledge")]
16use brainwires_storage::LanceDatabase;
17
18use crate::knowledge::config::MemoryBankConfig;
19use crate::knowledge::fact_extractor;
20use crate::knowledge::thought::{Thought, ThoughtCategory, ThoughtSource};
21use crate::knowledge::types::*;
22
23/// Central orchestrator for all Open Brain storage operations.
24pub struct BrainClient {
25    backend: Arc<dyn StorageBackend>,
26    embeddings: Arc<EmbeddingProvider>,
27    pks_cache: PersonalKnowledgeCache,
28    bks_cache: BehavioralKnowledgeCache,
29    fact_collector: PersonalFactCollector,
30    /// Optional memory bank configuration (mission, directives, disposition).
31    config: MemoryBankConfig,
32}
33
34const THOUGHTS_TABLE: &str = "thoughts";
35
36/// EMA alpha for confidence updates on corroboration/contradiction.
37const EVIDENCE_EMA_ALPHA: f32 = 0.3;
38/// Score threshold above which a similar thought is a corroboration.
39const CORROBORATION_THRESHOLD: f32 = 0.85;
40/// Score threshold above which a similar thought may be a contradiction.
41const CONTRADICTION_THRESHOLD: f32 = 0.70;
42
43impl BrainClient {
44    /// Create a new BrainClient with default paths.
45    ///
46    /// - LanceDB: `~/.brainwires/brain/`
47    /// - PKS:     `~/.brainwires/pks.db`
48    /// - BKS:     `~/.brainwires/bks.db`
49    pub async fn new() -> Result<Self> {
50        let base = dirs::home_dir()
51            .context("Cannot determine home directory")?
52            .join(".brainwires");
53
54        std::fs::create_dir_all(&base)?;
55
56        let lance_path = base.join("brain");
57        let pks_path = base.join("pks.db");
58        let bks_path = base.join("bks.db");
59
60        Self::with_paths(
61            lance_path
62                .to_str()
63                .context("lance path is not valid UTF-8")?,
64            pks_path.to_str().context("pks path is not valid UTF-8")?,
65            bks_path.to_str().context("bks path is not valid UTF-8")?,
66        )
67        .await
68    }
69
70    /// Create with explicit paths (useful for testing).
71    ///
72    /// Creates a LanceDatabase internally as the default backend.
73    pub async fn with_paths(lance_path: &str, pks_path: &str, bks_path: &str) -> Result<Self> {
74        let embeddings = Arc::new(EmbeddingProvider::new()?);
75        let backend: Arc<dyn StorageBackend> = Arc::new(LanceDatabase::new(lance_path).await?);
76
77        Self::with_backend(backend, embeddings, pks_path, bks_path).await
78    }
79
80    /// Create with an externally-provided storage backend.
81    ///
82    /// This is the primary constructor for dependency injection — any
83    /// [`StorageBackend`] implementation can be used (LanceDB, Postgres, etc.).
84    pub async fn with_backend(
85        backend: Arc<dyn StorageBackend>,
86        embeddings: Arc<EmbeddingProvider>,
87        pks_path: &str,
88        bks_path: &str,
89    ) -> Result<Self> {
90        // Ensure the thoughts table exists
91        Self::ensure_thoughts_table(&*backend, embeddings.dimension()).await?;
92
93        let pks_cache = PersonalKnowledgeCache::new(pks_path, 1000)?;
94        let bks_cache = BehavioralKnowledgeCache::new(bks_path, 1000)?;
95        let fact_collector = PersonalFactCollector::default();
96
97        Ok(Self {
98            backend,
99            embeddings,
100            pks_cache,
101            bks_cache,
102            fact_collector,
103            config: MemoryBankConfig::default(),
104        })
105    }
106
107    /// Create with default paths and a custom [`MemoryBankConfig`].
108    pub async fn with_bank_config(config: MemoryBankConfig) -> Result<Self> {
109        let mut client = Self::new().await?;
110        client.config = config;
111        Ok(client)
112    }
113
114    /// Set or replace the [`MemoryBankConfig`] on an existing client.
115    pub fn set_config(&mut self, config: MemoryBankConfig) {
116        self.config = config;
117    }
118
119    /// Return a reference to the active [`MemoryBankConfig`].
120    pub fn config(&self) -> &MemoryBankConfig {
121        &self.config
122    }
123
124    // ── Table management ─────────────────────────────────────────────────
125
126    async fn ensure_thoughts_table(backend: &dyn StorageBackend, dim: usize) -> Result<()> {
127        backend
128            .ensure_table(
129                THOUGHTS_TABLE,
130                &[
131                    FieldDef::required("vector", FieldType::Vector(dim)),
132                    FieldDef::required("id", FieldType::Utf8),
133                    FieldDef::required("content", FieldType::Utf8),
134                    FieldDef::required("category", FieldType::Utf8),
135                    FieldDef::required("tags", FieldType::Utf8),
136                    FieldDef::required("source", FieldType::Utf8),
137                    FieldDef::required("importance", FieldType::Float32),
138                    FieldDef::required("created_at", FieldType::Int64),
139                    FieldDef::required("updated_at", FieldType::Int64),
140                    FieldDef::required("deleted", FieldType::Boolean),
141                    FieldDef::optional("confidence", FieldType::Float32),
142                    FieldDef::optional("evidence_chain", FieldType::Utf8),
143                    FieldDef::optional("reinforcement_count", FieldType::Int64),
144                    FieldDef::optional("contradiction_count", FieldType::Int64),
145                ],
146            )
147            .await
148            .context("Failed to create thoughts table")?;
149
150        tracing::info!("Ensured thoughts table exists");
151        Ok(())
152    }
153
154    // ── Helpers ─────────────────────────────────────────────────────────
155
156    /// Default importance score based on thought category.
157    fn importance_for_category(cat: &ThoughtCategory) -> f32 {
158        match cat {
159            ThoughtCategory::Decision => 0.85,
160            ThoughtCategory::ActionItem => 0.8,
161            ThoughtCategory::Insight => 0.75,
162            ThoughtCategory::Idea => 0.65,
163            ThoughtCategory::Person => 0.6,
164            ThoughtCategory::MeetingNote => 0.55,
165            ThoughtCategory::Reference => 0.5,
166            ThoughtCategory::Conversation => 0.45,
167            ThoughtCategory::General => 0.4,
168        }
169    }
170
171    // ── Capture ──────────────────────────────────────────────────────────
172
173    /// Capture a new thought, embed it, detect category, extract PKS facts.
174    pub async fn capture_thought(
175        &mut self,
176        req: CaptureThoughtRequest,
177    ) -> Result<CaptureThoughtResponse> {
178        // Build the Thought
179        let category = match &req.category {
180            Some(c) => ThoughtCategory::parse(c),
181            None => fact_extractor::detect_category(&req.content),
182        };
183
184        let mut auto_tags = fact_extractor::extract_tags(&req.content);
185        if let Some(ref user_tags) = req.tags {
186            for t in user_tags {
187                let lower = t.to_lowercase();
188                if !auto_tags.contains(&lower) {
189                    auto_tags.push(lower);
190                }
191            }
192        }
193
194        // Auto-tag with the mission slug when a mission is configured.
195        if let Some(mission_tag) = self.config.mission_tag()
196            && !auto_tags.contains(&mission_tag)
197        {
198            auto_tags.push(mission_tag);
199        }
200
201        let source = req
202            .source
203            .as_deref()
204            .map(ThoughtSource::parse)
205            .unwrap_or(ThoughtSource::ManualCapture);
206
207        let thought = Thought::new(req.content.clone())
208            .with_category(category)
209            .with_tags(auto_tags.clone())
210            .with_source(source)
211            .with_importance(
212                req.importance
213                    .unwrap_or_else(|| Self::importance_for_category(&category)),
214            );
215
216        // Embed
217        let embedding = self.embeddings.embed(&thought.content)?;
218
219        // Store via backend
220        let record = Self::thought_to_record(&thought, &embedding);
221        self.backend
222            .insert(THOUGHTS_TABLE, vec![record])
223            .await
224            .context("Failed to store thought")?;
225
226        // Extract PKS facts
227        let facts = self.fact_collector.process_message(&req.content);
228        let facts_count = facts.len();
229        for fact in facts {
230            if let Err(e) = self.pks_cache.upsert_fact(fact) {
231                tracing::warn!("Failed to upsert PKS fact: {}", e);
232            }
233        }
234
235        // Run evidence check: find corroborations / contradictions among existing thoughts.
236        let evidence = self
237            .apply_evidence_check(&thought.id, &req.content)
238            .await
239            .unwrap_or_default();
240
241        // Compute initial confidence for the new thought based on corroboration count.
242        let initial_confidence = (0.5 + 0.05 * evidence.corroborations.len() as f32
243            - 0.05 * evidence.contradictions.len() as f32)
244            .clamp(0.0, 1.0);
245
246        // Persist updated confidence + evidence_chain for the new thought itself.
247        if !evidence.corroborations.is_empty() || !evidence.contradictions.is_empty() {
248            let mut all_evidence = evidence.corroborations.clone();
249            all_evidence.extend(evidence.contradictions.iter().cloned());
250
251            // Update the newly inserted thought record with its evidence data.
252            let delete_filter = Filter::Eq("id".into(), FieldValue::Utf8(Some(thought.id.clone())));
253            let _ = self.backend.delete(THOUGHTS_TABLE, &delete_filter).await;
254            let mut updated_thought = thought.clone();
255            updated_thought.confidence = initial_confidence;
256            updated_thought.evidence_chain = all_evidence;
257            let embedding = self.embeddings.embed_cached(&updated_thought.content)?;
258            let record = Self::thought_to_record(&updated_thought, &embedding);
259            let _ = self.backend.insert(THOUGHTS_TABLE, vec![record]).await;
260        }
261
262        tracing::info!(
263            id = %thought.id,
264            category = %category,
265            facts = facts_count,
266            corroborations = evidence.corroborations.len(),
267            contradictions = evidence.contradictions.len(),
268            "Captured thought"
269        );
270
271        Ok(CaptureThoughtResponse {
272            id: thought.id,
273            category: category.to_string(),
274            tags: auto_tags,
275            importance: thought.importance,
276            facts_extracted: facts_count,
277            corroborations: evidence.corroborations,
278            contradictions: evidence.contradictions,
279            confidence: initial_confidence,
280        })
281    }
282
283    /// Batch-capture multiple thoughts in a single embed + insert pass.
284    ///
285    /// Skips per-message evidence checks for speed. PKS extraction still runs per message.
286    /// Returns the number of thoughts stored.
287    pub async fn capture_thoughts_batch(
288        &mut self,
289        requests: Vec<CaptureThoughtRequest>,
290    ) -> Result<usize> {
291        if requests.is_empty() {
292            return Ok(0);
293        }
294
295        // Build Thought objects with category detection + importance
296        let thoughts: Vec<Thought> = requests
297            .iter()
298            .map(|req| {
299                let category = match &req.category {
300                    Some(c) => ThoughtCategory::parse(c),
301                    None => fact_extractor::detect_category(&req.content),
302                };
303                let mut auto_tags = fact_extractor::extract_tags(&req.content);
304                if let Some(ref user_tags) = req.tags {
305                    for t in user_tags {
306                        if !auto_tags.contains(t) {
307                            auto_tags.push(t.clone());
308                        }
309                    }
310                }
311                let source = req
312                    .source
313                    .as_deref()
314                    .map(ThoughtSource::parse)
315                    .unwrap_or(ThoughtSource::ManualCapture);
316                Thought::new(req.content.clone())
317                    .with_category(category)
318                    .with_tags(auto_tags)
319                    .with_source(source)
320                    .with_importance(
321                        req.importance
322                            .unwrap_or_else(|| Self::importance_for_category(&category)),
323                    )
324            })
325            .collect();
326
327        // Single batch embed
328        let contents: Vec<String> = thoughts.iter().map(|t| t.content.clone()).collect();
329        let embeddings = self.embeddings.embed_batch(&contents)?;
330
331        // Build records
332        let records: Vec<Record> = thoughts
333            .iter()
334            .zip(embeddings.iter())
335            .map(|(thought, emb)| Self::thought_to_record(thought, emb))
336            .collect();
337
338        let count = records.len();
339        self.backend
340            .insert(THOUGHTS_TABLE, records)
341            .await
342            .context("Failed to batch-store thoughts")?;
343
344        // PKS extraction per message (sync, fast)
345        for req in &requests {
346            let facts = self.fact_collector.process_message(&req.content);
347            for fact in facts {
348                if let Err(e) = self.pks_cache.upsert_fact(fact) {
349                    tracing::warn!("Failed to upsert PKS fact: {}", e);
350                }
351            }
352        }
353
354        tracing::info!("Batch-captured {} thoughts", count);
355        Ok(count)
356    }
357
358    // ── Search (semantic) ────────────────────────────────────────────────
359
360    /// Semantic search across thoughts and optionally PKS facts.
361    pub async fn search_memory(&self, req: SearchMemoryRequest) -> Result<SearchMemoryResponse> {
362        let search_thoughts = req
363            .sources
364            .as_ref()
365            .is_none_or(|s| s.iter().any(|x| x == "thoughts"));
366        let search_facts = req
367            .sources
368            .as_ref()
369            .is_none_or(|s| s.iter().any(|x| x == "facts"));
370
371        let mut results = Vec::new();
372
373        // 1. Thought vector search
374        if search_thoughts {
375            let query_embedding = self.embeddings.embed_cached(&req.query)?;
376
377            // Build filter: deleted = false, optional category
378            let mut filters = vec![Filter::Eq(
379                "deleted".into(),
380                FieldValue::Boolean(Some(false)),
381            )];
382
383            if let Some(ref cat) = req.category {
384                let cat_str = ThoughtCategory::parse(cat).as_str().to_string();
385                filters.push(Filter::Eq(
386                    "category".into(),
387                    FieldValue::Utf8(Some(cat_str)),
388                ));
389            }
390
391            let filter = Filter::And(filters);
392
393            let scored_records = self
394                .backend
395                .vector_search(
396                    THOUGHTS_TABLE,
397                    "vector",
398                    query_embedding,
399                    req.limit,
400                    Some(&filter),
401                )
402                .await?;
403
404            for sr in scored_records {
405                let score = sr.score;
406                if score >= req.min_score {
407                    let thought = Self::record_to_thought(&sr.record)?;
408                    results.push(MemorySearchResult {
409                        content: thought.content,
410                        score,
411                        source: "thoughts".into(),
412                        thought_id: Some(thought.id),
413                        category: Some(thought.category.to_string()),
414                        tags: Some(thought.tags),
415                        created_at: Some(thought.created_at),
416                    });
417                }
418            }
419        }
420
421        // 2. PKS keyword search
422        if search_facts {
423            let pks_results = self.pks_cache.search_facts(&req.query);
424            for fact in pks_results {
425                let score = 0.7; // Flat relevance for keyword matches
426                if score >= req.min_score {
427                    results.push(MemorySearchResult {
428                        content: format!("{}: {}", fact.key, fact.value),
429                        score,
430                        source: "facts".into(),
431                        thought_id: None,
432                        category: Some(format!("{:?}", fact.category)),
433                        tags: None,
434                        created_at: Some(fact.created_at),
435                    });
436                }
437            }
438        }
439
440        // Apply memory bank config: directive filtering + disposition scoring.
441        if !self.config.is_noop() {
442            results.retain(|r| !self.config.blocks_content(&r.content));
443            for r in &mut results {
444                let delta = self.config.disposition_score_delta(&r.content);
445                r.score = (r.score + delta).clamp(0.0, 1.0);
446            }
447        }
448
449        // Sort by score descending
450        results.sort_by(|a, b| {
451            b.score
452                .partial_cmp(&a.score)
453                .unwrap_or(std::cmp::Ordering::Equal)
454        });
455        results.truncate(req.limit);
456
457        let total = results.len();
458        Ok(SearchMemoryResponse { results, total })
459    }
460
461    // ── List recent ──────────────────────────────────────────────────────
462
463    /// List recent thoughts, optionally filtered by category and time range.
464    pub async fn list_recent(&self, req: ListRecentRequest) -> Result<ListRecentResponse> {
465        let since_ts = match &req.since {
466            Some(s) => chrono::DateTime::parse_from_rfc3339(s)
467                .map(|dt| dt.timestamp())
468                .unwrap_or_else(|_| Utc::now().timestamp() - 7 * 86400),
469            None => Utc::now().timestamp() - 7 * 86400,
470        };
471
472        let mut filters = vec![
473            Filter::Eq("deleted".into(), FieldValue::Boolean(Some(false))),
474            Filter::Gte("created_at".into(), FieldValue::Int64(Some(since_ts))),
475        ];
476
477        if let Some(ref cat) = req.category {
478            let cat_str = ThoughtCategory::parse(cat).as_str().to_string();
479            filters.push(Filter::Eq(
480                "category".into(),
481                FieldValue::Utf8(Some(cat_str)),
482            ));
483        }
484
485        let filter = Filter::And(filters);
486
487        let records = self
488            .backend
489            .query(THOUGHTS_TABLE, Some(&filter), Some(req.limit))
490            .await?;
491
492        let mut thoughts = Self::records_to_thoughts(&records)?;
493        thoughts.sort_by(|a, b| b.created_at.cmp(&a.created_at));
494        thoughts.truncate(req.limit);
495
496        let total = thoughts.len();
497        let summaries = thoughts
498            .into_iter()
499            .map(|t| ThoughtSummary {
500                id: t.id,
501                content: t.content,
502                category: t.category.to_string(),
503                tags: t.tags,
504                importance: t.importance,
505                created_at: t.created_at,
506            })
507            .collect();
508
509        Ok(ListRecentResponse {
510            thoughts: summaries,
511            total,
512        })
513    }
514
515    /// Query thought content strings matching a filter. Used for deduplication.
516    pub async fn query_thought_contents(
517        &self,
518        filter: &Filter,
519        limit: usize,
520    ) -> Result<Vec<String>> {
521        let records = self
522            .backend
523            .query(THOUGHTS_TABLE, Some(filter), Some(limit))
524            .await?;
525        let thoughts = Self::records_to_thoughts(&records)?;
526        Ok(thoughts.into_iter().map(|t| t.content).collect())
527    }
528
529    // ── Get by ID ────────────────────────────────────────────────────────
530
531    /// Get a single thought by ID.
532    pub async fn get_thought(&self, id: &str) -> Result<Option<GetThoughtResponse>> {
533        let filter = Filter::And(vec![
534            Filter::Eq("id".into(), FieldValue::Utf8(Some(id.to_string()))),
535            Filter::Eq("deleted".into(), FieldValue::Boolean(Some(false))),
536        ]);
537
538        let records = self
539            .backend
540            .query(THOUGHTS_TABLE, Some(&filter), Some(1))
541            .await?;
542
543        let thoughts = Self::records_to_thoughts(&records)?;
544
545        Ok(thoughts.into_iter().next().map(|t| GetThoughtResponse {
546            id: t.id,
547            content: t.content,
548            category: t.category.to_string(),
549            tags: t.tags,
550            source: t.source.to_string(),
551            importance: t.importance,
552            created_at: t.created_at,
553            updated_at: t.updated_at,
554        }))
555    }
556
557    // ── Search knowledge (PKS/BKS) ──────────────────────────────────────
558
559    /// Search PKS and/or BKS knowledge stores.
560    pub fn search_knowledge(&self, req: SearchKnowledgeRequest) -> Result<SearchKnowledgeResponse> {
561        let search_pks = req
562            .source
563            .as_ref()
564            .is_none_or(|s| s == "all" || s == "personal");
565        let search_bks = req
566            .source
567            .as_ref()
568            .is_none_or(|s| s == "all" || s == "behavioral");
569
570        let mut results = Vec::new();
571
572        if search_pks {
573            let pks_results = self.pks_cache.search_facts(&req.query);
574            for fact in pks_results {
575                if fact.confidence >= req.min_confidence {
576                    results.push(KnowledgeResult {
577                        source: "personal".into(),
578                        category: format!("{:?}", fact.category),
579                        key: fact.key.clone(),
580                        value: fact.value.clone(),
581                        confidence: fact.confidence,
582                        context: fact.context.clone(),
583                    });
584                }
585            }
586        }
587
588        if search_bks {
589            let bks_results = self
590                .bks_cache
591                .get_matching_truths_with_scores(&req.query, req.min_confidence, req.limit)
592                .unwrap_or_default();
593            for (truth, score) in bks_results {
594                results.push(KnowledgeResult {
595                    source: "behavioral".into(),
596                    category: format!("{:?}", truth.category),
597                    key: truth.context_pattern.clone(),
598                    value: truth.rule.clone(),
599                    confidence: score,
600                    context: Some(truth.rationale.clone()),
601                });
602            }
603        }
604
605        results.sort_by(|a, b| {
606            b.confidence
607                .partial_cmp(&a.confidence)
608                .unwrap_or(std::cmp::Ordering::Equal)
609        });
610        results.truncate(req.limit);
611
612        let total = results.len();
613        Ok(SearchKnowledgeResponse { results, total })
614    }
615
616    // ── Stats ────────────────────────────────────────────────────────────
617
618    /// Get aggregate statistics across all memory stores.
619    pub async fn memory_stats(&self) -> Result<MemoryStatsResponse> {
620        let now = Utc::now().timestamp();
621        let one_day = 86_400i64;
622
623        // Thought stats: query all non-deleted
624        let filter = Filter::Eq("deleted".into(), FieldValue::Boolean(Some(false)));
625        let records = self
626            .backend
627            .query(THOUGHTS_TABLE, Some(&filter), None)
628            .await?;
629        let all_thoughts = Self::records_to_thoughts(&records)?;
630
631        let total = all_thoughts.len();
632        let mut by_category: HashMap<String, usize> = HashMap::new();
633        let mut tag_counts: HashMap<String, usize> = HashMap::new();
634        let mut recent_24h = 0usize;
635        let mut recent_7d = 0usize;
636        let mut recent_30d = 0usize;
637
638        for t in &all_thoughts {
639            *by_category.entry(t.category.to_string()).or_insert(0) += 1;
640            for tag in &t.tags {
641                *tag_counts.entry(tag.clone()).or_insert(0) += 1;
642            }
643            let age = now - t.created_at;
644            if age <= one_day {
645                recent_24h += 1;
646            }
647            if age <= 7 * one_day {
648                recent_7d += 1;
649            }
650            if age <= 30 * one_day {
651                recent_30d += 1;
652            }
653        }
654
655        let mut top_tags: Vec<(String, usize)> = tag_counts.into_iter().collect();
656        top_tags.sort_by(|a, b| b.1.cmp(&a.1));
657        top_tags.truncate(10);
658
659        // PKS stats
660        let pks_stats_raw = self.pks_cache.stats();
661        let pks_by_cat: HashMap<String, u32> = pks_stats_raw
662            .by_category
663            .into_iter()
664            .map(|(k, v)| (format!("{:?}", k), v))
665            .collect();
666
667        // BKS stats
668        let bks_stats_raw = self.bks_cache.stats();
669        let bks_by_cat: HashMap<String, u32> = bks_stats_raw
670            .by_category
671            .into_iter()
672            .map(|(k, v)| (format!("{:?}", k), v))
673            .collect();
674
675        Ok(MemoryStatsResponse {
676            thoughts: ThoughtStats {
677                total,
678                by_category,
679                recent_24h,
680                recent_7d,
681                recent_30d,
682                top_tags,
683            },
684            pks: PksStats {
685                total_facts: pks_stats_raw.total_facts,
686                by_category: pks_by_cat,
687                avg_confidence: pks_stats_raw.avg_confidence,
688            },
689            bks: BksStats {
690                total_truths: bks_stats_raw.total_truths,
691                by_category: bks_by_cat,
692            },
693        })
694    }
695
696    // ── Delete ───────────────────────────────────────────────────────────
697
698    /// Soft-delete a thought by ID.
699    pub async fn delete_thought(&self, id: &str) -> Result<DeleteThoughtResponse> {
700        // Check existence
701        let filter = Filter::And(vec![
702            Filter::Eq("id".into(), FieldValue::Utf8(Some(id.to_string()))),
703            Filter::Eq("deleted".into(), FieldValue::Boolean(Some(false))),
704        ]);
705
706        let count = self.backend.count(THOUGHTS_TABLE, Some(&filter)).await?;
707        if count == 0 {
708            return Ok(DeleteThoughtResponse {
709                deleted: false,
710                id: id.to_string(),
711            });
712        }
713
714        // Delete the row via backend
715        let delete_filter = Filter::Eq("id".into(), FieldValue::Utf8(Some(id.to_string())));
716        self.backend.delete(THOUGHTS_TABLE, &delete_filter).await?;
717
718        tracing::info!(id = id, "Deleted thought");
719        Ok(DeleteThoughtResponse {
720            deleted: true,
721            id: id.to_string(),
722        })
723    }
724
725    /// Add a behavioral truth to the BKS.
726    pub fn add_behavioral_truth(
727        &mut self,
728        truth: crate::knowledge::bks_pks::BehavioralTruth,
729    ) -> Result<()> {
730        self.bks_cache.add_truth(truth)?;
731        Ok(())
732    }
733
734    /// Delete all thoughts matching a filter. Returns count deleted.
735    pub async fn delete_by_filter(&self, filter: &Filter) -> Result<usize> {
736        let count = self.backend.count(THOUGHTS_TABLE, Some(filter)).await?;
737        if count > 0 {
738            self.backend.delete(THOUGHTS_TABLE, filter).await?;
739            tracing::info!("Deleted {} thoughts by filter", count);
740        }
741        Ok(count)
742    }
743
744    // ── Record conversion ────────────────────────────────────────────────
745
746    fn thought_to_record(thought: &Thought, embedding: &[f32]) -> Record {
747        let tags_json = serde_json::to_string(&thought.tags).unwrap_or_else(|_| "[]".into());
748        let evidence_json =
749            serde_json::to_string(&thought.evidence_chain).unwrap_or_else(|_| "[]".into());
750
751        vec![
752            ("vector".into(), FieldValue::Vector(embedding.to_vec())),
753            ("id".into(), FieldValue::Utf8(Some(thought.id.clone()))),
754            (
755                "content".into(),
756                FieldValue::Utf8(Some(thought.content.clone())),
757            ),
758            (
759                "category".into(),
760                FieldValue::Utf8(Some(thought.category.as_str().to_string())),
761            ),
762            ("tags".into(), FieldValue::Utf8(Some(tags_json))),
763            (
764                "source".into(),
765                FieldValue::Utf8(Some(thought.source.as_str().to_string())),
766            ),
767            (
768                "importance".into(),
769                FieldValue::Float32(Some(thought.importance)),
770            ),
771            (
772                "created_at".into(),
773                FieldValue::Int64(Some(thought.created_at)),
774            ),
775            (
776                "updated_at".into(),
777                FieldValue::Int64(Some(thought.updated_at)),
778            ),
779            ("deleted".into(), FieldValue::Boolean(Some(thought.deleted))),
780            (
781                "confidence".into(),
782                FieldValue::Float32(Some(thought.confidence)),
783            ),
784            (
785                "evidence_chain".into(),
786                FieldValue::Utf8(Some(evidence_json)),
787            ),
788            (
789                "reinforcement_count".into(),
790                FieldValue::Int64(Some(thought.reinforcement_count as i64)),
791            ),
792            (
793                "contradiction_count".into(),
794                FieldValue::Int64(Some(thought.contradiction_count as i64)),
795            ),
796        ]
797    }
798
799    fn record_to_thought(record: &Record) -> Result<Thought> {
800        let id = record_get(record, "id")
801            .and_then(|v| v.as_str())
802            .context("Missing id field")?
803            .to_string();
804        let content = record_get(record, "content")
805            .and_then(|v| v.as_str())
806            .context("Missing content field")?
807            .to_string();
808        let category = record_get(record, "category")
809            .and_then(|v| v.as_str())
810            .map(ThoughtCategory::parse)
811            .context("Missing category field")?;
812        let tags_str = record_get(record, "tags")
813            .and_then(|v| v.as_str())
814            .unwrap_or("[]");
815        let tags: Vec<String> = serde_json::from_str(tags_str).unwrap_or_default();
816        let source = record_get(record, "source")
817            .and_then(|v| v.as_str())
818            .map(ThoughtSource::parse)
819            .context("Missing source field")?;
820        let importance = record_get(record, "importance")
821            .and_then(|v| v.as_f32())
822            .context("Missing importance field")?;
823        let created_at = record_get(record, "created_at")
824            .and_then(|v| v.as_i64())
825            .context("Missing created_at field")?;
826        let updated_at = record_get(record, "updated_at")
827            .and_then(|v| v.as_i64())
828            .context("Missing updated_at field")?;
829        let deleted = record_get(record, "deleted")
830            .and_then(|v| v.as_bool())
831            .unwrap_or(false);
832        let confidence = record_get(record, "confidence")
833            .and_then(|v| v.as_f32())
834            .unwrap_or(0.5);
835        let evidence_str = record_get(record, "evidence_chain")
836            .and_then(|v| v.as_str())
837            .unwrap_or("[]");
838        let evidence_chain: Vec<String> = serde_json::from_str(evidence_str).unwrap_or_default();
839        let reinforcement_count = record_get(record, "reinforcement_count")
840            .and_then(|v| v.as_i64())
841            .unwrap_or(0) as u32;
842        let contradiction_count = record_get(record, "contradiction_count")
843            .and_then(|v| v.as_i64())
844            .unwrap_or(0) as u32;
845
846        Ok(Thought {
847            id,
848            content,
849            category,
850            tags,
851            source,
852            importance,
853            created_at,
854            updated_at,
855            deleted,
856            confidence,
857            evidence_chain,
858            reinforcement_count,
859            contradiction_count,
860        })
861    }
862
863    fn records_to_thoughts(records: &[Record]) -> Result<Vec<Thought>> {
864        records.iter().map(Self::record_to_thought).collect()
865    }
866
867    /// Persist an updated `Thought` by deleting the old record and reinserting.
868    ///
869    /// Required because `StorageBackend` has no `update` method.
870    async fn replace_thought(&self, thought: &Thought) -> Result<()> {
871        let delete_filter = Filter::Eq("id".into(), FieldValue::Utf8(Some(thought.id.clone())));
872        self.backend.delete(THOUGHTS_TABLE, &delete_filter).await?;
873        let embedding = self.embeddings.embed_cached(&thought.content)?;
874        let record = Self::thought_to_record(thought, &embedding);
875        self.backend
876            .insert(THOUGHTS_TABLE, vec![record])
877            .await
878            .context("Failed to reinsert updated thought")?;
879        Ok(())
880    }
881
882    /// Search for existing thoughts similar to `content`, classify them as
883    /// corroborations or contradictions, update their confidence via EMA, and
884    /// add bidirectional `evidence_chain` links.
885    ///
886    /// Returns an [`EvidenceCheckResult`] describing what was found.
887    async fn apply_evidence_check(
888        &self,
889        new_thought_id: &str,
890        content: &str,
891    ) -> Result<crate::knowledge::types::EvidenceCheckResult> {
892        use crate::knowledge::types::SearchMemoryRequest;
893
894        // Search for semantically similar existing thoughts.
895        let similar = self
896            .search_memory(SearchMemoryRequest {
897                query: content.to_string(),
898                limit: 10,
899                min_score: CONTRADICTION_THRESHOLD,
900                category: None,
901                sources: Some(vec!["thoughts".into()]),
902            })
903            .await?;
904
905        // Exclude the newly inserted thought itself.
906        let similar_results: Vec<_> = similar
907            .results
908            .into_iter()
909            .filter(|r| r.thought_id.as_deref() != Some(new_thought_id))
910            .collect();
911
912        let corroboration_result =
913            fact_extractor::check_corroboration(&similar_results, CORROBORATION_THRESHOLD);
914        let contradictions =
915            fact_extractor::check_contradiction(content, &similar_results, CONTRADICTION_THRESHOLD);
916
917        // Remove IDs that appear in both (corroboration wins).
918        let contradiction_ids: Vec<String> = contradictions
919            .into_iter()
920            .filter(|id| !corroboration_result.corroborations.contains(id))
921            .collect();
922
923        let now = Utc::now().timestamp();
924
925        // Update corroborated thoughts.
926        for corr_id in &corroboration_result.corroborations {
927            if let Some(mut t) = self.get_thought_internal(corr_id).await? {
928                let old_conf = t.confidence;
929                t.confidence = (EVIDENCE_EMA_ALPHA * (old_conf + 0.1)
930                    + (1.0 - EVIDENCE_EMA_ALPHA) * old_conf)
931                    .clamp(0.0, 1.0);
932                t.reinforcement_count += 1;
933                if !t.evidence_chain.contains(&new_thought_id.to_string()) {
934                    t.evidence_chain.push(new_thought_id.to_string());
935                }
936                t.updated_at = now;
937                if let Err(e) = self.replace_thought(&t).await {
938                    tracing::warn!(id = %corr_id, "Failed to update corroborated thought: {}", e);
939                }
940            }
941        }
942
943        // Update contradicted thoughts.
944        for contra_id in &contradiction_ids {
945            if let Some(mut t) = self.get_thought_internal(contra_id).await? {
946                let old_conf = t.confidence;
947                t.confidence = (EVIDENCE_EMA_ALPHA * (old_conf - 0.1)
948                    + (1.0 - EVIDENCE_EMA_ALPHA) * old_conf)
949                    .clamp(0.0, 1.0);
950                t.contradiction_count += 1;
951                if !t.evidence_chain.contains(&new_thought_id.to_string()) {
952                    t.evidence_chain.push(new_thought_id.to_string());
953                }
954                t.updated_at = now;
955                if let Err(e) = self.replace_thought(&t).await {
956                    tracing::warn!(id = %contra_id, "Failed to update contradicted thought: {}", e);
957                }
958            }
959        }
960
961        Ok(crate::knowledge::types::EvidenceCheckResult {
962            corroborations: corroboration_result.corroborations,
963            contradictions: contradiction_ids,
964        })
965    }
966
967    /// Fetch a full `Thought` by ID (including soft-deleted records are excluded).
968    async fn get_thought_internal(&self, id: &str) -> Result<Option<Thought>> {
969        let filter = Filter::And(vec![
970            Filter::Eq("id".into(), FieldValue::Utf8(Some(id.to_string()))),
971            Filter::Eq("deleted".into(), FieldValue::Boolean(Some(false))),
972        ]);
973        let records = self
974            .backend
975            .query(THOUGHTS_TABLE, Some(&filter), Some(1))
976            .await?;
977        let mut thoughts = Self::records_to_thoughts(&records)?;
978        Ok(thoughts.pop())
979    }
980}
981
982#[cfg(test)]
983mod tests {
984    use super::*;
985    use tempfile::TempDir;
986
987    async fn setup() -> (TempDir, BrainClient) {
988        let temp = TempDir::new().unwrap();
989        let lance_path = temp.path().join("brain.lance");
990        let pks_path = temp.path().join("pks.db");
991        let bks_path = temp.path().join("bks.db");
992
993        let client = BrainClient::with_paths(
994            lance_path.to_str().unwrap(),
995            pks_path.to_str().unwrap(),
996            bks_path.to_str().unwrap(),
997        )
998        .await
999        .unwrap();
1000
1001        (temp, client)
1002    }
1003
1004    #[tokio::test]
1005    async fn test_capture_and_get() {
1006        let (_temp, mut client) = setup().await;
1007
1008        let resp = client
1009            .capture_thought(CaptureThoughtRequest {
1010                content: "Decided to use PostgreSQL for auth service".into(),
1011                category: None,
1012                tags: Some(vec!["db".into()]),
1013                importance: Some(0.8),
1014                source: None,
1015            })
1016            .await
1017            .unwrap();
1018
1019        assert_eq!(resp.category, "decision");
1020        assert!(resp.tags.contains(&"db".to_string()));
1021
1022        let thought = client.get_thought(&resp.id).await.unwrap();
1023        assert!(thought.is_some());
1024        let t = thought.unwrap();
1025        assert_eq!(t.category, "decision");
1026    }
1027
1028    #[tokio::test]
1029    async fn test_search_memory() {
1030        let (_temp, mut client) = setup().await;
1031
1032        client
1033            .capture_thought(CaptureThoughtRequest {
1034                content: "Rust is great for systems programming".into(),
1035                category: Some("insight".into()),
1036                tags: None,
1037                importance: None,
1038                source: None,
1039            })
1040            .await
1041            .unwrap();
1042
1043        let results = client
1044            .search_memory(SearchMemoryRequest {
1045                query: "programming languages".into(),
1046                limit: 10,
1047                min_score: 0.0,
1048                category: None,
1049                sources: None,
1050            })
1051            .await
1052            .unwrap();
1053
1054        assert!(!results.results.is_empty());
1055    }
1056
1057    #[tokio::test]
1058    async fn test_delete_thought() {
1059        let (_temp, mut client) = setup().await;
1060
1061        let resp = client
1062            .capture_thought(CaptureThoughtRequest {
1063                content: "Something to delete".into(),
1064                category: None,
1065                tags: None,
1066                importance: None,
1067                source: None,
1068            })
1069            .await
1070            .unwrap();
1071
1072        let del = client.delete_thought(&resp.id).await.unwrap();
1073        assert!(del.deleted);
1074
1075        let thought = client.get_thought(&resp.id).await.unwrap();
1076        assert!(thought.is_none());
1077    }
1078
1079    #[tokio::test]
1080    async fn test_memory_stats() {
1081        let (_temp, client) = setup().await;
1082        let stats = client.memory_stats().await.unwrap();
1083        assert_eq!(stats.thoughts.total, 0);
1084    }
1085}