1use std::sync::Arc;
6
7use kyma_core::catalog::{Catalog, GraphSpec, TableConfig, TableRef};
8use kyma_core::segment_format::SegmentFormat;
9use kyma_core::types::DatabaseId;
10use kyma_embed::EmbeddingBackend;
11use kyma_ingest_core::WritePath;
12use serde_json::Value;
13use uuid::Uuid;
14
15use crate::error::{MemoryError, Result};
16use crate::types::CreateMemory;
17use crate::{rows, schema, EDGE_TABLE, GRAPH_NAME, NODE_TABLE};
18
/// Write-side handle for the memory store.
///
/// Bundles the catalog used for provisioning and table lookups, the
/// [`WritePath`] that ingests record batches, the embedding backend used to
/// vectorise memory content, and the name of the target database.
pub struct MemoryWriter {
    /// Catalog used to look up / create the database, tables, columns and graph.
    catalog: Arc<dyn Catalog>,
    /// Ingestion path; built in `new` from a clone of `catalog` and a segment format.
    write: WritePath,
    /// Backend that turns memory content into embedding vectors.
    embed: Arc<dyn EmbeddingBackend>,
    /// Database name used for every catalog lookup and creation this writer performs.
    database: String,
}
26
27impl MemoryWriter {
28 pub fn new(
32 catalog: Arc<dyn Catalog>,
33 format: Arc<dyn SegmentFormat>,
34 embed: Arc<dyn EmbeddingBackend>,
35 ) -> Self {
36 let write = WritePath::new(catalog.clone(), format);
37 Self {
38 catalog,
39 write,
40 embed,
41 database: crate::DEFAULT_DATABASE.to_string(),
42 }
43 }
44
45 pub fn database(&self) -> &str {
46 &self.database
47 }
48
49 pub async fn ensure_provisioned(&self) -> Result<()> {
54 if let Ok(tref) = self.catalog.lookup_table(&self.database, NODE_TABLE).await {
55 self.ensure_bitemporal_columns(&tref).await?;
56 return Ok(());
57 }
58 let db_id = self.ensure_database().await?;
59 let dim = self.embed.dimension() as i32;
60 let _ = self
62 .catalog
63 .create_table(db_id, NODE_TABLE, schema::memory_nodes_schema(dim), TableConfig::default())
64 .await;
65 let _ = self
66 .catalog
67 .create_table(db_id, EDGE_TABLE, schema::memory_edges_schema(), TableConfig::default())
68 .await;
69
70 let spec = GraphSpec {
71 node_table: NODE_TABLE.into(),
72 edge_table: EDGE_TABLE.into(),
73 id_col: "id".into(),
74 label_col: "labels".into(),
75 src_col: "src".into(),
76 dst_col: "dst".into(),
77 type_col: "type".into(),
78 realm_col: Some("realm".into()),
79 };
80 if let Err(e) = self.catalog.create_graph(&self.database, GRAPH_NAME, spec).await {
81 let msg = e.to_string();
82 if !(msg.contains("exists") || msg.contains("duplicate")) {
83 return Err(MemoryError::Catalog(msg));
84 }
85 }
86 Ok(())
87 }
88
89 async fn ensure_bitemporal_columns(&self, tref: &TableRef) -> Result<()> {
94 for col in schema::BITEMPORAL_COLUMNS {
95 if tref.schema.field_with_name(col).is_ok() {
96 continue;
97 }
98 if let Err(e) = self.catalog.alter_table_add_column(tref.id, col, "string").await {
99 let msg = e.to_string();
100 if !(msg.contains("exists") || msg.contains("duplicate")) {
102 return Err(MemoryError::Catalog(msg));
103 }
104 }
105 }
106 Ok(())
107 }
108
109 async fn ensure_database(&self) -> Result<DatabaseId> {
110 if let Some(id) = self
114 .catalog
115 .lookup_database(&self.database)
116 .await
117 .map_err(|e| MemoryError::Catalog(e.to_string()))?
118 {
119 return Ok(id);
120 }
121 self.catalog
122 .create_database(&self.database)
123 .await
124 .map_err(|e| MemoryError::Catalog(e.to_string()))
125 }
126
127 pub async fn save(&self, m: &CreateMemory) -> Result<Uuid> {
130 self.ensure_provisioned().await?;
131 let red = redact_create(m);
134 let m = &red;
135 let emb = self.embed_one(&m.content).await?;
136 let id = Uuid::new_v4();
137 let now = now_rfc3339();
138 let node = rows::node_row(&id, m, &emb, &now);
139 self.append_rows(NODE_TABLE, vec![node]).await?;
140
141 if !m.references.is_empty() {
142 let src = rows::node_id(&id);
143 let edges: Vec<Value> = m
144 .references
145 .iter()
146 .map(|r| rows::edge_row(&src, r, "REFERENCES", &m.realm, None, None, &now))
147 .collect();
148 self.append_rows(EDGE_TABLE, edges).await?;
149 }
150 Ok(id)
151 }
152
153 pub async fn save_as(&self, id: Uuid, m: &CreateMemory) -> Result<()> {
157 self.ensure_provisioned().await?;
158 let red = redact_create(m);
159 let m = &red;
160 let emb = self.embed_one(&m.content).await?;
161 let now = now_rfc3339();
162 let node = rows::node_row(&id, m, &emb, &now);
163 self.append_rows(NODE_TABLE, vec![node]).await
164 }
165
166 pub async fn link(
169 &self,
170 src_node_id: &str,
171 dst_node_id: &str,
172 rel_type: &str,
173 realm: &str,
174 target_namespace: Option<&str>,
175 ) -> Result<()> {
176 self.ensure_provisioned().await?;
177 let now = now_rfc3339();
178 let edge = rows::edge_row(src_node_id, dst_node_id, rel_type, realm, target_namespace, None, &now);
179 self.append_rows(EDGE_TABLE, vec![edge]).await
180 }
181
182 pub async fn append_node_rows(&self, node_rows: Vec<Value>) -> Result<()> {
184 self.ensure_provisioned().await?;
185 self.append_rows(NODE_TABLE, node_rows).await
186 }
187
188 pub async fn append_edge_rows(&self, edge_rows: Vec<Value>) -> Result<()> {
190 self.ensure_provisioned().await?;
191 self.append_rows(EDGE_TABLE, edge_rows).await
192 }
193
194 pub async fn embed_one(&self, text: &str) -> Result<Vec<f32>> {
196 let out = self
197 .embed
198 .embed(&[text.to_string()])
199 .await
200 .map_err(|e| MemoryError::Embed(e.to_string()))?;
201 out.into_iter()
202 .next()
203 .ok_or_else(|| MemoryError::Embed("backend returned no vector".into()))
204 }
205
206 async fn append_rows(&self, table: &str, json_rows: Vec<Value>) -> Result<()> {
207 if json_rows.is_empty() {
208 return Ok(());
209 }
210 let tref = self
211 .catalog
212 .lookup_table(&self.database, table)
213 .await
214 .map_err(|e| MemoryError::Catalog(e.to_string()))?;
215 let mut buf = Vec::with_capacity(json_rows.len() * 128);
216 for r in &json_rows {
217 serde_json::to_writer(&mut buf, r).map_err(|e| MemoryError::Ingest(e.to_string()))?;
218 buf.push(b'\n');
219 }
220 let batches = kyma_ingest_core::parse_ndjson(&buf, tref.schema.clone())
221 .map_err(|e| MemoryError::Ingest(e.to_string()))?;
222 self.write
223 .ingest(&tref, batches)
224 .await
225 .map_err(|e| MemoryError::Write(e.to_string()))?;
226 Ok(())
227 }
228}
229
230fn now_rfc3339() -> String {
231 chrono::Utc::now().to_rfc3339()
232}
233
234fn redact_create(m: &CreateMemory) -> CreateMemory {
237 let mut out = m.clone();
238 out.content = redact_private(&m.content);
239 out.title = m.title.as_deref().map(redact_private);
240 out
241}
242
/// Replaces every `<private>…</private>` span in `s` with `[redacted]`.
///
/// Tag matching is ASCII case-insensitive. If an opening tag has no matching
/// close, everything from the opening tag onward is dropped, so unterminated
/// secrets are never emitted. A closing tag without an opener is left as is.
fn redact_private(s: &str) -> String {
    const OPEN: &str = "<private>";
    const CLOSE: &str = "</private>";

    // ASCII lowercasing never changes byte lengths, so offsets found in
    // `folded` are valid char-boundary offsets into `s`.
    let folded = s.to_ascii_lowercase();
    let mut redacted = String::with_capacity(s.len());
    let mut cursor = 0usize;

    while cursor < s.len() {
        let Some(open_rel) = folded[cursor..].find(OPEN) else {
            redacted.push_str(&s[cursor..]);
            return redacted;
        };
        let open_at = cursor + open_rel;
        redacted.push_str(&s[cursor..open_at]);
        redacted.push_str("[redacted]");

        let body_start = open_at + OPEN.len();
        match folded[body_start..].find(CLOSE) {
            Some(close_rel) => cursor = body_start + close_rel + CLOSE.len(),
            // Unterminated span: drop the remainder entirely.
            None => return redacted,
        }
    }
    redacted
}
272
/// Unit tests for `redact_private`.
#[cfg(test)]
mod redact_tests {
    use super::redact_private;

    #[test]
    fn strips_private_spans() {
        assert_eq!(
            redact_private("token is <private>sk-abc123</private> ok"),
            "token is [redacted] ok"
        );
    }

    #[test]
    fn unclosed_private_drops_tail() {
        assert_eq!(redact_private("safe <PRIVATE>secret tail"), "safe [redacted]");
    }

    #[test]
    fn no_tags_unchanged() {
        assert_eq!(redact_private("nothing to redact"), "nothing to redact");
    }

    #[test]
    fn empty_input_is_empty() {
        assert_eq!(redact_private(""), "");
    }

    #[test]
    fn redacts_every_span_case_insensitively() {
        assert_eq!(
            redact_private("a<Private>1</PRIVATE>b<private>2</private>c"),
            "a[redacted]b[redacted]c"
        );
    }

    // Byte offsets from the lowercased copy must stay valid char boundaries
    // in the original when multibyte text surrounds a span.
    #[test]
    fn preserves_multibyte_text_around_spans() {
        assert_eq!(redact_private("é<private>ü</private>ß"), "é[redacted]ß");
    }

    #[test]
    fn stray_closing_tag_is_kept() {
        assert_eq!(redact_private("keep </private> this"), "keep </private> this");
    }
}