use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use deadpool_postgres::Client;
use crate::db::{
self, advice::Advice, event::Event, harness::Harness, intent::Intent, issue::Issue,
memory::Memory, project::Project,
};
/// Upper bound on rows read from the local store and pushed per entity per call.
const PUSH_BATCH: usize = 100;
impl super::Sync {
pub async fn push_events(&self) -> Result<usize> {
let unsynced = {
let conn = self.brain.lock().unwrap();
db::event::list_unsynced(&conn, PUSH_BATCH)?
};
if unsynced.is_empty() {
return Ok(0);
}
let mut client = self.pg.get().await.context("get client")?;
let tx = client.transaction().await?;
for ev in &unsynced {
upsert_event(&tx, ev).await?;
}
tx.commit().await.context("commit events")?;
let now = Utc::now().to_rfc3339();
let ids: Vec<&str> = unsynced.iter().map(|e| e.id.as_str()).collect();
{
let conn = self.brain.lock().unwrap();
db::event::mark_synced(&conn, &ids, &now)?;
}
Ok(unsynced.len())
}
pub async fn push_memories(&self) -> Result<usize> {
let unsynced = {
let conn = self.brain.lock().unwrap();
list_unsynced_memories(&conn, PUSH_BATCH)?
};
if unsynced.is_empty() {
return Ok(0);
}
let mut client = self.pg.get().await?;
let tx = client.transaction().await?;
for m in &unsynced {
upsert_memory(&tx, m).await?;
}
tx.commit().await?;
let now = Utc::now().to_rfc3339();
let ids: Vec<&str> = unsynced.iter().map(|m| m.id.as_str()).collect();
{
let conn = self.brain.lock().unwrap();
mark_memories_synced(&conn, &ids, &now)?;
}
Ok(unsynced.len())
}
pub async fn push_advice(&self) -> Result<usize> {
let unsynced = {
let conn = self.brain.lock().unwrap();
db::advice::list_unsynced(&conn, PUSH_BATCH)?
};
if unsynced.is_empty() {
return Ok(0);
}
let mut client = self.pg.get().await?;
let tx = client.transaction().await?;
for a in &unsynced {
upsert_advice(&tx, a).await?;
}
tx.commit().await?;
let now = Utc::now().to_rfc3339();
let ids: Vec<&str> = unsynced.iter().map(|a| a.id.as_str()).collect();
{
let conn = self.brain.lock().unwrap();
db::advice::mark_synced(&conn, &ids, &now)?;
}
Ok(unsynced.len())
}
pub async fn push_projects(&self) -> Result<usize> {
let unsynced = {
let conn = self.brain.lock().unwrap();
db::project::list_unsynced(&conn, PUSH_BATCH)?
};
if unsynced.is_empty() {
return Ok(0);
}
let mut client = self.pg.get().await?;
let tx = client.transaction().await?;
for p in &unsynced {
upsert_project(&tx, p).await?;
}
tx.commit().await?;
let now = Utc::now().to_rfc3339();
let keys: Vec<(String, String)> = unsynced
.iter()
.map(|p| (p.user_id.clone(), p.name.clone()))
.collect();
{
let conn = self.brain.lock().unwrap();
db::project::mark_synced(&conn, &keys, &now)?;
}
Ok(unsynced.len())
}
pub async fn push_intents(&self) -> Result<usize> {
let unsynced = {
let conn = self.brain.lock().unwrap();
list_unsynced_intents(&conn, PUSH_BATCH)?
};
if unsynced.is_empty() {
return Ok(0);
}
let mut client = self.pg.get().await?;
let tx = client.transaction().await?;
for it in &unsynced {
upsert_intent(&tx, it).await?;
}
tx.commit().await?;
let now = Utc::now().to_rfc3339();
let ids: Vec<&str> = unsynced.iter().map(|i| i.id.as_str()).collect();
{
let conn = self.brain.lock().unwrap();
mark_intents_synced(&conn, &ids, &now)?;
}
Ok(unsynced.len())
}
pub async fn push_harnesses(&self) -> Result<usize> {
let unsynced = {
let conn = self.brain.lock().unwrap();
db::harness::list_unsynced(&conn, PUSH_BATCH)?
};
if unsynced.is_empty() {
return Ok(0);
}
let mut client = self.pg.get().await?;
let tx = client.transaction().await?;
for h in &unsynced {
upsert_harness(&tx, h).await?;
}
tx.commit().await?;
let now = Utc::now().to_rfc3339();
let ids: Vec<&str> = unsynced.iter().map(|h| h.id.as_str()).collect();
{
let conn = self.brain.lock().unwrap();
db::harness::mark_synced(&conn, &ids, &now)?;
}
Ok(unsynced.len())
}
pub async fn push_issues(&self) -> Result<usize> {
let unsynced = {
let conn = self.brain.lock().unwrap();
db::issue::list_unsynced(&conn, PUSH_BATCH)?
};
if unsynced.is_empty() {
return Ok(0);
}
let mut client = self.pg.get().await?;
let tx = client.transaction().await?;
for it in &unsynced {
upsert_issue(&tx, it).await?;
}
tx.commit().await?;
let now = Utc::now().to_rfc3339();
let ids: Vec<&str> = unsynced.iter().map(|i| i.id.as_str()).collect();
{
let conn = self.brain.lock().unwrap();
db::issue::mark_synced(&conn, &ids, &now)?;
}
Ok(unsynced.len())
}
pub async fn push_all(&self) -> Result<PushSummary> {
Ok(PushSummary {
events: self.push_events().await?,
memories: self.push_memories().await?,
advice: self.push_advice().await?,
projects: self.push_projects().await?,
intents: self.push_intents().await?,
harnesses: self.push_harnesses().await?,
issues: self.push_issues().await?,
})
}
}
/// Per-entity counts of rows pushed by a single `push_all` run.
#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize)]
pub struct PushSummary {
    pub events: usize,
    pub memories: usize,
    pub advice: usize,
    pub projects: usize,
    pub intents: usize,
    pub harnesses: usize,
    pub issues: usize,
}
/// Inserts or updates a single event row; the `WHERE ... updated_at <` guard
/// makes the upsert last-writer-wins by timestamp (stale pushes are no-ops).
async fn upsert_event(tx: &deadpool_postgres::Transaction<'_>, ev: &Event) -> Result<()> {
    let created = parse_ts(&ev.created_at)?;
    let updated = parse_ts(&ev.updated_at)?;
    tx.execute(
        r#"INSERT INTO asurada.events
           (id, user_id, project, event_type, path, payload, created_at, updated_at)
           VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
           ON CONFLICT (id) DO UPDATE SET
             event_type = EXCLUDED.event_type,
             path = EXCLUDED.path,
             payload = EXCLUDED.payload,
             updated_at = EXCLUDED.updated_at
           WHERE asurada.events.updated_at < EXCLUDED.updated_at"#,
        &[
            &ev.id,
            &ev.user_id,
            &ev.project,
            &ev.event_type,
            &ev.path,
            // Borrow the JSON payload directly — cloning the whole Value per
            // row was a redundant allocation.
            &ev.payload,
            &created,
            &updated,
        ],
    )
    .await
    .context("upsert event")?;
    Ok(())
}
async fn upsert_memory(tx: &deadpool_postgres::Transaction<'_>, m: &Memory) -> Result<()> {
let created = parse_ts(&m.created_at)?;
let updated = parse_ts(&m.updated_at)?;
let deleted: Option<DateTime<Utc>> = match &m.deleted_at {
Some(s) => Some(parse_ts(s)?),
None => None,
};
let metadata_json: serde_json::Value = m.metadata.clone();
tx.execute(
r#"INSERT INTO asurada.memories
(id, user_id, text, scope, priority, source, project, tech, metadata,
status, created_at, updated_at, deleted_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
ON CONFLICT (id) DO UPDATE SET
text = EXCLUDED.text,
scope = EXCLUDED.scope,
priority = EXCLUDED.priority,
source = EXCLUDED.source,
project = EXCLUDED.project,
tech = EXCLUDED.tech,
metadata = EXCLUDED.metadata,
status = EXCLUDED.status,
updated_at = EXCLUDED.updated_at,
deleted_at = EXCLUDED.deleted_at
WHERE asurada.memories.updated_at < EXCLUDED.updated_at"#,
&[
&m.id,
&m.user_id,
&m.text,
&m.scope,
&m.priority,
&m.source,
&m.project,
&m.tech,
&metadata_json,
&m.status,
&created,
&updated,
&deleted,
],
)
.await
.context("upsert memory")?;
Ok(())
}
async fn upsert_advice(tx: &deadpool_postgres::Transaction<'_>, a: &Advice) -> Result<()> {
let created = parse_ts(&a.created_at)?;
let updated = parse_ts(&a.updated_at)?;
let confirmed: Option<DateTime<Utc>> = match &a.confirmed_at {
Some(s) => Some(parse_ts(s)?),
None => None,
};
let paths_json: serde_json::Value = serde_json::to_value(&a.paths)?;
let metadata_json: serde_json::Value = a.metadata.clone();
tx.execute(
r#"INSERT INTO asurada.advice
(id, user_id, project, text, severity, paths, verifiable, state,
confirmed_at, confirmed_by, metadata, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
ON CONFLICT (id) DO UPDATE SET
text = EXCLUDED.text,
severity = EXCLUDED.severity,
paths = EXCLUDED.paths,
verifiable = EXCLUDED.verifiable,
state = EXCLUDED.state,
confirmed_at = EXCLUDED.confirmed_at,
confirmed_by = EXCLUDED.confirmed_by,
metadata = EXCLUDED.metadata,
updated_at = EXCLUDED.updated_at
WHERE asurada.advice.updated_at < EXCLUDED.updated_at"#,
&[
&a.id,
&a.user_id,
&a.project,
&a.text,
&a.severity,
&paths_json,
&a.verifiable,
&a.state,
&confirmed,
&a.confirmed_by,
&metadata_json,
&created,
&updated,
],
)
.await
.context("upsert advice")?;
Ok(())
}
/// Inserts or updates a single project row, keyed by (user_id, name)
/// (last-writer-wins by `updated_at`).
async fn upsert_project(tx: &deadpool_postgres::Transaction<'_>, p: &Project) -> Result<()> {
    let created = parse_ts(&p.created_at)?;
    let updated = parse_ts(&p.updated_at)?;
    tx.execute(
        r#"INSERT INTO asurada.projects
           (user_id, name, path, metadata, created_at, updated_at)
           VALUES ($1, $2, $3, $4, $5, $6)
           ON CONFLICT (user_id, name) DO UPDATE SET
             path = EXCLUDED.path,
             metadata = EXCLUDED.metadata,
             updated_at = EXCLUDED.updated_at
           WHERE asurada.projects.updated_at < EXCLUDED.updated_at"#,
        &[
            &p.user_id,
            &p.name,
            &p.path,
            // Borrow the metadata JSON directly instead of cloning it.
            &p.metadata,
            &created,
            &updated,
        ],
    )
    .await
    .context("upsert project")?;
    Ok(())
}
/// Inserts or updates a single intent row (last-writer-wins by `updated_at`).
/// Enum fields (`strength`, `source`, `status`) are stored as their string form.
async fn upsert_intent(tx: &deadpool_postgres::Transaction<'_>, it: &Intent) -> Result<()> {
    let created = parse_ts(&it.created_at)?;
    let updated = parse_ts(&it.updated_at)?;
    // Signal ids are a Vec locally, so serialize to JSON for the jsonb column.
    let signal_ids_json: serde_json::Value = serde_json::to_value(&it.source_signal_ids)?;
    tx.execute(
        r#"INSERT INTO asurada.intents
           (id, user_id, project, strength, intent_text, source,
            source_signal_ids, status, metadata, created_at, updated_at)
           VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
           ON CONFLICT (id) DO UPDATE SET
             project = EXCLUDED.project,
             strength = EXCLUDED.strength,
             intent_text = EXCLUDED.intent_text,
             source = EXCLUDED.source,
             source_signal_ids = EXCLUDED.source_signal_ids,
             status = EXCLUDED.status,
             metadata = EXCLUDED.metadata,
             updated_at = EXCLUDED.updated_at
           WHERE asurada.intents.updated_at < EXCLUDED.updated_at"#,
        &[
            &it.id,
            &it.user_id,
            &it.project,
            &it.strength.as_str(),
            &it.intent_text,
            &it.source.as_str(),
            &signal_ids_json,
            &it.status.as_str(),
            // Borrow the metadata JSON directly instead of cloning it.
            &it.metadata,
            &created,
            &updated,
        ],
    )
    .await
    .context("upsert intent")?;
    Ok(())
}
async fn upsert_harness(tx: &deadpool_postgres::Transaction<'_>, h: &Harness) -> Result<()> {
let created = parse_ts(&h.created_at)?;
let updated = parse_ts(&h.updated_at)?;
let last_used: Option<DateTime<Utc>> = match &h.last_used_at {
Some(s) => Some(parse_ts(s)?),
None => None,
};
let file_paths_json: serde_json::Value = serde_json::to_value(&h.file_paths)?;
let evolution_json: serde_json::Value = serde_json::to_value(&h.evolution_log)?;
let signal_ids_json: serde_json::Value = serde_json::to_value(&h.source_signal_ids)?;
let metadata_json: serde_json::Value = h.metadata.clone();
tx.execute(
r#"INSERT INTO asurada.harnesses
(id, user_id, project, slug, title, description, reason,
file_paths, usage_count, last_used_at, evolution_log,
source_signal_ids, source_cluster_signature, status, metadata,
created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11,
$12, $13, $14, $15, $16, $17)
ON CONFLICT (id) DO UPDATE SET
project = EXCLUDED.project,
slug = EXCLUDED.slug,
title = EXCLUDED.title,
description = EXCLUDED.description,
reason = EXCLUDED.reason,
file_paths = EXCLUDED.file_paths,
usage_count = EXCLUDED.usage_count,
last_used_at = EXCLUDED.last_used_at,
evolution_log = EXCLUDED.evolution_log,
source_signal_ids = EXCLUDED.source_signal_ids,
source_cluster_signature = EXCLUDED.source_cluster_signature,
status = EXCLUDED.status,
metadata = EXCLUDED.metadata,
updated_at = EXCLUDED.updated_at
WHERE asurada.harnesses.updated_at < EXCLUDED.updated_at"#,
&[
&h.id,
&h.user_id,
&h.project,
&h.slug,
&h.title,
&h.description,
&h.reason,
&file_paths_json,
&h.usage_count,
&last_used,
&evolution_json,
&signal_ids_json,
&h.source_cluster_signature,
&h.status,
&metadata_json,
&created,
&updated,
],
)
.await
.context("upsert harness")?;
Ok(())
}
async fn upsert_issue(tx: &deadpool_postgres::Transaction<'_>, it: &Issue) -> Result<()> {
let started = parse_ts(&it.started_at)?;
let ended: Option<DateTime<Utc>> = match &it.ended_at {
Some(s) => Some(parse_ts(s)?),
None => None,
};
let created = parse_ts(&it.created_at)?;
let updated = parse_ts(&it.updated_at)?;
let projects_json: serde_json::Value = serde_json::to_value(&it.projects)?;
let metadata_json: serde_json::Value = it.metadata.clone();
tx.execute(
r#"INSERT INTO asurada.issues
(id, user_id, title, summary, projects, status,
started_at, ended_at, event_count, metadata,
created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
ON CONFLICT (id) DO UPDATE SET
title = EXCLUDED.title,
summary = EXCLUDED.summary,
projects = EXCLUDED.projects,
status = EXCLUDED.status,
started_at = EXCLUDED.started_at,
ended_at = EXCLUDED.ended_at,
event_count = EXCLUDED.event_count,
metadata = EXCLUDED.metadata,
updated_at = EXCLUDED.updated_at
WHERE asurada.issues.updated_at < EXCLUDED.updated_at"#,
&[
&it.id,
&it.user_id,
&it.title,
&it.summary,
&projects_json,
&it.status,
&started,
&ended,
&it.event_count,
&metadata_json,
&created,
&updated,
],
)
.await
.context("upsert issue")?;
Ok(())
}
/// Reads up to `limit` intents that have never been synced or were updated
/// since their last sync, oldest-updated first.
///
/// Malformed enum/JSON columns degrade to defaults (best-effort decode), but a
/// row-level read error now propagates instead of being silently dropped —
/// the previous `filter_map(|r| r.ok())` could skip rows without any signal.
fn list_unsynced_intents(conn: &rusqlite::Connection, limit: usize) -> Result<Vec<Intent>> {
    use rusqlite::params;
    let mut stmt = conn.prepare(
        r#"SELECT id, user_id, project, strength, intent_text, source,
                  source_signal_ids, status, metadata, created_at, updated_at, synced_at
           FROM intents
           WHERE synced_at IS NULL OR updated_at > synced_at
           ORDER BY updated_at ASC
           LIMIT ?1"#,
    )?;
    let rows = stmt
        .query_map(params![limit as i64], |row| {
            let strength_str: String = row.get(3)?;
            let source_str: String = row.get(5)?;
            let source_signal_ids_str: String = row.get(6)?;
            let status_str: String = row.get(7)?;
            let metadata_str: String = row.get(8)?;
            Ok(Intent {
                id: row.get(0)?,
                user_id: row.get(1)?,
                project: row.get(2)?,
                // Unknown strings fall back to the mildest/default variants.
                strength: crate::db::intent::Strength::parse(&strength_str)
                    .unwrap_or(crate::db::intent::Strength::Preference),
                intent_text: row.get(4)?,
                source: crate::db::intent::Source::parse(&source_str)
                    .unwrap_or(crate::db::intent::Source::User),
                source_signal_ids: serde_json::from_str(&source_signal_ids_str).unwrap_or_default(),
                status: match status_str.as_str() {
                    "archived" => crate::db::intent::Status::Archived,
                    _ => crate::db::intent::Status::Active,
                },
                metadata: serde_json::from_str(&metadata_str)
                    .unwrap_or_else(|_| serde_json::json!({})),
                created_at: row.get(9)?,
                updated_at: row.get(10)?,
                synced_at: row.get(11)?,
            })
        })?
        .collect::<rusqlite::Result<Vec<Intent>>>()
        .context("read unsynced intents")?;
    Ok(rows)
}
/// Stamps `synced_at = when` on the given intent ids. No-op for an empty slice
/// (also avoids generating an invalid `IN ()` clause).
fn mark_intents_synced(conn: &rusqlite::Connection, ids: &[&str], when: &str) -> Result<()> {
    if ids.is_empty() {
        return Ok(());
    }
    let placeholders = vec!["?"; ids.len()].join(",");
    let sql = format!(
        "UPDATE intents SET synced_at = ? WHERE id IN ({})",
        placeholders
    );
    // Bind the &str values directly — the previous version copied every id
    // into an owned rusqlite Value for no benefit.
    let params = std::iter::once(when).chain(ids.iter().copied());
    conn.execute(&sql, rusqlite::params_from_iter(params))?;
    Ok(())
}
/// Reads up to `limit` memories that have never been synced or were updated
/// since their last sync, oldest-updated first.
///
/// Malformed metadata JSON degrades to `{}` (best-effort decode), but a
/// row-level read error now propagates instead of being silently dropped —
/// the previous `filter_map(|r| r.ok())` could skip rows without any signal.
fn list_unsynced_memories(conn: &rusqlite::Connection, limit: usize) -> Result<Vec<Memory>> {
    let mut stmt = conn.prepare(
        r#"SELECT id, user_id, text, scope, priority, source, project, tech, metadata,
                  status, created_at, updated_at, deleted_at, synced_at
           FROM memories
           WHERE synced_at IS NULL OR updated_at > synced_at
           ORDER BY updated_at ASC
           LIMIT ?1"#,
    )?;
    let rows = stmt
        .query_map([limit as i64], |row| {
            let metadata_str: String = row.get(8)?;
            let metadata =
                serde_json::from_str(&metadata_str).unwrap_or_else(|_| serde_json::json!({}));
            Ok(Memory {
                id: row.get(0)?,
                user_id: row.get(1)?,
                text: row.get(2)?,
                scope: row.get(3)?,
                priority: row.get(4)?,
                source: row.get(5)?,
                project: row.get(6)?,
                tech: row.get(7)?,
                metadata,
                status: row.get(9)?,
                created_at: row.get(10)?,
                updated_at: row.get(11)?,
                deleted_at: row.get(12)?,
            })
        })?
        .collect::<rusqlite::Result<Vec<Memory>>>()
        .context("read unsynced memories")?;
    Ok(rows)
}
/// Stamps `synced_at = when` on the given memory ids. No-op for an empty slice
/// (also avoids generating an invalid `IN ()` clause).
fn mark_memories_synced(conn: &rusqlite::Connection, ids: &[&str], when: &str) -> Result<()> {
    if ids.is_empty() {
        return Ok(());
    }
    let placeholders = vec!["?"; ids.len()].join(",");
    let sql = format!(
        "UPDATE memories SET synced_at = ? WHERE id IN ({})",
        placeholders
    );
    // Bind the &str values directly — the previous version copied every id
    // into an owned rusqlite Value for no benefit.
    let params = std::iter::once(when).chain(ids.iter().copied());
    conn.execute(&sql, rusqlite::params_from_iter(params))?;
    Ok(())
}
/// Parses an RFC 3339 timestamp string and normalizes it to UTC.
/// Fails with a context message naming the offending input.
fn parse_ts(s: &str) -> Result<DateTime<Utc>> {
    let parsed = DateTime::parse_from_rfc3339(s)
        .with_context(|| format!("parse timestamp '{}'", s))?;
    Ok(parsed.with_timezone(&Utc))
}
// NOTE(review): never called — presumably exists only to keep the
// `deadpool_postgres::Client` import referenced; confirm whether the import
// (and this marker) are still needed.
#[allow(dead_code)]
fn _client_marker(_: &Client) {}