Skip to main content

kyma_memory/
writer.rs

1//! Writes memory nodes/edges to Kyma columnar storage and registers the
2//! `memory` graph. Append-only: mutations (status/importance/merge) write a new
3//! version row; recall dedups to the latest by `updated_at`.
4
5use std::sync::Arc;
6
7use kyma_core::catalog::{Catalog, GraphSpec, TableConfig, TableRef};
8use kyma_core::segment_format::SegmentFormat;
9use kyma_core::types::DatabaseId;
10use kyma_embed::EmbeddingBackend;
11use kyma_ingest_core::WritePath;
12use serde_json::Value;
13use uuid::Uuid;
14
15use crate::error::{MemoryError, Result};
16use crate::types::CreateMemory;
17use crate::{rows, schema, EDGE_TABLE, GRAPH_NAME, NODE_TABLE};
18
19/// Writes memory data and provisions the memory database/tables/graph on demand.
20pub struct MemoryWriter {
21    catalog: Arc<dyn Catalog>,
22    write: WritePath,
23    embed: Arc<dyn EmbeddingBackend>,
24    database: String,
25}
26
27impl MemoryWriter {
28    /// Build a writer over the catalog + format engine. Backend-agnostic — works
29    /// over the Postgres catalog (server) or the embedded SQLite catalog
30    /// (`kyma local`); no direct DB pool is required.
31    pub fn new(
32        catalog: Arc<dyn Catalog>,
33        format: Arc<dyn SegmentFormat>,
34        embed: Arc<dyn EmbeddingBackend>,
35    ) -> Self {
36        let write = WritePath::new(catalog.clone(), format);
37        Self {
38            catalog,
39            write,
40            embed,
41            database: crate::DEFAULT_DATABASE.to_string(),
42        }
43    }
44
45    pub fn database(&self) -> &str {
46        &self.database
47    }
48
49    /// Ensure the memory database, both tables, and the `memory` graph
50    /// registration exist. Fast-paths when the node table is already present,
51    /// but first backfills any bi-temporal columns a pre-existing store is
52    /// missing (non-destructive: historical extents null-fill on read).
53    pub async fn ensure_provisioned(&self) -> Result<()> {
54        if let Ok(tref) = self.catalog.lookup_table(&self.database, NODE_TABLE).await {
55            self.ensure_bitemporal_columns(&tref).await?;
56            return Ok(());
57        }
58        let db_id = self.ensure_database().await?;
59        let dim = self.embed.dimension() as i32;
60        // Races (two concurrent first-writes) surface as "already exists" — ignore.
61        let _ = self
62            .catalog
63            .create_table(db_id, NODE_TABLE, schema::memory_nodes_schema(dim), TableConfig::default())
64            .await;
65        let _ = self
66            .catalog
67            .create_table(db_id, EDGE_TABLE, schema::memory_edges_schema(), TableConfig::default())
68            .await;
69
70        let spec = GraphSpec {
71            node_table: NODE_TABLE.into(),
72            edge_table: EDGE_TABLE.into(),
73            id_col: "id".into(),
74            label_col: "labels".into(),
75            src_col: "src".into(),
76            dst_col: "dst".into(),
77            type_col: "type".into(),
78            realm_col: Some("realm".into()),
79        };
80        if let Err(e) = self.catalog.create_graph(&self.database, GRAPH_NAME, spec).await {
81            let msg = e.to_string();
82            if !(msg.contains("exists") || msg.contains("duplicate")) {
83                return Err(MemoryError::Catalog(msg));
84            }
85        }
86        Ok(())
87    }
88
89    /// Add any bi-temporal columns missing from an already-provisioned
90    /// `memory_nodes` table. Stores created before bi-temporal support keep
91    /// their extents; the new nullable columns read as NULL ("always valid"),
92    /// which the recall validity guard treats correctly. Idempotent.
93    async fn ensure_bitemporal_columns(&self, tref: &TableRef) -> Result<()> {
94        for col in schema::BITEMPORAL_COLUMNS {
95            if tref.schema.field_with_name(col).is_ok() {
96                continue;
97            }
98            if let Err(e) = self.catalog.alter_table_add_column(tref.id, col, "string").await {
99                let msg = e.to_string();
100                // A concurrent writer may have added it first.
101                if !(msg.contains("exists") || msg.contains("duplicate")) {
102                    return Err(MemoryError::Catalog(msg));
103                }
104            }
105        }
106        Ok(())
107    }
108
109    async fn ensure_database(&self) -> Result<DatabaseId> {
110        // Resolve via the catalog trait (backend-agnostic), then create if
111        // missing. Races (two concurrent first-writes) surface as a unique
112        // violation on create — callers treat "exists" as benign.
113        if let Some(id) = self
114            .catalog
115            .lookup_database(&self.database)
116            .await
117            .map_err(|e| MemoryError::Catalog(e.to_string()))?
118        {
119            return Ok(id);
120        }
121        self.catalog
122            .create_database(&self.database)
123            .await
124            .map_err(|e| MemoryError::Catalog(e.to_string()))
125    }
126
127    /// Embed + persist a new memory node plus a `REFERENCES` edge per reference.
128    /// Returns the new memory uuid.
129    pub async fn save(&self, m: &CreateMemory) -> Result<Uuid> {
130        self.ensure_provisioned().await?;
131        // Strip `<private>…</private>` spans at the store layer (defense in
132        // depth beyond the capture hooks) so secrets are never persisted/embedded.
133        let red = redact_create(m);
134        let m = &red;
135        let emb = self.embed_one(&m.content).await?;
136        let id = Uuid::new_v4();
137        let now = now_rfc3339();
138        let node = rows::node_row(&id, m, &emb, &now);
139        self.append_rows(NODE_TABLE, vec![node]).await?;
140
141        if !m.references.is_empty() {
142            let src = rows::node_id(&id);
143            let edges: Vec<Value> = m
144                .references
145                .iter()
146                .map(|r| rows::edge_row(&src, r, "REFERENCES", &m.realm, None, None, &now))
147                .collect();
148            self.append_rows(EDGE_TABLE, edges).await?;
149        }
150        Ok(id)
151    }
152
153    /// Upsert a new version of an existing memory `id` (same id → latest-wins),
154    /// re-embedding the redacted content. Used by topic-key upsert so a repeated
155    /// save updates in place instead of creating a duplicate.
156    pub async fn save_as(&self, id: Uuid, m: &CreateMemory) -> Result<()> {
157        self.ensure_provisioned().await?;
158        let red = redact_create(m);
159        let m = &red;
160        let emb = self.embed_one(&m.content).await?;
161        let now = now_rfc3339();
162        let node = rows::node_row(&id, m, &emb, &now);
163        self.append_rows(NODE_TABLE, vec![node]).await
164    }
165
166    /// Append a single edge linking a memory node to another (possibly
167    /// cross-graph) entity.
168    pub async fn link(
169        &self,
170        src_node_id: &str,
171        dst_node_id: &str,
172        rel_type: &str,
173        realm: &str,
174        target_namespace: Option<&str>,
175    ) -> Result<()> {
176        self.ensure_provisioned().await?;
177        let now = now_rfc3339();
178        let edge = rows::edge_row(src_node_id, dst_node_id, rel_type, realm, target_namespace, None, &now);
179        self.append_rows(EDGE_TABLE, vec![edge]).await
180    }
181
182    /// Append pre-built node rows (used by read-then-append mutations).
183    pub async fn append_node_rows(&self, node_rows: Vec<Value>) -> Result<()> {
184        self.ensure_provisioned().await?;
185        self.append_rows(NODE_TABLE, node_rows).await
186    }
187
188    /// Append pre-built edge rows.
189    pub async fn append_edge_rows(&self, edge_rows: Vec<Value>) -> Result<()> {
190        self.ensure_provisioned().await?;
191        self.append_rows(EDGE_TABLE, edge_rows).await
192    }
193
194    /// Embed a single text, returning its vector.
195    pub async fn embed_one(&self, text: &str) -> Result<Vec<f32>> {
196        let out = self
197            .embed
198            .embed(&[text.to_string()])
199            .await
200            .map_err(|e| MemoryError::Embed(e.to_string()))?;
201        out.into_iter()
202            .next()
203            .ok_or_else(|| MemoryError::Embed("backend returned no vector".into()))
204    }
205
206    async fn append_rows(&self, table: &str, json_rows: Vec<Value>) -> Result<()> {
207        if json_rows.is_empty() {
208            return Ok(());
209        }
210        let tref = self
211            .catalog
212            .lookup_table(&self.database, table)
213            .await
214            .map_err(|e| MemoryError::Catalog(e.to_string()))?;
215        let mut buf = Vec::with_capacity(json_rows.len() * 128);
216        for r in &json_rows {
217            serde_json::to_writer(&mut buf, r).map_err(|e| MemoryError::Ingest(e.to_string()))?;
218            buf.push(b'\n');
219        }
220        let batches = kyma_ingest_core::parse_ndjson(&buf, tref.schema.clone())
221            .map_err(|e| MemoryError::Ingest(e.to_string()))?;
222        self.write
223            .ingest(&tref, batches)
224            .await
225            .map_err(|e| MemoryError::Write(e.to_string()))?;
226        Ok(())
227    }
228}
229
230fn now_rfc3339() -> String {
231    chrono::Utc::now().to_rfc3339()
232}
233
234/// Return a copy of `m` with `<private>…</private>` spans redacted from its
235/// content and title. Cheap clone on the (low-volume) save path.
236fn redact_create(m: &CreateMemory) -> CreateMemory {
237    let mut out = m.clone();
238    out.content = redact_private(&m.content);
239    out.title = m.title.as_deref().map(redact_private);
240    out
241}
242
/// Replace each `<private>…</private>` span in `s` with `[redacted]`.
///
/// Tags are matched ASCII-case-insensitively (`<PRIVATE>`, `</Private>`, …).
/// A `<private>` with no closing tag redacts everything to the end of the
/// string. A `</private>` with no opener is left as-is.
///
/// Matching runs on an ASCII-lowercased copy, which has exactly the same byte
/// layout as `s`. Every match begins at an ASCII `<`, so the offsets it yields
/// are valid UTF-8 char boundaries for slicing `s`.
fn redact_private(s: &str) -> String {
    const OPEN: &str = "<private>";
    const CLOSE: &str = "</private>";
    const MASK: &str = "[redacted]";

    let folded = s.to_ascii_lowercase();
    let mut redacted = String::with_capacity(s.len());
    let mut cursor = 0usize;

    while cursor < s.len() {
        let open_at = match folded[cursor..].find(OPEN) {
            Some(offset) => cursor + offset,
            None => {
                // No further opener: copy the remainder verbatim.
                redacted.push_str(&s[cursor..]);
                return redacted;
            }
        };
        redacted.push_str(&s[cursor..open_at]);
        redacted.push_str(MASK);

        let body_start = open_at + OPEN.len();
        cursor = match folded[body_start..].find(CLOSE) {
            Some(offset) => body_start + offset + CLOSE.len(),
            // Unclosed span: everything after the opener stays hidden.
            None => return redacted,
        };
    }
    redacted
}
272
/// Unit tests pinning the documented behavior of `redact_private`.
#[cfg(test)]
mod redact_tests {
    use super::redact_private;

    #[test]
    fn strips_private_spans() {
        assert_eq!(
            redact_private("token is <private>sk-abc123</private> ok"),
            "token is [redacted] ok"
        );
    }

    #[test]
    fn unclosed_private_drops_tail() {
        assert_eq!(redact_private("safe <PRIVATE>secret tail"), "safe [redacted]");
    }

    #[test]
    fn no_tags_unchanged() {
        assert_eq!(redact_private("nothing to redact"), "nothing to redact");
    }

    #[test]
    fn empty_input_is_empty() {
        assert_eq!(redact_private(""), "");
    }

    #[test]
    fn whole_string_span() {
        assert_eq!(redact_private("<private>all</private>"), "[redacted]");
    }

    #[test]
    fn strips_multiple_spans_with_mixed_case_tags() {
        assert_eq!(
            redact_private("a <private>x</private> b <PRIVATE>y</Private> c"),
            "a [redacted] b [redacted] c"
        );
    }

    #[test]
    fn preserves_multibyte_text_around_spans() {
        // Byte offsets from the ASCII-lowercased copy must stay valid for
        // slicing non-ASCII input.
        assert_eq!(
            redact_private("héllo <private>ключ</private> wörld"),
            "héllo [redacted] wörld"
        );
    }

    #[test]
    fn stray_close_tag_is_left_alone() {
        assert_eq!(redact_private("a </private> b"), "a </private> b");
    }
}