use anyhow::{anyhow, Context, Result};
use serde_json::Value;
use crate::Engine;
use kyma_memory::MemoryWriter;
use kyma_server::agent::{execute_sql, SharedToolCtx};
const NODE_COLS: &str = "id, labels, realm, memory_type, title, content, content_preview, tags, \
importance, status, source_session_id, source_run_id, embedding, created_at, updated_at, \
valid_at, invalid_at, superseded_by, provenance, topic_key";
const EDGE_COLS: &str = "id, src, dst, type, realm, target_namespace, props, created_at";
const PUSH_WM_KEY: &str = "memory_push_watermark";
const PULL_WM_KEY: &str = "memory_pull_watermark";
const EPOCH: &str = "1970-01-01T00:00:00Z";
pub struct SyncConfig {
pub cloud_url: String,
pub token: Option<String>,
pub realm: Option<String>,
pub now: String,
}
pub async fn run(engine: &Engine, cfg: SyncConfig) -> Result<()> {
let base = cfg.cloud_url.trim_end_matches('/').to_string();
let client = reqwest::Client::new();
let shared = SharedToolCtx {
catalog: engine.catalog.clone(),
format: engine.format.clone(),
pool: None,
};
let realm_sql = cfg
.realm
.as_deref()
.map(|r| format!(" AND realm = '{}'", r.replace('\'', "''")))
.unwrap_or_default();
let push_wm = engine
.sqlite
.get_sync_state(PUSH_WM_KEY)
.await?
.unwrap_or_else(|| EPOCH.to_string());
let push_esc = push_wm.replace('\'', "''");
let nodes_sql = format!(
"WITH latest AS (SELECT *, row_number() OVER (PARTITION BY id ORDER BY updated_at DESC) AS __rn FROM memory_nodes) \
SELECT {NODE_COLS} FROM latest WHERE __rn = 1 AND updated_at > '{push_esc}'{realm_sql}"
);
let edges_sql = format!(
"SELECT {EDGE_COLS} FROM memory_edges WHERE created_at > '{push_esc}'{realm_sql}"
);
let local_nodes = sql_rows(&shared, &nodes_sql).await;
let local_edges = sql_rows(&shared, &edges_sql).await;
let (pushed_n, pushed_e) =
push(&client, &base, &cfg, &local_nodes, &local_edges).await?;
engine.sqlite.set_sync_state(PUSH_WM_KEY, &cfg.now).await?;
let pull_wm = engine
.sqlite
.get_sync_state(PULL_WM_KEY)
.await?
.unwrap_or_else(|| EPOCH.to_string());
let mut req = client
.get(format!("{base}/v1/agent/memory/changes"))
.query(&[("since", pull_wm.as_str())]);
if let Some(r) = &cfg.realm {
req = req.query(&[("realm", r.as_str())]);
}
if let Some(t) = &cfg.token {
req = req.bearer_auth(t);
}
let resp = req.send().await.context("GET /v1/agent/memory/changes")?;
let resp = resp
.error_for_status()
.context("memory/changes returned an error status")?;
let body: Value = resp.json().await.context("decoding changes response")?;
let nodes = body["memory_nodes"].as_array().cloned().unwrap_or_default();
let edges = body["memory_edges"].as_array().cloned().unwrap_or_default();
let until = body["until"].as_str().unwrap_or("").to_string();
if !nodes.is_empty() || !edges.is_empty() {
let embed = kyma_memory::shared_embedding()
.await
.map_err(|e| anyhow!("embedding backend: {e}"))?;
let writer = MemoryWriter::new(engine.catalog.clone(), engine.format.clone(), embed);
if !nodes.is_empty() {
writer
.append_node_rows(nodes.clone())
.await
.map_err(|e| anyhow!("apply pulled nodes: {e}"))?;
}
if !edges.is_empty() {
writer
.append_edge_rows(edges.clone())
.await
.map_err(|e| anyhow!("apply pulled edges: {e}"))?;
}
}
if !until.is_empty() {
engine.sqlite.set_sync_state(PULL_WM_KEY, &until).await?;
}
eprintln!(
"sync ok — pushed {pushed_n} nodes / {pushed_e} edges; pulled {} nodes / {} edges (cloud: {base})",
nodes.len(),
edges.len()
);
Ok(())
}
async fn sql_rows(shared: &SharedToolCtx, sql: &str) -> Vec<Value> {
let res = execute_sql(shared, kyma_memory::DEFAULT_DATABASE, sql, 1_000_000).await;
res.get("rows")
.and_then(Value::as_array)
.cloned()
.unwrap_or_default()
}
async fn push(
client: &reqwest::Client,
base: &str,
cfg: &SyncConfig,
nodes: &[Value],
edges: &[Value],
) -> Result<(usize, usize)> {
if nodes.is_empty() && edges.is_empty() {
return Ok((0, 0));
}
let body = serde_json::json!({ "memory_nodes": nodes, "memory_edges": edges });
let mut req = client
.post(format!("{base}/v1/agent/memory/import"))
.json(&body);
if let Some(t) = &cfg.token {
req = req.bearer_auth(t);
}
let resp = req.send().await.context("POST /v1/agent/memory/import")?;
let status = resp.status();
let text = resp.text().await.unwrap_or_default();
if !status.is_success() {
return Err(anyhow!("memory/import returned {status}: {text}"));
}
if let Ok(v) = serde_json::from_str::<Value>(&text) {
if let Some(errs) = v.get("errors").and_then(Value::as_array) {
if !errs.is_empty() {
return Err(anyhow!("memory/import errors: {errs:?}"));
}
}
}
Ok((nodes.len(), edges.len()))
}