use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use deadpool_postgres::Client;
use crate::db::{self, advice::Advice, event::Event, memory::Memory, project::Project};
/// Batch size handed to each `list_unsynced*` call by the `push_*` methods.
/// For memories it is bound to the SQL `LIMIT`; the other tables presumably
/// treat it the same way (their helpers live in `crate::db`) — confirm.
const PUSH_BATCH: usize = 100;
impl super::Sync {
pub async fn push_events(&self) -> Result<usize> {
let unsynced = {
let conn = self.brain.lock().unwrap();
db::event::list_unsynced(&conn, PUSH_BATCH)?
};
if unsynced.is_empty() {
return Ok(0);
}
let mut client = self.pg.get().await.context("get client")?;
let tx = client.transaction().await?;
for ev in &unsynced {
upsert_event(&tx, ev).await?;
}
tx.commit().await.context("commit events")?;
let now = Utc::now().to_rfc3339();
let ids: Vec<&str> = unsynced.iter().map(|e| e.id.as_str()).collect();
{
let conn = self.brain.lock().unwrap();
db::event::mark_synced(&conn, &ids, &now)?;
}
Ok(unsynced.len())
}
pub async fn push_memories(&self) -> Result<usize> {
let unsynced = {
let conn = self.brain.lock().unwrap();
list_unsynced_memories(&conn, PUSH_BATCH)?
};
if unsynced.is_empty() {
return Ok(0);
}
let mut client = self.pg.get().await?;
let tx = client.transaction().await?;
for m in &unsynced {
upsert_memory(&tx, m).await?;
}
tx.commit().await?;
let now = Utc::now().to_rfc3339();
let ids: Vec<&str> = unsynced.iter().map(|m| m.id.as_str()).collect();
{
let conn = self.brain.lock().unwrap();
mark_memories_synced(&conn, &ids, &now)?;
}
Ok(unsynced.len())
}
pub async fn push_advice(&self) -> Result<usize> {
let unsynced = {
let conn = self.brain.lock().unwrap();
db::advice::list_unsynced(&conn, PUSH_BATCH)?
};
if unsynced.is_empty() {
return Ok(0);
}
let mut client = self.pg.get().await?;
let tx = client.transaction().await?;
for a in &unsynced {
upsert_advice(&tx, a).await?;
}
tx.commit().await?;
let now = Utc::now().to_rfc3339();
let ids: Vec<&str> = unsynced.iter().map(|a| a.id.as_str()).collect();
{
let conn = self.brain.lock().unwrap();
db::advice::mark_synced(&conn, &ids, &now)?;
}
Ok(unsynced.len())
}
pub async fn push_projects(&self) -> Result<usize> {
let unsynced = {
let conn = self.brain.lock().unwrap();
db::project::list_unsynced(&conn, PUSH_BATCH)?
};
if unsynced.is_empty() {
return Ok(0);
}
let mut client = self.pg.get().await?;
let tx = client.transaction().await?;
for p in &unsynced {
upsert_project(&tx, p).await?;
}
tx.commit().await?;
let now = Utc::now().to_rfc3339();
let keys: Vec<(String, String)> = unsynced
.iter()
.map(|p| (p.user_id.clone(), p.name.clone()))
.collect();
{
let conn = self.brain.lock().unwrap();
db::project::mark_synced(&conn, &keys, &now)?;
}
Ok(unsynced.len())
}
pub async fn push_all(&self) -> Result<PushSummary> {
Ok(PushSummary {
events: self.push_events().await?,
memories: self.push_memories().await?,
advice: self.push_advice().await?,
projects: self.push_projects().await?,
})
}
}
/// Per-table counts produced by `push_all`; each field holds the value
/// returned by the matching `push_*` method.
#[derive(Debug, Clone, serde::Serialize)]
pub struct PushSummary {
/// Value returned by `push_events`.
pub events: usize,
/// Value returned by `push_memories`.
pub memories: usize,
/// Value returned by `push_advice`.
pub advice: usize,
/// Value returned by `push_projects`.
pub projects: usize,
}
/// Inserts `ev` into `asurada.events`, or updates the existing row with the same `id`.
///
/// On conflict the row is overwritten only when the stored `updated_at` is
/// strictly older than the incoming one, so a stale push is a no-op.
/// `user_id`, `project` and `created_at` are never changed on conflict.
///
/// # Errors
/// Fails if either timestamp is not valid RFC 3339 or if the statement fails.
async fn upsert_event(tx: &deadpool_postgres::Transaction<'_>, ev: &Event) -> Result<()> {
    let created = parse_ts(&ev.created_at)?;
    let updated = parse_ts(&ev.updated_at)?;
    // The payload is already a `serde_json::Value`; bind it by reference
    // instead of deep-cloning the JSON tree for every row.
    tx.execute(
        r#"INSERT INTO asurada.events
(id, user_id, project, event_type, path, payload, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (id) DO UPDATE SET
event_type = EXCLUDED.event_type,
path = EXCLUDED.path,
payload = EXCLUDED.payload,
updated_at = EXCLUDED.updated_at
WHERE asurada.events.updated_at < EXCLUDED.updated_at"#,
        &[
            &ev.id,
            &ev.user_id,
            &ev.project,
            &ev.event_type,
            &ev.path,
            &ev.payload,
            &created,
            &updated,
        ],
    )
    .await
    .context("upsert event")?;
    Ok(())
}
/// Inserts `m` into `asurada.memories`, or updates the existing row with the same `id`.
///
/// On conflict the row is overwritten only when the stored `updated_at` is
/// strictly older than the incoming one. `deleted_at` is written as-is, so a
/// `None` on the incoming row clears a previously stored value.
/// `user_id` and `created_at` are never changed on conflict.
///
/// # Errors
/// Fails if any timestamp is not valid RFC 3339 or if the statement fails.
async fn upsert_memory(tx: &deadpool_postgres::Transaction<'_>, m: &Memory) -> Result<()> {
    let created = parse_ts(&m.created_at)?;
    let updated = parse_ts(&m.updated_at)?;
    // Parse the optional timestamp only when present; a parse failure is still an error.
    let deleted: Option<DateTime<Utc>> =
        m.deleted_at.as_ref().map(|s| parse_ts(s)).transpose()?;
    // `metadata` is already a `serde_json::Value`; bind it by reference
    // instead of deep-cloning it for every row.
    tx.execute(
        r#"INSERT INTO asurada.memories
(id, user_id, text, scope, priority, source, project, tech, metadata,
status, created_at, updated_at, deleted_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
ON CONFLICT (id) DO UPDATE SET
text = EXCLUDED.text,
scope = EXCLUDED.scope,
priority = EXCLUDED.priority,
source = EXCLUDED.source,
project = EXCLUDED.project,
tech = EXCLUDED.tech,
metadata = EXCLUDED.metadata,
status = EXCLUDED.status,
updated_at = EXCLUDED.updated_at,
deleted_at = EXCLUDED.deleted_at
WHERE asurada.memories.updated_at < EXCLUDED.updated_at"#,
        &[
            &m.id,
            &m.user_id,
            &m.text,
            &m.scope,
            &m.priority,
            &m.source,
            &m.project,
            &m.tech,
            &m.metadata,
            &m.status,
            &created,
            &updated,
            &deleted,
        ],
    )
    .await
    .context("upsert memory")?;
    Ok(())
}
/// Inserts `a` into `asurada.advice`, or updates the existing row with the same `id`.
///
/// On conflict the row is overwritten only when the stored `updated_at` is
/// strictly older than the incoming one. `user_id`, `project` and
/// `created_at` are never changed on conflict.
///
/// # Errors
/// Fails if any timestamp is not valid RFC 3339, if `paths` cannot be
/// serialized to JSON, or if the statement fails.
async fn upsert_advice(tx: &deadpool_postgres::Transaction<'_>, a: &Advice) -> Result<()> {
    let created = parse_ts(&a.created_at)?;
    let updated = parse_ts(&a.updated_at)?;
    // Parse the optional timestamp only when present; a parse failure is still an error.
    let confirmed: Option<DateTime<Utc>> =
        a.confirmed_at.as_ref().map(|s| parse_ts(s)).transpose()?;
    // `paths` is sent as a JSON value, so it has to be converted first.
    let paths_json: serde_json::Value = serde_json::to_value(&a.paths)?;
    // `metadata` is already a `serde_json::Value`; bind it by reference
    // instead of deep-cloning it for every row.
    tx.execute(
        r#"INSERT INTO asurada.advice
(id, user_id, project, text, severity, paths, verifiable, state,
confirmed_at, confirmed_by, metadata, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
ON CONFLICT (id) DO UPDATE SET
text = EXCLUDED.text,
severity = EXCLUDED.severity,
paths = EXCLUDED.paths,
verifiable = EXCLUDED.verifiable,
state = EXCLUDED.state,
confirmed_at = EXCLUDED.confirmed_at,
confirmed_by = EXCLUDED.confirmed_by,
metadata = EXCLUDED.metadata,
updated_at = EXCLUDED.updated_at
WHERE asurada.advice.updated_at < EXCLUDED.updated_at"#,
        &[
            &a.id,
            &a.user_id,
            &a.project,
            &a.text,
            &a.severity,
            &paths_json,
            &a.verifiable,
            &a.state,
            &confirmed,
            &a.confirmed_by,
            &a.metadata,
            &created,
            &updated,
        ],
    )
    .await
    .context("upsert advice")?;
    Ok(())
}
/// Inserts `p` into `asurada.projects`, or updates the existing row with the
/// same `(user_id, name)` pair.
///
/// On conflict the row is overwritten only when the stored `updated_at` is
/// strictly older than the incoming one. `created_at` is never changed on conflict.
///
/// # Errors
/// Fails if either timestamp is not valid RFC 3339 or if the statement fails.
async fn upsert_project(tx: &deadpool_postgres::Transaction<'_>, p: &Project) -> Result<()> {
    let created = parse_ts(&p.created_at)?;
    let updated = parse_ts(&p.updated_at)?;
    // `metadata` is already a `serde_json::Value`; bind it by reference
    // instead of deep-cloning it for every row.
    tx.execute(
        r#"INSERT INTO asurada.projects
(user_id, name, path, metadata, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (user_id, name) DO UPDATE SET
path = EXCLUDED.path,
metadata = EXCLUDED.metadata,
updated_at = EXCLUDED.updated_at
WHERE asurada.projects.updated_at < EXCLUDED.updated_at"#,
        &[
            &p.user_id,
            &p.name,
            &p.path,
            &p.metadata,
            &created,
            &updated,
        ],
    )
    .await
    .context("upsert project")?;
    Ok(())
}
fn list_unsynced_memories(conn: &rusqlite::Connection, limit: usize) -> Result<Vec<Memory>> {
let mut stmt = conn.prepare(
r#"SELECT id, user_id, text, scope, priority, source, project, tech, metadata,
status, created_at, updated_at, deleted_at, synced_at
FROM memories
WHERE synced_at IS NULL OR updated_at > synced_at
ORDER BY updated_at ASC
LIMIT ?1"#,
)?;
let rows: Vec<Memory> = stmt
.query_map([limit as i64], |row| {
let metadata_str: String = row.get(8)?;
let metadata = serde_json::from_str(&metadata_str)
.unwrap_or_else(|_| serde_json::json!({}));
Ok(Memory {
id: row.get(0)?,
user_id: row.get(1)?,
text: row.get(2)?,
scope: row.get(3)?,
priority: row.get(4)?,
source: row.get(5)?,
project: row.get(6)?,
tech: row.get(7)?,
metadata,
status: row.get(9)?,
created_at: row.get(10)?,
updated_at: row.get(11)?,
deleted_at: row.get(12)?,
})
})?
.filter_map(|r| r.ok())
.collect();
Ok(rows)
}
/// Sets `synced_at = when` on every `memories` row whose `id` appears in `ids`.
///
/// Does nothing (and touches no statement) when `ids` is empty. The ids are
/// bound as parameters; only the `?` placeholders are formatted into the SQL.
fn mark_memories_synced(
    conn: &rusqlite::Connection,
    ids: &[&str],
    when: &str,
) -> Result<()> {
    if ids.is_empty() {
        return Ok(());
    }

    // One `?` per id, e.g. "?,?,?" for three ids.
    let slots = vec!["?"; ids.len()].join(",");
    let sql = format!("UPDATE memories SET synced_at = ? WHERE id IN ({})", slots);
    let mut stmt = conn.prepare(&sql)?;

    // Bind order mirrors the SQL: the timestamp first, then each id.
    let params = std::iter::once(when).chain(ids.iter().copied());
    stmt.execute(rusqlite::params_from_iter(params))?;
    Ok(())
}
/// Parses an RFC 3339 timestamp and converts it to UTC.
///
/// # Errors
/// Returns the parse error, with the offending input included in the context.
fn parse_ts(s: &str) -> Result<DateTime<Utc>> {
    let with_offset = DateTime::parse_from_rfc3339(s)
        .with_context(|| format!("parse timestamp '{}'", s))?;
    Ok(with_offset.with_timezone(&Utc))
}
// NOTE(review): this no-op has no visible call sites. It presumably exists
// only to reference the imported `Client` type so the `use` is not reported
// as unused — confirm before removing.
#[allow(dead_code)]
fn _client_marker(_: &Client) {}