use std::sync::Arc;
use kyma_core::catalog::{Catalog, GraphSpec, TableConfig, TableRef};
use kyma_core::segment_format::SegmentFormat;
use kyma_core::types::DatabaseId;
use kyma_embed::EmbeddingBackend;
use kyma_ingest_core::WritePath;
use serde_json::Value;
use uuid::Uuid;
use crate::error::{MemoryError, Result};
use crate::types::CreateMemory;
use crate::{rows, schema, EDGE_TABLE, GRAPH_NAME, NODE_TABLE};
/// Writes memory nodes and edges into the memory database, provisioning the
/// backing tables and graph on demand.
pub struct MemoryWriter {
/// Catalog used to look up / create the database, tables and graph.
catalog: Arc<dyn Catalog>,
/// Ingest path that turns parsed record batches into stored data.
write: WritePath,
/// Backend used to compute embedding vectors for memory content.
embed: Arc<dyn EmbeddingBackend>,
/// Name of the target database (set to `crate::DEFAULT_DATABASE` by `new`).
database: String,
}
impl MemoryWriter {
    /// Builds a writer targeting `crate::DEFAULT_DATABASE`.
    ///
    /// A [`WritePath`] is built from a clone of `catalog` and `format`; `embed`
    /// computes the vectors stored alongside each memory.
    pub fn new(
        catalog: Arc<dyn Catalog>,
        format: Arc<dyn SegmentFormat>,
        embed: Arc<dyn EmbeddingBackend>,
    ) -> Self {
        let write = WritePath::new(catalog.clone(), format);
        Self {
            catalog,
            write,
            embed,
            database: crate::DEFAULT_DATABASE.to_string(),
        }
    }

    /// Name of the database this writer targets.
    pub fn database(&self) -> &str {
        &self.database
    }

    /// Ensures the database, node table, edge table and graph exist.
    ///
    /// Fast path: if both tables can be looked up, only the bitemporal columns
    /// on the node table are backfilled. Otherwise the database, both tables
    /// and the graph are (re)created. "Already exists" conflicts are tolerated,
    /// so repeated calls are safe and a partially provisioned database is
    /// repaired.
    ///
    /// # Errors
    /// [`MemoryError::Catalog`] for any catalog failure that is not an
    /// already-exists conflict. [`MemoryError::Embed`] if the embedding
    /// dimension does not fit in an `i32`.
    pub async fn ensure_provisioned(&self) -> Result<()> {
        if let Ok(tref) = self.catalog.lookup_table(&self.database, NODE_TABLE).await {
            self.ensure_bitemporal_columns(&tref).await?;
            // Only short-circuit when the edge table is present too; otherwise a
            // previous partial run would leave edge writes broken forever.
            if self.catalog.lookup_table(&self.database, EDGE_TABLE).await.is_ok() {
                return Ok(());
            }
        }
        let db_id = self.ensure_database().await?;
        let raw_dim = self.embed.dimension();
        // Checked conversion instead of `as`, which would silently truncate.
        let dim = i32::try_from(raw_dim).map_err(|_| {
            MemoryError::Embed(format!(
                "embedding dimension {} does not fit in i32",
                raw_dim
            ))
        })?;
        ignore_already_exists(
            self.catalog
                .create_table(
                    db_id,
                    NODE_TABLE,
                    schema::memory_nodes_schema(dim),
                    TableConfig::default(),
                )
                .await,
        )?;
        ignore_already_exists(
            self.catalog
                .create_table(
                    db_id,
                    EDGE_TABLE,
                    schema::memory_edges_schema(),
                    TableConfig::default(),
                )
                .await,
        )?;
        let spec = GraphSpec {
            node_table: NODE_TABLE.into(),
            edge_table: EDGE_TABLE.into(),
            id_col: "id".into(),
            label_col: "labels".into(),
            src_col: "src".into(),
            dst_col: "dst".into(),
            type_col: "type".into(),
            realm_col: Some("realm".into()),
        };
        ignore_already_exists(
            self.catalog
                .create_graph(&self.database, GRAPH_NAME, spec)
                .await,
        )
    }

    /// Adds any of `schema::BITEMPORAL_COLUMNS` missing from `tref`'s schema
    /// as `"string"` columns.
    ///
    /// Already-exists conflicts (e.g. a concurrent writer added the column)
    /// are ignored.
    async fn ensure_bitemporal_columns(&self, tref: &TableRef) -> Result<()> {
        for col in schema::BITEMPORAL_COLUMNS {
            if tref.schema.field_with_name(col).is_ok() {
                continue;
            }
            ignore_already_exists(
                self.catalog
                    .alter_table_add_column(tref.id, col, "string")
                    .await,
            )?;
        }
        Ok(())
    }

    /// Returns the id of the target database, creating it if absent.
    ///
    /// If creation fails because another writer created the database between
    /// our lookup and our create, the database is looked up again.
    async fn ensure_database(&self) -> Result<DatabaseId> {
        if let Some(id) = self
            .catalog
            .lookup_database(&self.database)
            .await
            .map_err(|e| MemoryError::Catalog(e.to_string()))?
        {
            return Ok(id);
        }
        match self.catalog.create_database(&self.database).await {
            Ok(id) => Ok(id),
            Err(e) => {
                let msg = e.to_string();
                if !is_already_exists(&msg) {
                    return Err(MemoryError::Catalog(msg));
                }
                self.catalog
                    .lookup_database(&self.database)
                    .await
                    .map_err(|e| MemoryError::Catalog(e.to_string()))?
                    .ok_or_else(|| MemoryError::Catalog(msg))
            }
        }
    }

    /// Redacts, embeds and stores a new memory under a fresh random id.
    ///
    /// Each entry in `m.references` also produces a `"REFERENCES"` edge from
    /// the new node. The node is written before the edges, and the two writes
    /// are separate appends.
    pub async fn save(&self, m: &CreateMemory) -> Result<Uuid> {
        self.ensure_provisioned().await?;
        let red = redact_create(m);
        let m = &red;
        let emb = self.embed_one(&m.content).await?;
        let id = Uuid::new_v4();
        let now = now_rfc3339();
        let node = rows::node_row(&id, m, &emb, &now);
        self.append_rows(NODE_TABLE, vec![node]).await?;
        if !m.references.is_empty() {
            let src = rows::node_id(&id);
            let edges: Vec<Value> = m
                .references
                .iter()
                .map(|r| rows::edge_row(&src, r, "REFERENCES", &m.realm, None, None, &now))
                .collect();
            self.append_rows(EDGE_TABLE, edges).await?;
        }
        Ok(id)
    }

    /// Redacts, embeds and stores a memory under the caller-supplied `id`.
    ///
    /// NOTE(review): unlike [`MemoryWriter::save`], this writes no
    /// `REFERENCES` edges for `m.references` — confirm callers add edges
    /// separately.
    pub async fn save_as(&self, id: Uuid, m: &CreateMemory) -> Result<()> {
        self.ensure_provisioned().await?;
        let red = redact_create(m);
        let m = &red;
        let emb = self.embed_one(&m.content).await?;
        let now = now_rfc3339();
        let node = rows::node_row(&id, m, &emb, &now);
        self.append_rows(NODE_TABLE, vec![node]).await
    }

    /// Appends one edge of type `rel_type` from `src_node_id` to `dst_node_id`.
    pub async fn link(
        &self,
        src_node_id: &str,
        dst_node_id: &str,
        rel_type: &str,
        realm: &str,
        target_namespace: Option<&str>,
    ) -> Result<()> {
        self.ensure_provisioned().await?;
        let now = now_rfc3339();
        let edge = rows::edge_row(
            src_node_id,
            dst_node_id,
            rel_type,
            realm,
            target_namespace,
            None,
            &now,
        );
        self.append_rows(EDGE_TABLE, vec![edge]).await
    }

    /// Appends pre-built JSON node rows (no redaction or embedding is applied).
    pub async fn append_node_rows(&self, node_rows: Vec<Value>) -> Result<()> {
        self.ensure_provisioned().await?;
        self.append_rows(NODE_TABLE, node_rows).await
    }

    /// Appends pre-built JSON edge rows.
    pub async fn append_edge_rows(&self, edge_rows: Vec<Value>) -> Result<()> {
        self.ensure_provisioned().await?;
        self.append_rows(EDGE_TABLE, edge_rows).await
    }

    /// Embeds a single text and returns its vector.
    ///
    /// # Errors
    /// [`MemoryError::Embed`] if the backend fails or returns no vector.
    pub async fn embed_one(&self, text: &str) -> Result<Vec<f32>> {
        let out = self
            .embed
            .embed(&[text.to_string()])
            .await
            .map_err(|e| MemoryError::Embed(e.to_string()))?;
        out.into_iter()
            .next()
            .ok_or_else(|| MemoryError::Embed("backend returned no vector".into()))
    }

    /// Serializes `json_rows` as NDJSON, parses them against `table`'s schema
    /// and ingests the resulting batches. An empty input is a no-op.
    async fn append_rows(&self, table: &str, json_rows: Vec<Value>) -> Result<()> {
        if json_rows.is_empty() {
            return Ok(());
        }
        let tref = self
            .catalog
            .lookup_table(&self.database, table)
            .await
            .map_err(|e| MemoryError::Catalog(e.to_string()))?;
        // Rough pre-size guess of 128 bytes per serialized row.
        let mut buf = Vec::with_capacity(json_rows.len() * 128);
        for r in &json_rows {
            serde_json::to_writer(&mut buf, r).map_err(|e| MemoryError::Ingest(e.to_string()))?;
            buf.push(b'\n');
        }
        let batches = kyma_ingest_core::parse_ndjson(&buf, tref.schema.clone())
            .map_err(|e| MemoryError::Ingest(e.to_string()))?;
        self.write
            .ingest(&tref, batches)
            .await
            .map_err(|e| MemoryError::Write(e.to_string()))?;
        Ok(())
    }
}

/// Returns `true` when a catalog error message reports an already-existing
/// object.
///
/// The catalog exposes conflicts only through error text, so this is a
/// substring heuristic.
fn is_already_exists(msg: &str) -> bool {
    msg.contains("exists") || msg.contains("duplicate")
}

/// Maps a catalog result to `Ok(())`, treating already-exists conflicts as
/// success and every other error as [`MemoryError::Catalog`].
fn ignore_already_exists<T, E: std::fmt::Display>(res: std::result::Result<T, E>) -> Result<()> {
    match res {
        Ok(_) => Ok(()),
        Err(e) => {
            let msg = e.to_string();
            if is_already_exists(&msg) {
                Ok(())
            } else {
                Err(MemoryError::Catalog(msg))
            }
        }
    }
}
/// Current UTC wall-clock time rendered as an RFC 3339 timestamp string.
fn now_rfc3339() -> String {
    let stamp = chrono::Utc::now();
    stamp.to_rfc3339()
}
/// Returns a clone of `m` whose `content` and (if present) `title` have had
/// `<private>…</private>` spans replaced by `redact_private`.
///
/// All other fields are copied unchanged.
fn redact_create(m: &CreateMemory) -> CreateMemory {
    let mut scrubbed = m.clone();
    scrubbed.content = redact_private(&scrubbed.content);
    if let Some(title) = scrubbed.title.as_mut() {
        *title = redact_private(title);
    }
    scrubbed
}
/// Replaces every `<private>…</private>` span in `s` with `[redacted]`.
///
/// Tags are matched ASCII-case-insensitively. An opening tag with no matching
/// close redacts everything from the tag to the end of the input.
fn redact_private(s: &str) -> String {
    const OPEN: &str = "<private>";
    const CLOSE: &str = "</private>";
    // ASCII lowercasing leaves every byte length unchanged, so offsets found in
    // `folded` are valid char boundaries in `s` as well.
    let folded = s.to_ascii_lowercase();
    let mut redacted = String::with_capacity(s.len());
    let mut cursor = 0usize;
    while cursor < s.len() {
        if let Some(open_rel) = folded[cursor..].find(OPEN) {
            let open_at = cursor + open_rel;
            redacted.push_str(&s[cursor..open_at]);
            redacted.push_str("[redacted]");
            let body_start = open_at + OPEN.len();
            match folded[body_start..].find(CLOSE) {
                Some(close_rel) => cursor = body_start + close_rel + CLOSE.len(),
                // Unterminated span: drop the remainder rather than leak it.
                None => break,
            }
        } else {
            redacted.push_str(&s[cursor..]);
            break;
        }
    }
    redacted
}
#[cfg(test)]
mod redact_tests {
    use super::redact_private;

    #[test]
    fn strips_private_spans() {
        assert_eq!(
            redact_private("token is <private>sk-abc123</private> ok"),
            "token is [redacted] ok"
        );
    }

    #[test]
    fn unclosed_private_drops_tail() {
        assert_eq!(redact_private("safe <PRIVATE>secret tail"), "safe [redacted]");
    }

    #[test]
    fn no_tags_unchanged() {
        assert_eq!(redact_private("nothing to redact"), "nothing to redact");
    }

    #[test]
    fn closing_tag_case_insensitive_and_multiple_spans() {
        assert_eq!(
            redact_private("a<private>x</PRIVATE>b<Private>y</private>c"),
            "a[redacted]b[redacted]c"
        );
    }

    // Guards the byte-offset alignment between the ASCII-lowercased copy and
    // the original string when multi-byte characters are present.
    #[test]
    fn preserves_non_ascii_text_around_spans() {
        assert_eq!(
            redact_private("héllo <private>ünïcode</private> wörld ✓"),
            "héllo [redacted] wörld ✓"
        );
    }

    #[test]
    fn empty_input_and_stray_close_tag() {
        assert_eq!(redact_private(""), "");
        assert_eq!(redact_private("</private> stays"), "</private> stays");
    }
}