//! asurada 0.1.0
//!
//! Asurada — a memory + cognition daemon that grows with the user. Local-first,
//! BYOK, shared by Devist/Webchemist Core/etc.
// brain.db → Supabase Postgres push.
//
// Algorithm:
//   1. Read unsynced rows (synced_at IS NULL OR updated_at > synced_at) from brain.db.
//   2. INSERT ... ON CONFLICT (id) DO UPDATE into Postgres.
//   3. Stamp synced_at of the successfully pushed rows with the current time.
//
// Conflict handling: the Postgres ON CONFLICT clause carries
// WHERE updated_at < EXCLUDED.updated_at, i.e. last-writer-wins. If the cloud
// copy is newer the push is a no-op (the next pull brings it down).

use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use deadpool_postgres::Client;

use crate::db::{self, advice::Advice, event::Event, memory::Memory, project::Project};

/// Maximum number of rows pushed per table in one sync cycle.
const PUSH_BATCH: usize = 100;

impl super::Sync {
    /// Push unsynced rows from the `events` table to Postgres.
    ///
    /// Returns the number of rows pushed. `synced_at` is written only after
    /// the Postgres transaction commits, so a failed push leaves the rows
    /// eligible for the next attempt.
    pub async fn push_events(&self) -> Result<usize> {
        // Scope the brain.db lock so the guard is dropped before any await.
        let unsynced = {
            let conn = self.brain.lock().unwrap();
            db::event::list_unsynced(&conn, PUSH_BATCH)?
        };
        if unsynced.is_empty() {
            return Ok(0);
        }

        // One transaction per batch: a partial failure rolls everything back
        // and the rows remain unsynced locally.
        let mut client = self.pg.get().await.context("get client")?;
        let tx = client.transaction().await?;
        for ev in &unsynced {
            upsert_event(&tx, ev).await?;
        }
        tx.commit().await.context("commit events")?;

        let now = Utc::now().to_rfc3339();
        let ids: Vec<&str> = unsynced.iter().map(|e| e.id.as_str()).collect();
        {
            let conn = self.brain.lock().unwrap();
            db::event::mark_synced(&conn, &ids, &now)?;
        }
        Ok(unsynced.len())
    }

    /// Push unsynced rows from the `memories` table to Postgres.
    pub async fn push_memories(&self) -> Result<usize> {
        let unsynced = {
            let conn = self.brain.lock().unwrap();
            list_unsynced_memories(&conn, PUSH_BATCH)?
        };
        if unsynced.is_empty() {
            return Ok(0);
        }

        let mut client = self.pg.get().await.context("get client")?;
        let tx = client.transaction().await?;
        for m in &unsynced {
            upsert_memory(&tx, m).await?;
        }
        tx.commit().await.context("commit memories")?;

        let now = Utc::now().to_rfc3339();
        let ids: Vec<&str> = unsynced.iter().map(|m| m.id.as_str()).collect();
        {
            let conn = self.brain.lock().unwrap();
            mark_memories_synced(&conn, &ids, &now)?;
        }
        Ok(unsynced.len())
    }

    /// Push unsynced rows from the `advice` table to Postgres.
    pub async fn push_advice(&self) -> Result<usize> {
        let unsynced = {
            let conn = self.brain.lock().unwrap();
            db::advice::list_unsynced(&conn, PUSH_BATCH)?
        };
        if unsynced.is_empty() {
            return Ok(0);
        }

        let mut client = self.pg.get().await.context("get client")?;
        let tx = client.transaction().await?;
        for a in &unsynced {
            upsert_advice(&tx, a).await?;
        }
        tx.commit().await.context("commit advice")?;

        let now = Utc::now().to_rfc3339();
        let ids: Vec<&str> = unsynced.iter().map(|a| a.id.as_str()).collect();
        {
            let conn = self.brain.lock().unwrap();
            db::advice::mark_synced(&conn, &ids, &now)?;
        }
        Ok(unsynced.len())
    }

    /// Push unsynced rows from the `projects` table to Postgres.
    ///
    /// Projects are keyed by `(user_id, name)` rather than a single `id`.
    pub async fn push_projects(&self) -> Result<usize> {
        let unsynced = {
            let conn = self.brain.lock().unwrap();
            db::project::list_unsynced(&conn, PUSH_BATCH)?
        };
        if unsynced.is_empty() {
            return Ok(0);
        }

        let mut client = self.pg.get().await.context("get client")?;
        let tx = client.transaction().await?;
        for p in &unsynced {
            upsert_project(&tx, p).await?;
        }
        tx.commit().await.context("commit projects")?;

        let now = Utc::now().to_rfc3339();
        let keys: Vec<(String, String)> = unsynced
            .iter()
            .map(|p| (p.user_id.clone(), p.name.clone()))
            .collect();
        {
            let conn = self.brain.lock().unwrap();
            db::project::mark_synced(&conn, &keys, &now)?;
        }
        Ok(unsynced.len())
    }

    /// Push all four tables once, in a fixed order, returning per-table counts.
    /// Stops at the first table whose push fails.
    pub async fn push_all(&self) -> Result<PushSummary> {
        Ok(PushSummary {
            events: self.push_events().await?,
            memories: self.push_memories().await?,
            advice: self.push_advice().await?,
            projects: self.push_projects().await?,
        })
    }
}

/// Per-table row counts from a [`push_all`](super::Sync::push_all) run.
#[derive(Debug, Clone, serde::Serialize)]
pub struct PushSummary {
    // Number of rows pushed from each brain.db table this cycle.
    pub events: usize,
    pub memories: usize,
    pub advice: usize,
    pub projects: usize,
}

// ── per-table upsert queries (tokio-postgres) ─────────────────

/// Upsert one event row into `asurada.events`; last-writer-wins on `updated_at`.
async fn upsert_event(tx: &deadpool_postgres::Transaction<'_>, ev: &Event) -> Result<()> {
    let payload: serde_json::Value = ev.payload.clone();
    let created_at = parse_ts(&ev.created_at)?;
    let updated_at = parse_ts(&ev.updated_at)?;
    tx.execute(
        r#"INSERT INTO asurada.events
            (id, user_id, project, event_type, path, payload, created_at, updated_at)
           VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
           ON CONFLICT (id) DO UPDATE SET
               event_type = EXCLUDED.event_type,
               path = EXCLUDED.path,
               payload = EXCLUDED.payload,
               updated_at = EXCLUDED.updated_at
           WHERE asurada.events.updated_at < EXCLUDED.updated_at"#,
        &[
            &ev.id, &ev.user_id, &ev.project, &ev.event_type, &ev.path,
            &payload, &created_at, &updated_at,
        ],
    )
    .await
    .context("upsert event")?;
    Ok(())
}

async fn upsert_memory(tx: &deadpool_postgres::Transaction<'_>, m: &Memory) -> Result<()> {
    let created = parse_ts(&m.created_at)?;
    let updated = parse_ts(&m.updated_at)?;
    let deleted: Option<DateTime<Utc>> = match &m.deleted_at {
        Some(s) => Some(parse_ts(s)?),
        None => None,
    };
    let metadata_json: serde_json::Value = m.metadata.clone();
    tx.execute(
        r#"INSERT INTO asurada.memories
            (id, user_id, text, scope, priority, source, project, tech, metadata,
             status, created_at, updated_at, deleted_at)
           VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
           ON CONFLICT (id) DO UPDATE SET
               text = EXCLUDED.text,
               scope = EXCLUDED.scope,
               priority = EXCLUDED.priority,
               source = EXCLUDED.source,
               project = EXCLUDED.project,
               tech = EXCLUDED.tech,
               metadata = EXCLUDED.metadata,
               status = EXCLUDED.status,
               updated_at = EXCLUDED.updated_at,
               deleted_at = EXCLUDED.deleted_at
           WHERE asurada.memories.updated_at < EXCLUDED.updated_at"#,
        &[
            &m.id,
            &m.user_id,
            &m.text,
            &m.scope,
            &m.priority,
            &m.source,
            &m.project,
            &m.tech,
            &metadata_json,
            &m.status,
            &created,
            &updated,
            &deleted,
        ],
    )
    .await
    .context("upsert memory")?;
    Ok(())
}

async fn upsert_advice(tx: &deadpool_postgres::Transaction<'_>, a: &Advice) -> Result<()> {
    let created = parse_ts(&a.created_at)?;
    let updated = parse_ts(&a.updated_at)?;
    let confirmed: Option<DateTime<Utc>> = match &a.confirmed_at {
        Some(s) => Some(parse_ts(s)?),
        None => None,
    };
    let paths_json: serde_json::Value = serde_json::to_value(&a.paths)?;
    let metadata_json: serde_json::Value = a.metadata.clone();
    tx.execute(
        r#"INSERT INTO asurada.advice
            (id, user_id, project, text, severity, paths, verifiable, state,
             confirmed_at, confirmed_by, metadata, created_at, updated_at)
           VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
           ON CONFLICT (id) DO UPDATE SET
               text = EXCLUDED.text,
               severity = EXCLUDED.severity,
               paths = EXCLUDED.paths,
               verifiable = EXCLUDED.verifiable,
               state = EXCLUDED.state,
               confirmed_at = EXCLUDED.confirmed_at,
               confirmed_by = EXCLUDED.confirmed_by,
               metadata = EXCLUDED.metadata,
               updated_at = EXCLUDED.updated_at
           WHERE asurada.advice.updated_at < EXCLUDED.updated_at"#,
        &[
            &a.id,
            &a.user_id,
            &a.project,
            &a.text,
            &a.severity,
            &paths_json,
            &a.verifiable,
            &a.state,
            &confirmed,
            &a.confirmed_by,
            &metadata_json,
            &created,
            &updated,
        ],
    )
    .await
    .context("upsert advice")?;
    Ok(())
}

/// Upsert one project row into `asurada.projects`, keyed by `(user_id, name)`;
/// last-writer-wins on `updated_at`.
async fn upsert_project(tx: &deadpool_postgres::Transaction<'_>, p: &Project) -> Result<()> {
    let metadata: serde_json::Value = p.metadata.clone();
    let created_at = parse_ts(&p.created_at)?;
    let updated_at = parse_ts(&p.updated_at)?;
    tx.execute(
        r#"INSERT INTO asurada.projects
            (user_id, name, path, metadata, created_at, updated_at)
           VALUES ($1, $2, $3, $4, $5, $6)
           ON CONFLICT (user_id, name) DO UPDATE SET
               path = EXCLUDED.path,
               metadata = EXCLUDED.metadata,
               updated_at = EXCLUDED.updated_at
           WHERE asurada.projects.updated_at < EXCLUDED.updated_at"#,
        &[&p.user_id, &p.name, &p.path, &metadata, &created_at, &updated_at],
    )
    .await
    .context("upsert project")?;
    Ok(())
}

// ── unsynced/mark_synced helpers for memories (memory.rs has no sync methods) ──

/// List up to `limit` memories whose local changes have not been pushed yet
/// (never synced, or updated after the last sync), oldest update first.
fn list_unsynced_memories(conn: &rusqlite::Connection, limit: usize) -> Result<Vec<Memory>> {
    let mut stmt = conn.prepare(
        r#"SELECT id, user_id, text, scope, priority, source, project, tech, metadata,
                  status, created_at, updated_at, deleted_at, synced_at
           FROM memories
           WHERE synced_at IS NULL OR updated_at > synced_at
           ORDER BY updated_at ASC
           LIMIT ?1"#,
    )?;
    let rows = stmt
        .query_map([limit as i64], |row| {
            // metadata is stored as a JSON string; fall back to `{}` on bad data.
            let metadata_str: String = row.get(8)?;
            let metadata =
                serde_json::from_str(&metadata_str).unwrap_or_else(|_| serde_json::json!({}));
            Ok(Memory {
                id: row.get(0)?,
                user_id: row.get(1)?,
                text: row.get(2)?,
                scope: row.get(3)?,
                priority: row.get(4)?,
                source: row.get(5)?,
                project: row.get(6)?,
                tech: row.get(7)?,
                metadata,
                status: row.get(9)?,
                created_at: row.get(10)?,
                updated_at: row.get(11)?,
                deleted_at: row.get(12)?,
            })
        })?
        // Propagate row-mapping errors instead of silently skipping rows
        // (`filter_map(Result::ok)` would mask corruption).
        .collect::<rusqlite::Result<Vec<Memory>>>()?;
    Ok(rows)
}

/// Set `synced_at = when` for the given memory ids in brain.db.
///
/// No-op on an empty id slice. `when` is an RFC 3339 timestamp string
/// (brain.db stores timestamps as text).
fn mark_memories_synced(
    conn: &rusqlite::Connection,
    ids: &[&str],
    when: &str,
) -> Result<()> {
    if ids.is_empty() {
        return Ok(());
    }
    // Build "?,?,..." dynamically — SQLite has no array binding.
    let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
    let sql = format!(
        "UPDATE memories SET synced_at = ? WHERE id IN ({})",
        placeholders
    );
    // Bind `when` first, then the ids, as borrowed &str — no owned Value
    // allocations needed.
    let params = std::iter::once(when).chain(ids.iter().copied());
    conn.prepare(&sql)?.execute(rusqlite::params_from_iter(params))?;
    Ok(())
}

/// Parse an RFC 3339 timestamp string into a UTC `DateTime`.
fn parse_ts(s: &str) -> Result<DateTime<Utc>> {
    let parsed = DateTime::parse_from_rfc3339(s)
        .with_context(|| format!("parse timestamp '{}'", s))?;
    Ok(parsed.with_timezone(&Utc))
}

// Keeps the otherwise-unused `Client` import referenced (the `Client` type is
// only implicit inside `deadpool_postgres::Transaction` here).
#[allow(dead_code)]
fn _client_marker(_: &Client) {}