1use std::collections::HashMap;
2use std::sync::Arc;
3
4use anyhow::{Context, Result};
5use chrono::Utc;
6use tracing;
7
8use crate::knowledge::bks_pks::{
9 BehavioralKnowledgeCache, PersonalFactCollector, PersonalKnowledgeCache,
10};
11use brainwires_storage::{
12 EmbeddingProvider, FieldDef, FieldType, FieldValue, Filter, Record, StorageBackend, record_get,
13};
14
15#[cfg(feature = "knowledge")]
16use brainwires_storage::LanceDatabase;
17
18use crate::knowledge::config::MemoryBankConfig;
19use crate::knowledge::fact_extractor;
20use crate::knowledge::thought::{Thought, ThoughtCategory, ThoughtSource};
21use crate::knowledge::types::*;
22
23pub struct BrainClient {
25 backend: Arc<dyn StorageBackend>,
26 embeddings: Arc<EmbeddingProvider>,
27 pks_cache: PersonalKnowledgeCache,
28 bks_cache: BehavioralKnowledgeCache,
29 fact_collector: PersonalFactCollector,
30 config: MemoryBankConfig,
32}
33
/// Name of the storage table that holds thought records.
const THOUGHTS_TABLE: &str = "thoughts";

/// Weight applied to the +/-0.1 confidence nudge when an existing thought is
/// corroborated or contradicted by a newly captured one.
const EVIDENCE_EMA_ALPHA: f32 = 0.3;
/// Threshold handed to `fact_extractor::check_corroboration`.
const CORROBORATION_THRESHOLD: f32 = 0.85;
/// Threshold handed to `fact_extractor::check_contradiction`; also the minimum
/// search score used when collecting evidence candidates.
const CONTRADICTION_THRESHOLD: f32 = 0.70;
42
43impl BrainClient {
44 pub async fn new() -> Result<Self> {
50 let base = dirs::home_dir()
51 .context("Cannot determine home directory")?
52 .join(".brainwires");
53
54 std::fs::create_dir_all(&base)?;
55
56 let lance_path = base.join("brain");
57 let pks_path = base.join("pks.db");
58 let bks_path = base.join("bks.db");
59
60 Self::with_paths(
61 lance_path
62 .to_str()
63 .context("lance path is not valid UTF-8")?,
64 pks_path.to_str().context("pks path is not valid UTF-8")?,
65 bks_path.to_str().context("bks path is not valid UTF-8")?,
66 )
67 .await
68 }
69
70 pub async fn with_paths(lance_path: &str, pks_path: &str, bks_path: &str) -> Result<Self> {
74 let embeddings = Arc::new(EmbeddingProvider::new()?);
75 let backend: Arc<dyn StorageBackend> = Arc::new(LanceDatabase::new(lance_path).await?);
76
77 Self::with_backend(backend, embeddings, pks_path, bks_path).await
78 }
79
80 pub async fn with_backend(
85 backend: Arc<dyn StorageBackend>,
86 embeddings: Arc<EmbeddingProvider>,
87 pks_path: &str,
88 bks_path: &str,
89 ) -> Result<Self> {
90 Self::ensure_thoughts_table(&*backend, embeddings.dimension()).await?;
92
93 let pks_cache = PersonalKnowledgeCache::new(pks_path, 1000)?;
94 let bks_cache = BehavioralKnowledgeCache::new(bks_path, 1000)?;
95 let fact_collector = PersonalFactCollector::default();
96
97 Ok(Self {
98 backend,
99 embeddings,
100 pks_cache,
101 bks_cache,
102 fact_collector,
103 config: MemoryBankConfig::default(),
104 })
105 }
106
107 pub async fn with_bank_config(config: MemoryBankConfig) -> Result<Self> {
109 let mut client = Self::new().await?;
110 client.config = config;
111 Ok(client)
112 }
113
114 pub fn set_config(&mut self, config: MemoryBankConfig) {
116 self.config = config;
117 }
118
119 pub fn config(&self) -> &MemoryBankConfig {
121 &self.config
122 }
123
124 async fn ensure_thoughts_table(backend: &dyn StorageBackend, dim: usize) -> Result<()> {
127 backend
128 .ensure_table(
129 THOUGHTS_TABLE,
130 &[
131 FieldDef::required("vector", FieldType::Vector(dim)),
132 FieldDef::required("id", FieldType::Utf8),
133 FieldDef::required("content", FieldType::Utf8),
134 FieldDef::required("category", FieldType::Utf8),
135 FieldDef::required("tags", FieldType::Utf8),
136 FieldDef::required("source", FieldType::Utf8),
137 FieldDef::required("importance", FieldType::Float32),
138 FieldDef::required("created_at", FieldType::Int64),
139 FieldDef::required("updated_at", FieldType::Int64),
140 FieldDef::required("deleted", FieldType::Boolean),
141 FieldDef::optional("confidence", FieldType::Float32),
142 FieldDef::optional("evidence_chain", FieldType::Utf8),
143 FieldDef::optional("reinforcement_count", FieldType::Int64),
144 FieldDef::optional("contradiction_count", FieldType::Int64),
145 ],
146 )
147 .await
148 .context("Failed to create thoughts table")?;
149
150 tracing::info!("Ensured thoughts table exists");
151 Ok(())
152 }
153
154 fn importance_for_category(cat: &ThoughtCategory) -> f32 {
158 match cat {
159 ThoughtCategory::Decision => 0.85,
160 ThoughtCategory::ActionItem => 0.8,
161 ThoughtCategory::Insight => 0.75,
162 ThoughtCategory::Idea => 0.65,
163 ThoughtCategory::Person => 0.6,
164 ThoughtCategory::MeetingNote => 0.55,
165 ThoughtCategory::Reference => 0.5,
166 ThoughtCategory::Conversation => 0.45,
167 ThoughtCategory::General => 0.4,
168 }
169 }
170
171 pub async fn capture_thought(
175 &mut self,
176 req: CaptureThoughtRequest,
177 ) -> Result<CaptureThoughtResponse> {
178 let category = match &req.category {
180 Some(c) => ThoughtCategory::parse(c),
181 None => fact_extractor::detect_category(&req.content),
182 };
183
184 let mut auto_tags = fact_extractor::extract_tags(&req.content);
185 if let Some(ref user_tags) = req.tags {
186 for t in user_tags {
187 let lower = t.to_lowercase();
188 if !auto_tags.contains(&lower) {
189 auto_tags.push(lower);
190 }
191 }
192 }
193
194 if let Some(mission_tag) = self.config.mission_tag()
196 && !auto_tags.contains(&mission_tag)
197 {
198 auto_tags.push(mission_tag);
199 }
200
201 let source = req
202 .source
203 .as_deref()
204 .map(ThoughtSource::parse)
205 .unwrap_or(ThoughtSource::ManualCapture);
206
207 let thought = Thought::new(req.content.clone())
208 .with_category(category)
209 .with_tags(auto_tags.clone())
210 .with_source(source)
211 .with_importance(
212 req.importance
213 .unwrap_or_else(|| Self::importance_for_category(&category)),
214 );
215
216 let embedding = self.embeddings.embed(&thought.content)?;
218
219 let record = Self::thought_to_record(&thought, &embedding);
221 self.backend
222 .insert(THOUGHTS_TABLE, vec![record])
223 .await
224 .context("Failed to store thought")?;
225
226 let facts = self.fact_collector.process_message(&req.content);
228 let facts_count = facts.len();
229 for fact in facts {
230 if let Err(e) = self.pks_cache.upsert_fact(fact) {
231 tracing::warn!("Failed to upsert PKS fact: {}", e);
232 }
233 }
234
235 let evidence = self
237 .apply_evidence_check(&thought.id, &req.content)
238 .await
239 .unwrap_or_default();
240
241 let initial_confidence = (0.5 + 0.05 * evidence.corroborations.len() as f32
243 - 0.05 * evidence.contradictions.len() as f32)
244 .clamp(0.0, 1.0);
245
246 if !evidence.corroborations.is_empty() || !evidence.contradictions.is_empty() {
248 let mut all_evidence = evidence.corroborations.clone();
249 all_evidence.extend(evidence.contradictions.iter().cloned());
250
251 let delete_filter = Filter::Eq("id".into(), FieldValue::Utf8(Some(thought.id.clone())));
253 let _ = self.backend.delete(THOUGHTS_TABLE, &delete_filter).await;
254 let mut updated_thought = thought.clone();
255 updated_thought.confidence = initial_confidence;
256 updated_thought.evidence_chain = all_evidence;
257 let embedding = self.embeddings.embed_cached(&updated_thought.content)?;
258 let record = Self::thought_to_record(&updated_thought, &embedding);
259 let _ = self.backend.insert(THOUGHTS_TABLE, vec![record]).await;
260 }
261
262 tracing::info!(
263 id = %thought.id,
264 category = %category,
265 facts = facts_count,
266 corroborations = evidence.corroborations.len(),
267 contradictions = evidence.contradictions.len(),
268 "Captured thought"
269 );
270
271 Ok(CaptureThoughtResponse {
272 id: thought.id,
273 category: category.to_string(),
274 tags: auto_tags,
275 importance: thought.importance,
276 facts_extracted: facts_count,
277 corroborations: evidence.corroborations,
278 contradictions: evidence.contradictions,
279 confidence: initial_confidence,
280 })
281 }
282
283 pub async fn capture_thoughts_batch(
288 &mut self,
289 requests: Vec<CaptureThoughtRequest>,
290 ) -> Result<usize> {
291 if requests.is_empty() {
292 return Ok(0);
293 }
294
295 let thoughts: Vec<Thought> = requests
297 .iter()
298 .map(|req| {
299 let category = match &req.category {
300 Some(c) => ThoughtCategory::parse(c),
301 None => fact_extractor::detect_category(&req.content),
302 };
303 let mut auto_tags = fact_extractor::extract_tags(&req.content);
304 if let Some(ref user_tags) = req.tags {
305 for t in user_tags {
306 if !auto_tags.contains(t) {
307 auto_tags.push(t.clone());
308 }
309 }
310 }
311 let source = req
312 .source
313 .as_deref()
314 .map(ThoughtSource::parse)
315 .unwrap_or(ThoughtSource::ManualCapture);
316 Thought::new(req.content.clone())
317 .with_category(category)
318 .with_tags(auto_tags)
319 .with_source(source)
320 .with_importance(
321 req.importance
322 .unwrap_or_else(|| Self::importance_for_category(&category)),
323 )
324 })
325 .collect();
326
327 let contents: Vec<String> = thoughts.iter().map(|t| t.content.clone()).collect();
329 let embeddings = self.embeddings.embed_batch(&contents)?;
330
331 let records: Vec<Record> = thoughts
333 .iter()
334 .zip(embeddings.iter())
335 .map(|(thought, emb)| Self::thought_to_record(thought, emb))
336 .collect();
337
338 let count = records.len();
339 self.backend
340 .insert(THOUGHTS_TABLE, records)
341 .await
342 .context("Failed to batch-store thoughts")?;
343
344 for req in &requests {
346 let facts = self.fact_collector.process_message(&req.content);
347 for fact in facts {
348 if let Err(e) = self.pks_cache.upsert_fact(fact) {
349 tracing::warn!("Failed to upsert PKS fact: {}", e);
350 }
351 }
352 }
353
354 tracing::info!("Batch-captured {} thoughts", count);
355 Ok(count)
356 }
357
358 pub async fn search_memory(&self, req: SearchMemoryRequest) -> Result<SearchMemoryResponse> {
362 let search_thoughts = req
363 .sources
364 .as_ref()
365 .is_none_or(|s| s.iter().any(|x| x == "thoughts"));
366 let search_facts = req
367 .sources
368 .as_ref()
369 .is_none_or(|s| s.iter().any(|x| x == "facts"));
370
371 let mut results = Vec::new();
372
373 if search_thoughts {
375 let query_embedding = self.embeddings.embed_cached(&req.query)?;
376
377 let mut filters = vec![Filter::Eq(
379 "deleted".into(),
380 FieldValue::Boolean(Some(false)),
381 )];
382
383 if let Some(ref cat) = req.category {
384 let cat_str = ThoughtCategory::parse(cat).as_str().to_string();
385 filters.push(Filter::Eq(
386 "category".into(),
387 FieldValue::Utf8(Some(cat_str)),
388 ));
389 }
390
391 let filter = Filter::And(filters);
392
393 let scored_records = self
394 .backend
395 .vector_search(
396 THOUGHTS_TABLE,
397 "vector",
398 query_embedding,
399 req.limit,
400 Some(&filter),
401 )
402 .await?;
403
404 for sr in scored_records {
405 let score = sr.score;
406 if score >= req.min_score {
407 let thought = Self::record_to_thought(&sr.record)?;
408 results.push(MemorySearchResult {
409 content: thought.content,
410 score,
411 source: "thoughts".into(),
412 thought_id: Some(thought.id),
413 category: Some(thought.category.to_string()),
414 tags: Some(thought.tags),
415 created_at: Some(thought.created_at),
416 });
417 }
418 }
419 }
420
421 if search_facts {
423 let pks_results = self.pks_cache.search_facts(&req.query);
424 for fact in pks_results {
425 let score = 0.7; if score >= req.min_score {
427 results.push(MemorySearchResult {
428 content: format!("{}: {}", fact.key, fact.value),
429 score,
430 source: "facts".into(),
431 thought_id: None,
432 category: Some(format!("{:?}", fact.category)),
433 tags: None,
434 created_at: Some(fact.created_at),
435 });
436 }
437 }
438 }
439
440 if !self.config.is_noop() {
442 results.retain(|r| !self.config.blocks_content(&r.content));
443 for r in &mut results {
444 let delta = self.config.disposition_score_delta(&r.content);
445 r.score = (r.score + delta).clamp(0.0, 1.0);
446 }
447 }
448
449 results.sort_by(|a, b| {
451 b.score
452 .partial_cmp(&a.score)
453 .unwrap_or(std::cmp::Ordering::Equal)
454 });
455 results.truncate(req.limit);
456
457 let total = results.len();
458 Ok(SearchMemoryResponse { results, total })
459 }
460
461 pub async fn list_recent(&self, req: ListRecentRequest) -> Result<ListRecentResponse> {
465 let since_ts = match &req.since {
466 Some(s) => chrono::DateTime::parse_from_rfc3339(s)
467 .map(|dt| dt.timestamp())
468 .unwrap_or_else(|_| Utc::now().timestamp() - 7 * 86400),
469 None => Utc::now().timestamp() - 7 * 86400,
470 };
471
472 let mut filters = vec![
473 Filter::Eq("deleted".into(), FieldValue::Boolean(Some(false))),
474 Filter::Gte("created_at".into(), FieldValue::Int64(Some(since_ts))),
475 ];
476
477 if let Some(ref cat) = req.category {
478 let cat_str = ThoughtCategory::parse(cat).as_str().to_string();
479 filters.push(Filter::Eq(
480 "category".into(),
481 FieldValue::Utf8(Some(cat_str)),
482 ));
483 }
484
485 let filter = Filter::And(filters);
486
487 let records = self
488 .backend
489 .query(THOUGHTS_TABLE, Some(&filter), Some(req.limit))
490 .await?;
491
492 let mut thoughts = Self::records_to_thoughts(&records)?;
493 thoughts.sort_by(|a, b| b.created_at.cmp(&a.created_at));
494 thoughts.truncate(req.limit);
495
496 let total = thoughts.len();
497 let summaries = thoughts
498 .into_iter()
499 .map(|t| ThoughtSummary {
500 id: t.id,
501 content: t.content,
502 category: t.category.to_string(),
503 tags: t.tags,
504 importance: t.importance,
505 created_at: t.created_at,
506 })
507 .collect();
508
509 Ok(ListRecentResponse {
510 thoughts: summaries,
511 total,
512 })
513 }
514
515 pub async fn query_thought_contents(
517 &self,
518 filter: &Filter,
519 limit: usize,
520 ) -> Result<Vec<String>> {
521 let records = self
522 .backend
523 .query(THOUGHTS_TABLE, Some(filter), Some(limit))
524 .await?;
525 let thoughts = Self::records_to_thoughts(&records)?;
526 Ok(thoughts.into_iter().map(|t| t.content).collect())
527 }
528
529 pub async fn get_thought(&self, id: &str) -> Result<Option<GetThoughtResponse>> {
533 let filter = Filter::And(vec![
534 Filter::Eq("id".into(), FieldValue::Utf8(Some(id.to_string()))),
535 Filter::Eq("deleted".into(), FieldValue::Boolean(Some(false))),
536 ]);
537
538 let records = self
539 .backend
540 .query(THOUGHTS_TABLE, Some(&filter), Some(1))
541 .await?;
542
543 let thoughts = Self::records_to_thoughts(&records)?;
544
545 Ok(thoughts.into_iter().next().map(|t| GetThoughtResponse {
546 id: t.id,
547 content: t.content,
548 category: t.category.to_string(),
549 tags: t.tags,
550 source: t.source.to_string(),
551 importance: t.importance,
552 created_at: t.created_at,
553 updated_at: t.updated_at,
554 }))
555 }
556
557 pub fn search_knowledge(&self, req: SearchKnowledgeRequest) -> Result<SearchKnowledgeResponse> {
561 let search_pks = req
562 .source
563 .as_ref()
564 .is_none_or(|s| s == "all" || s == "personal");
565 let search_bks = req
566 .source
567 .as_ref()
568 .is_none_or(|s| s == "all" || s == "behavioral");
569
570 let mut results = Vec::new();
571
572 if search_pks {
573 let pks_results = self.pks_cache.search_facts(&req.query);
574 for fact in pks_results {
575 if fact.confidence >= req.min_confidence {
576 results.push(KnowledgeResult {
577 source: "personal".into(),
578 category: format!("{:?}", fact.category),
579 key: fact.key.clone(),
580 value: fact.value.clone(),
581 confidence: fact.confidence,
582 context: fact.context.clone(),
583 });
584 }
585 }
586 }
587
588 if search_bks {
589 let bks_results = self
590 .bks_cache
591 .get_matching_truths_with_scores(&req.query, req.min_confidence, req.limit)
592 .unwrap_or_default();
593 for (truth, score) in bks_results {
594 results.push(KnowledgeResult {
595 source: "behavioral".into(),
596 category: format!("{:?}", truth.category),
597 key: truth.context_pattern.clone(),
598 value: truth.rule.clone(),
599 confidence: score,
600 context: Some(truth.rationale.clone()),
601 });
602 }
603 }
604
605 results.sort_by(|a, b| {
606 b.confidence
607 .partial_cmp(&a.confidence)
608 .unwrap_or(std::cmp::Ordering::Equal)
609 });
610 results.truncate(req.limit);
611
612 let total = results.len();
613 Ok(SearchKnowledgeResponse { results, total })
614 }
615
616 pub async fn memory_stats(&self) -> Result<MemoryStatsResponse> {
620 let now = Utc::now().timestamp();
621 let one_day = 86_400i64;
622
623 let filter = Filter::Eq("deleted".into(), FieldValue::Boolean(Some(false)));
625 let records = self
626 .backend
627 .query(THOUGHTS_TABLE, Some(&filter), None)
628 .await?;
629 let all_thoughts = Self::records_to_thoughts(&records)?;
630
631 let total = all_thoughts.len();
632 let mut by_category: HashMap<String, usize> = HashMap::new();
633 let mut tag_counts: HashMap<String, usize> = HashMap::new();
634 let mut recent_24h = 0usize;
635 let mut recent_7d = 0usize;
636 let mut recent_30d = 0usize;
637
638 for t in &all_thoughts {
639 *by_category.entry(t.category.to_string()).or_insert(0) += 1;
640 for tag in &t.tags {
641 *tag_counts.entry(tag.clone()).or_insert(0) += 1;
642 }
643 let age = now - t.created_at;
644 if age <= one_day {
645 recent_24h += 1;
646 }
647 if age <= 7 * one_day {
648 recent_7d += 1;
649 }
650 if age <= 30 * one_day {
651 recent_30d += 1;
652 }
653 }
654
655 let mut top_tags: Vec<(String, usize)> = tag_counts.into_iter().collect();
656 top_tags.sort_by(|a, b| b.1.cmp(&a.1));
657 top_tags.truncate(10);
658
659 let pks_stats_raw = self.pks_cache.stats();
661 let pks_by_cat: HashMap<String, u32> = pks_stats_raw
662 .by_category
663 .into_iter()
664 .map(|(k, v)| (format!("{:?}", k), v))
665 .collect();
666
667 let bks_stats_raw = self.bks_cache.stats();
669 let bks_by_cat: HashMap<String, u32> = bks_stats_raw
670 .by_category
671 .into_iter()
672 .map(|(k, v)| (format!("{:?}", k), v))
673 .collect();
674
675 Ok(MemoryStatsResponse {
676 thoughts: ThoughtStats {
677 total,
678 by_category,
679 recent_24h,
680 recent_7d,
681 recent_30d,
682 top_tags,
683 },
684 pks: PksStats {
685 total_facts: pks_stats_raw.total_facts,
686 by_category: pks_by_cat,
687 avg_confidence: pks_stats_raw.avg_confidence,
688 },
689 bks: BksStats {
690 total_truths: bks_stats_raw.total_truths,
691 by_category: bks_by_cat,
692 },
693 })
694 }
695
696 pub async fn delete_thought(&self, id: &str) -> Result<DeleteThoughtResponse> {
700 let filter = Filter::And(vec![
702 Filter::Eq("id".into(), FieldValue::Utf8(Some(id.to_string()))),
703 Filter::Eq("deleted".into(), FieldValue::Boolean(Some(false))),
704 ]);
705
706 let count = self.backend.count(THOUGHTS_TABLE, Some(&filter)).await?;
707 if count == 0 {
708 return Ok(DeleteThoughtResponse {
709 deleted: false,
710 id: id.to_string(),
711 });
712 }
713
714 let delete_filter = Filter::Eq("id".into(), FieldValue::Utf8(Some(id.to_string())));
716 self.backend.delete(THOUGHTS_TABLE, &delete_filter).await?;
717
718 tracing::info!(id = id, "Deleted thought");
719 Ok(DeleteThoughtResponse {
720 deleted: true,
721 id: id.to_string(),
722 })
723 }
724
725 pub fn add_behavioral_truth(
727 &mut self,
728 truth: crate::knowledge::bks_pks::BehavioralTruth,
729 ) -> Result<()> {
730 self.bks_cache.add_truth(truth)?;
731 Ok(())
732 }
733
734 pub async fn delete_by_filter(&self, filter: &Filter) -> Result<usize> {
736 let count = self.backend.count(THOUGHTS_TABLE, Some(filter)).await?;
737 if count > 0 {
738 self.backend.delete(THOUGHTS_TABLE, filter).await?;
739 tracing::info!("Deleted {} thoughts by filter", count);
740 }
741 Ok(count)
742 }
743
744 fn thought_to_record(thought: &Thought, embedding: &[f32]) -> Record {
747 let tags_json = serde_json::to_string(&thought.tags).unwrap_or_else(|_| "[]".into());
748 let evidence_json =
749 serde_json::to_string(&thought.evidence_chain).unwrap_or_else(|_| "[]".into());
750
751 vec![
752 ("vector".into(), FieldValue::Vector(embedding.to_vec())),
753 ("id".into(), FieldValue::Utf8(Some(thought.id.clone()))),
754 (
755 "content".into(),
756 FieldValue::Utf8(Some(thought.content.clone())),
757 ),
758 (
759 "category".into(),
760 FieldValue::Utf8(Some(thought.category.as_str().to_string())),
761 ),
762 ("tags".into(), FieldValue::Utf8(Some(tags_json))),
763 (
764 "source".into(),
765 FieldValue::Utf8(Some(thought.source.as_str().to_string())),
766 ),
767 (
768 "importance".into(),
769 FieldValue::Float32(Some(thought.importance)),
770 ),
771 (
772 "created_at".into(),
773 FieldValue::Int64(Some(thought.created_at)),
774 ),
775 (
776 "updated_at".into(),
777 FieldValue::Int64(Some(thought.updated_at)),
778 ),
779 ("deleted".into(), FieldValue::Boolean(Some(thought.deleted))),
780 (
781 "confidence".into(),
782 FieldValue::Float32(Some(thought.confidence)),
783 ),
784 (
785 "evidence_chain".into(),
786 FieldValue::Utf8(Some(evidence_json)),
787 ),
788 (
789 "reinforcement_count".into(),
790 FieldValue::Int64(Some(thought.reinforcement_count as i64)),
791 ),
792 (
793 "contradiction_count".into(),
794 FieldValue::Int64(Some(thought.contradiction_count as i64)),
795 ),
796 ]
797 }
798
799 fn record_to_thought(record: &Record) -> Result<Thought> {
800 let id = record_get(record, "id")
801 .and_then(|v| v.as_str())
802 .context("Missing id field")?
803 .to_string();
804 let content = record_get(record, "content")
805 .and_then(|v| v.as_str())
806 .context("Missing content field")?
807 .to_string();
808 let category = record_get(record, "category")
809 .and_then(|v| v.as_str())
810 .map(ThoughtCategory::parse)
811 .context("Missing category field")?;
812 let tags_str = record_get(record, "tags")
813 .and_then(|v| v.as_str())
814 .unwrap_or("[]");
815 let tags: Vec<String> = serde_json::from_str(tags_str).unwrap_or_default();
816 let source = record_get(record, "source")
817 .and_then(|v| v.as_str())
818 .map(ThoughtSource::parse)
819 .context("Missing source field")?;
820 let importance = record_get(record, "importance")
821 .and_then(|v| v.as_f32())
822 .context("Missing importance field")?;
823 let created_at = record_get(record, "created_at")
824 .and_then(|v| v.as_i64())
825 .context("Missing created_at field")?;
826 let updated_at = record_get(record, "updated_at")
827 .and_then(|v| v.as_i64())
828 .context("Missing updated_at field")?;
829 let deleted = record_get(record, "deleted")
830 .and_then(|v| v.as_bool())
831 .unwrap_or(false);
832 let confidence = record_get(record, "confidence")
833 .and_then(|v| v.as_f32())
834 .unwrap_or(0.5);
835 let evidence_str = record_get(record, "evidence_chain")
836 .and_then(|v| v.as_str())
837 .unwrap_or("[]");
838 let evidence_chain: Vec<String> = serde_json::from_str(evidence_str).unwrap_or_default();
839 let reinforcement_count = record_get(record, "reinforcement_count")
840 .and_then(|v| v.as_i64())
841 .unwrap_or(0) as u32;
842 let contradiction_count = record_get(record, "contradiction_count")
843 .and_then(|v| v.as_i64())
844 .unwrap_or(0) as u32;
845
846 Ok(Thought {
847 id,
848 content,
849 category,
850 tags,
851 source,
852 importance,
853 created_at,
854 updated_at,
855 deleted,
856 confidence,
857 evidence_chain,
858 reinforcement_count,
859 contradiction_count,
860 })
861 }
862
863 fn records_to_thoughts(records: &[Record]) -> Result<Vec<Thought>> {
864 records.iter().map(Self::record_to_thought).collect()
865 }
866
867 async fn replace_thought(&self, thought: &Thought) -> Result<()> {
871 let delete_filter = Filter::Eq("id".into(), FieldValue::Utf8(Some(thought.id.clone())));
872 self.backend.delete(THOUGHTS_TABLE, &delete_filter).await?;
873 let embedding = self.embeddings.embed_cached(&thought.content)?;
874 let record = Self::thought_to_record(thought, &embedding);
875 self.backend
876 .insert(THOUGHTS_TABLE, vec![record])
877 .await
878 .context("Failed to reinsert updated thought")?;
879 Ok(())
880 }
881
882 async fn apply_evidence_check(
888 &self,
889 new_thought_id: &str,
890 content: &str,
891 ) -> Result<crate::knowledge::types::EvidenceCheckResult> {
892 use crate::knowledge::types::SearchMemoryRequest;
893
894 let similar = self
896 .search_memory(SearchMemoryRequest {
897 query: content.to_string(),
898 limit: 10,
899 min_score: CONTRADICTION_THRESHOLD,
900 category: None,
901 sources: Some(vec!["thoughts".into()]),
902 })
903 .await?;
904
905 let similar_results: Vec<_> = similar
907 .results
908 .into_iter()
909 .filter(|r| r.thought_id.as_deref() != Some(new_thought_id))
910 .collect();
911
912 let corroboration_result =
913 fact_extractor::check_corroboration(&similar_results, CORROBORATION_THRESHOLD);
914 let contradictions =
915 fact_extractor::check_contradiction(content, &similar_results, CONTRADICTION_THRESHOLD);
916
917 let contradiction_ids: Vec<String> = contradictions
919 .into_iter()
920 .filter(|id| !corroboration_result.corroborations.contains(id))
921 .collect();
922
923 let now = Utc::now().timestamp();
924
925 for corr_id in &corroboration_result.corroborations {
927 if let Some(mut t) = self.get_thought_internal(corr_id).await? {
928 let old_conf = t.confidence;
929 t.confidence = (EVIDENCE_EMA_ALPHA * (old_conf + 0.1)
930 + (1.0 - EVIDENCE_EMA_ALPHA) * old_conf)
931 .clamp(0.0, 1.0);
932 t.reinforcement_count += 1;
933 if !t.evidence_chain.contains(&new_thought_id.to_string()) {
934 t.evidence_chain.push(new_thought_id.to_string());
935 }
936 t.updated_at = now;
937 if let Err(e) = self.replace_thought(&t).await {
938 tracing::warn!(id = %corr_id, "Failed to update corroborated thought: {}", e);
939 }
940 }
941 }
942
943 for contra_id in &contradiction_ids {
945 if let Some(mut t) = self.get_thought_internal(contra_id).await? {
946 let old_conf = t.confidence;
947 t.confidence = (EVIDENCE_EMA_ALPHA * (old_conf - 0.1)
948 + (1.0 - EVIDENCE_EMA_ALPHA) * old_conf)
949 .clamp(0.0, 1.0);
950 t.contradiction_count += 1;
951 if !t.evidence_chain.contains(&new_thought_id.to_string()) {
952 t.evidence_chain.push(new_thought_id.to_string());
953 }
954 t.updated_at = now;
955 if let Err(e) = self.replace_thought(&t).await {
956 tracing::warn!(id = %contra_id, "Failed to update contradicted thought: {}", e);
957 }
958 }
959 }
960
961 Ok(crate::knowledge::types::EvidenceCheckResult {
962 corroborations: corroboration_result.corroborations,
963 contradictions: contradiction_ids,
964 })
965 }
966
967 async fn get_thought_internal(&self, id: &str) -> Result<Option<Thought>> {
969 let filter = Filter::And(vec![
970 Filter::Eq("id".into(), FieldValue::Utf8(Some(id.to_string()))),
971 Filter::Eq("deleted".into(), FieldValue::Boolean(Some(false))),
972 ]);
973 let records = self
974 .backend
975 .query(THOUGHTS_TABLE, Some(&filter), Some(1))
976 .await?;
977 let mut thoughts = Self::records_to_thoughts(&records)?;
978 Ok(thoughts.pop())
979 }
980}
981
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a client whose stores all live inside a fresh temporary
    /// directory. The `TempDir` is returned so the directory outlives the
    /// client for the duration of the test.
    async fn setup() -> (TempDir, BrainClient) {
        let temp = TempDir::new().unwrap();
        let path_of = |name: &str| temp.path().join(name).to_str().unwrap().to_owned();

        let lance = path_of("brain.lance");
        let pks = path_of("pks.db");
        let bks = path_of("bks.db");

        let client = BrainClient::with_paths(&lance, &pks, &bks)
            .await
            .unwrap();

        (temp, client)
    }

    /// A capture request carrying only content; every optional field is unset.
    fn plain_request(content: &str) -> CaptureThoughtRequest {
        CaptureThoughtRequest {
            content: content.into(),
            category: None,
            tags: None,
            importance: None,
            source: None,
        }
    }

    #[tokio::test]
    async fn test_capture_and_get() {
        let (_temp, mut client) = setup().await;

        let request = CaptureThoughtRequest {
            tags: Some(vec!["db".into()]),
            importance: Some(0.8),
            ..plain_request("Decided to use PostgreSQL for auth service")
        };
        let captured = client.capture_thought(request).await.unwrap();

        assert_eq!(captured.category, "decision");
        assert!(captured.tags.contains(&"db".to_string()));

        let fetched = client.get_thought(&captured.id).await.unwrap();
        assert!(fetched.is_some());
        assert_eq!(fetched.unwrap().category, "decision");
    }

    #[tokio::test]
    async fn test_search_memory() {
        let (_temp, mut client) = setup().await;

        let request = CaptureThoughtRequest {
            category: Some("insight".into()),
            ..plain_request("Rust is great for systems programming")
        };
        client.capture_thought(request).await.unwrap();

        let query = SearchMemoryRequest {
            query: "programming languages".into(),
            limit: 10,
            min_score: 0.0,
            category: None,
            sources: None,
        };
        let found = client.search_memory(query).await.unwrap();

        assert!(!found.results.is_empty());
    }

    #[tokio::test]
    async fn test_delete_thought() {
        let (_temp, mut client) = setup().await;

        let captured = client
            .capture_thought(plain_request("Something to delete"))
            .await
            .unwrap();

        let outcome = client.delete_thought(&captured.id).await.unwrap();
        assert!(outcome.deleted);

        let fetched = client.get_thought(&captured.id).await.unwrap();
        assert!(fetched.is_none());
    }

    #[tokio::test]
    async fn test_memory_stats() {
        let (_temp, client) = setup().await;

        let stats = client.memory_stats().await.unwrap();
        assert_eq!(stats.thoughts.total, 0);
    }
}
1085}