Skip to main content

harness_cortexdb/
lib.rs

1//! # harness-cortexdb — CortexDB as a harness `Memory`
2//!
3//! Implements [`harness_core::Memory`] over [CortexDB](https://github.com/liliang-cn/cortexdb)'s
4//! MCP server: `recall` calls `memory_search` (semantic / lexical vector
5//! retrieval) and `write` calls `memory_save`. Drop it in anywhere a
6//! `Memory` is expected — the `MemoryGuide`, the experience layer
7//! (`harness-experience`), a scheduler, etc. — to get **semantic recall** and a
8//! brain that can be *shared* with other tools (Claude Code / Codex all use
9//! `~/.cortexdb` by default).
10//!
11//! ```ignore
12//! use harness_cortexdb::CortexdbMemory;
13//! use std::sync::Arc;
14//!
15//! // Spawn CortexDB's MCP server; share the global brain (~/.cortexdb).
16//! let mem = Arc::new(CortexdbMemory::connect_stdio("cortexdb-mcp-stdio", &[]).await?);
17//! let recorder = harness_experience::ExperienceRecorder::new(mem); // semantic!
18//! ```
19//!
20//! Harness `MemoryEntry` fields round-trip through CortexDB: `content` maps to
21//! the memory content; `tags` + `source` are stored under CortexDB's
22//! `metadata` and read back on recall.
23
24use async_trait::async_trait;
25use harness_core::{Memory, MemoryEntry, MemoryError, Tool};
26use harness_mcp_client::McpClient;
27use serde_json::{Value, json};
28use std::sync::Arc;
29use std::sync::atomic::{AtomicU64, Ordering};
30
/// A [`Memory`] backed by CortexDB's MCP `memory_save` / `memory_search` tools.
///
/// Every save and search call carries this adapter's `scope` and `namespace`
/// (and `user_id`, when one is set) as tool arguments.
pub struct CortexdbMemory {
    // Owns the MCP session so the CortexDB child process stays alive.
    _client: McpClient,
    // The server's `memory_save` tool; invoked by `write`.
    save: Arc<dyn Tool>,
    // The server's `memory_search` tool; invoked by `recall`.
    search: Arc<dyn Tool>,
    // Sent as the `scope` argument on every call (initially `global`).
    scope: String,
    // Sent as the `namespace` argument on every call (initially `harness`).
    namespace: String,
    // Sent as the `user_id` argument only when set.
    user_id: Option<String>,
    // Per-instance counter mixed into generated memory ids.
    seq: AtomicU64,
}
42
43impl CortexdbMemory {
44    /// Spawn `program args...` as CortexDB's MCP stdio server and adapt it.
45    /// (e.g. `connect_stdio("cortexdb-mcp-stdio", &[])`.)
46    pub async fn connect_stdio(program: &str, args: &[&str]) -> anyhow::Result<Self> {
47        Self::from_client(McpClient::connect_stdio(program, args).await?)
48    }
49
50    /// Adapt an already-connected CortexDB MCP client.
51    pub fn from_client(client: McpClient) -> anyhow::Result<Self> {
52        let tools = client.tools();
53        let find = |name: &str| tools.iter().find(|t| t.name() == name).cloned();
54        let save = find("memory_save")
55            .ok_or_else(|| anyhow::anyhow!("CortexDB MCP server exposes no `memory_save` tool"))?;
56        let search = find("memory_search").ok_or_else(|| {
57            anyhow::anyhow!("CortexDB MCP server exposes no `memory_search` tool")
58        })?;
59        Ok(Self {
60            _client: client,
61            save,
62            search,
63            scope: "global".into(),
64            namespace: "harness".into(),
65            user_id: None,
66            seq: AtomicU64::new(0),
67        })
68    }
69
70    /// CortexDB memory scope: `global` (default, shared), `user`, or `session`.
71    pub fn with_scope(mut self, scope: impl Into<String>) -> Self {
72        self.scope = scope.into();
73        self
74    }
75    /// CortexDB namespace (default `harness`). Isolates this app's memories.
76    pub fn with_namespace(mut self, ns: impl Into<String>) -> Self {
77        self.namespace = ns.into();
78        self
79    }
80    /// Scope memories to a user id (for `scope = user`).
81    pub fn with_user_id(mut self, user: impl Into<String>) -> Self {
82        self.user_id = Some(user.into());
83        self
84    }
85
86    fn next_id(&self) -> String {
87        let n = self.seq.fetch_add(1, Ordering::SeqCst);
88        let t = std::time::SystemTime::now()
89            .duration_since(std::time::UNIX_EPOCH)
90            .map(|d| d.as_nanos())
91            .unwrap_or(0);
92        format!("harness-{t}-{n}")
93    }
94}
95
96#[async_trait]
97impl Memory for CortexdbMemory {
98    async fn write(&self, entry: MemoryEntry) -> Result<(), MemoryError> {
99        let id = if entry.id.is_empty() {
100            self.next_id()
101        } else {
102            entry.id.clone()
103        };
104        let mut metadata = serde_json::Map::new();
105        if !entry.tags.is_empty() {
106            metadata.insert("tags".into(), json!(entry.tags));
107        }
108        if let Some(s) = &entry.source {
109            metadata.insert("source".into(), json!(s));
110        }
111        let mut args = json!({
112            "memory_id": id,
113            "content": entry.content,
114            "scope": self.scope,
115            "namespace": self.namespace,
116            "metadata": Value::Object(metadata),
117        });
118        if let Some(u) = &self.user_id {
119            args["user_id"] = json!(u);
120        }
121
122        let mut world = harness_context::default_world(".");
123        let res = self
124            .save
125            .invoke(args, &mut world)
126            .await
127            .map_err(|e| MemoryError::Backend(e.to_string()))?;
128        if !res.ok {
129            return Err(MemoryError::Backend(format!(
130                "memory_save: {}",
131                res.content
132            )));
133        }
134        Ok(())
135    }
136
137    async fn recall(&self, query: &str, k: usize) -> Result<Vec<MemoryEntry>, MemoryError> {
138        if k == 0 || query.trim().is_empty() {
139            return Ok(Vec::new());
140        }
141        let mut args = json!({
142            "query": query,
143            "top_k": k,
144            "scope": self.scope,
145            "namespace": self.namespace,
146        });
147        if let Some(u) = &self.user_id {
148            args["user_id"] = json!(u);
149        }
150
151        let mut world = harness_context::default_world(".");
152        let res = self
153            .search
154            .invoke(args, &mut world)
155            .await
156            .map_err(|e| MemoryError::Backend(e.to_string()))?;
157        if !res.ok {
158            return Err(MemoryError::Backend(format!(
159                "memory_search: {}",
160                res.content
161            )));
162        }
163        // MemorySearchResponse { results: [ { memory: MemoryRecord, score } ] }
164        let results = res
165            .content
166            .get("results")
167            .and_then(|v| v.as_array())
168            .cloned()
169            .unwrap_or_default();
170
171        let mut out = Vec::with_capacity(results.len());
172        for hit in &results {
173            // The record is under `memory`; tolerate a flattened shape too.
174            let rec = hit.get("memory").unwrap_or(hit);
175            let content = rec
176                .get("content")
177                .and_then(|v| v.as_str())
178                .unwrap_or_default()
179                .to_string();
180            if content.trim().is_empty() {
181                continue;
182            }
183            let mut entry = MemoryEntry::new(content);
184            if let Some(id) = rec.get("id").and_then(|v| v.as_str()) {
185                entry.id = id.to_string();
186            }
187            if let Some(meta) = rec.get("metadata") {
188                if let Some(tags) = meta.get("tags").and_then(|v| v.as_array()) {
189                    entry.tags = tags
190                        .iter()
191                        .filter_map(|t| t.as_str().map(String::from))
192                        .collect();
193                }
194                if let Some(src) = meta.get("source").and_then(|v| v.as_str()) {
195                    entry.source = Some(src.to_string());
196                }
197            }
198            out.push(entry);
199        }
200        Ok(out)
201    }
202}