//! asurada 0.1.0
//!
//! Asurada — a memory + cognition daemon that grows with the user. Local-first, BYOK, shared by Devist/Webchemist Core/etc.
// Supabase Postgres → brain.db pull.
//
// Algorithm:
//   1. Read each table's last_pulled_at from _sync_state.
//   2. Fetch rows from Supabase where updated_at > last_pulled_at.
//   3. Upsert into brain.db; WHERE updated_at < excluded.updated_at gives last-writer-wins.
//   4. Stamp synced_at = updated_at (so push does not send the same row back to the cloud).
//   5. Advance last_pulled_at to max(updated_at).

use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use rusqlite::{params, Connection};

use crate::db::sync_state;

/// Maximum rows fetched per table per pull pass.
/// NOTE(review): the next pass filters with a strict `updated_at > cutoff`;
/// if more than PULL_BATCH rows share a single `updated_at`, the overflow
/// could be skipped forever — confirm timestamps are effectively unique
/// per user, or that writers never batch-stamp identical timestamps.
const PULL_BATCH: i64 = 200;

impl super::Sync {
    /// Pull remote `events` rows newer than the stored cutoff into brain.db.
    ///
    /// Upserts with last-writer-wins on `updated_at`, stamps `synced_at`
    /// from `updated_at` so push skips these rows, then advances the
    /// `events` cutoff in `_sync_state` to the newest `updated_at` seen.
    ///
    /// Returns the number of rows fetched from the cloud (older duplicates
    /// are skipped by the upsert's WHERE guard but still counted).
    pub async fn pull_events(&self) -> Result<usize> {
        let since = self.read_cutoff("events")?;
        let pg = self.pg.get().await.context("get client")?;
        let fetched = pg
            .query(
                "SELECT id, user_id, project, event_type, path, payload, created_at, updated_at
                 FROM asurada.events
                 WHERE user_id = $1 AND updated_at > $2
                 ORDER BY updated_at ASC
                 LIMIT $3",
                &[&self.user_id, &since, &PULL_BATCH],
            )
            .await
            .context("pull events query")?;

        if fetched.is_empty() {
            return Ok(0);
        }

        // Track the newest updated_at we saw; it becomes the next cutoff.
        let mut watermark = since;
        {
            let brain = self.brain.lock().unwrap();
            let txn = brain.unchecked_transaction()?;
            for rec in &fetched {
                let updated: DateTime<Utc> = rec.get("updated_at");
                let created: DateTime<Utc> = rec.get("created_at");
                let payload: serde_json::Value = rec.get("payload");
                let id: String = rec.get("id");
                let user_id: String = rec.get("user_id");
                let project: String = rec.get("project");
                let event_type: String = rec.get("event_type");
                let path: Option<String> = rec.get("path");

                // ?8 is bound twice on purpose: updated_at doubles as synced_at.
                txn.execute(
                    r#"INSERT INTO events
                        (id, user_id, project, event_type, path, payload,
                         created_at, updated_at, synced_at)
                       VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8)
                       ON CONFLICT(id) DO UPDATE SET
                           event_type = excluded.event_type,
                           path = excluded.path,
                           payload = excluded.payload,
                           updated_at = excluded.updated_at,
                           synced_at = excluded.synced_at
                       WHERE events.updated_at < excluded.updated_at"#,
                    params![
                        id,
                        user_id,
                        project,
                        event_type,
                        path,
                        payload.to_string(),
                        created.to_rfc3339(),
                        updated.to_rfc3339(),
                    ],
                )?;

                if updated > watermark {
                    watermark = updated;
                }
            }
            sync_state::set_last_pulled(&txn, "events", &watermark.to_rfc3339())?;
            txn.commit()?;
        }
        Ok(fetched.len())
    }

    /// Pull remote `memories` rows newer than the stored cutoff into brain.db.
    ///
    /// Mirrors `pull_events`: last-writer-wins on `updated_at`, `synced_at`
    /// stamped from `updated_at` so push skips these rows, and the cutoff
    /// advanced to the newest `updated_at` seen. Soft deletes travel via the
    /// `deleted_at` column rather than physical row removal.
    ///
    /// Returns the number of rows fetched from the cloud (older duplicates
    /// are skipped by the upsert's WHERE guard but still counted).
    pub async fn pull_memories(&self) -> Result<usize> {
        let cutoff = self.read_cutoff("memories")?;
        // Consistency fix: pull_events names this failure site; do the same here.
        let client = self.pg.get().await.context("get client")?;
        let rows = client
            .query(
                "SELECT id, user_id, text, scope, priority, source, project, tech, metadata,
                        status, created_at, updated_at, deleted_at
                 FROM asurada.memories
                 WHERE user_id = $1 AND updated_at > $2
                 ORDER BY updated_at ASC
                 LIMIT $3",
                &[&self.user_id, &cutoff, &PULL_BATCH],
            )
            .await
            .context("pull memories query")?;

        if rows.is_empty() {
            return Ok(0);
        }

        // Newest updated_at seen in this batch; becomes the next cutoff.
        let mut max_updated = cutoff;
        {
            let conn = self.brain.lock().unwrap();
            let tx = conn.unchecked_transaction()?;
            for row in &rows {
                let id: String = row.get("id");
                let user_id: String = row.get("user_id");
                let text: String = row.get("text");
                let scope: String = row.get("scope");
                let priority: String = row.get("priority");
                let source: String = row.get("source");
                let project: Option<String> = row.get("project");
                let tech: Option<String> = row.get("tech");
                let metadata: serde_json::Value = row.get("metadata");
                let status: String = row.get("status");
                let created: DateTime<Utc> = row.get("created_at");
                let updated: DateTime<Utc> = row.get("updated_at");
                let deleted: Option<DateTime<Utc>> = row.get("deleted_at");

                let updated_s = updated.to_rfc3339();

                // ?12 is bound twice on purpose: updated_at doubles as synced_at.
                tx.execute(
                    r#"INSERT INTO memories
                        (id, user_id, text, scope, priority, source, project, tech, metadata,
                         status, created_at, updated_at, deleted_at, synced_at)
                       VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?12)
                       ON CONFLICT(id) DO UPDATE SET
                           text = excluded.text,
                           scope = excluded.scope,
                           priority = excluded.priority,
                           source = excluded.source,
                           project = excluded.project,
                           tech = excluded.tech,
                           metadata = excluded.metadata,
                           status = excluded.status,
                           updated_at = excluded.updated_at,
                           deleted_at = excluded.deleted_at,
                           synced_at = excluded.synced_at
                       WHERE memories.updated_at < excluded.updated_at"#,
                    params![
                        id,
                        user_id,
                        text,
                        scope,
                        priority,
                        source,
                        project,
                        tech,
                        metadata.to_string(),
                        status,
                        created.to_rfc3339(),
                        updated_s,
                        deleted.map(|d| d.to_rfc3339()),
                    ],
                )?;

                if updated > max_updated {
                    max_updated = updated;
                }
            }
            sync_state::set_last_pulled(&tx, "memories", &max_updated.to_rfc3339())?;
            tx.commit()?;
        }
        Ok(rows.len())
    }

    /// Pull remote `advice` rows newer than the stored cutoff into brain.db.
    ///
    /// Mirrors `pull_events`: last-writer-wins on `updated_at`, `synced_at`
    /// stamped from `updated_at` so push skips these rows, and the cutoff
    /// advanced to the newest `updated_at` seen.
    ///
    /// Returns the number of rows fetched from the cloud (older duplicates
    /// are skipped by the upsert's WHERE guard but still counted).
    pub async fn pull_advice(&self) -> Result<usize> {
        let cutoff = self.read_cutoff("advice")?;
        // Consistency fix: pull_events names this failure site; do the same here.
        let client = self.pg.get().await.context("get client")?;
        let rows = client
            .query(
                "SELECT id, user_id, project, text, severity, paths, verifiable, state,
                        confirmed_at, confirmed_by, metadata, created_at, updated_at
                 FROM asurada.advice
                 WHERE user_id = $1 AND updated_at > $2
                 ORDER BY updated_at ASC
                 LIMIT $3",
                &[&self.user_id, &cutoff, &PULL_BATCH],
            )
            .await
            .context("pull advice query")?;

        if rows.is_empty() {
            return Ok(0);
        }

        // Newest updated_at seen in this batch; becomes the next cutoff.
        let mut max_updated = cutoff;
        {
            let conn = self.brain.lock().unwrap();
            let tx = conn.unchecked_transaction()?;
            for row in &rows {
                let id: String = row.get("id");
                let user_id: String = row.get("user_id");
                let project: String = row.get("project");
                let text: String = row.get("text");
                let severity: String = row.get("severity");
                let paths: serde_json::Value = row.get("paths");
                let verifiable: bool = row.get("verifiable");
                let state: String = row.get("state");
                let confirmed_at: Option<DateTime<Utc>> = row.get("confirmed_at");
                let confirmed_by: Option<String> = row.get("confirmed_by");
                let metadata: serde_json::Value = row.get("metadata");
                let created: DateTime<Utc> = row.get("created_at");
                let updated: DateTime<Utc> = row.get("updated_at");

                let updated_s = updated.to_rfc3339();

                // ?13 is bound twice on purpose: updated_at doubles as synced_at.
                tx.execute(
                    r#"INSERT INTO advice
                        (id, user_id, project, text, severity, paths, verifiable, state,
                         confirmed_at, confirmed_by, metadata, created_at, updated_at, synced_at)
                       VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?13)
                       ON CONFLICT(id) DO UPDATE SET
                           text = excluded.text,
                           severity = excluded.severity,
                           paths = excluded.paths,
                           verifiable = excluded.verifiable,
                           state = excluded.state,
                           confirmed_at = excluded.confirmed_at,
                           confirmed_by = excluded.confirmed_by,
                           metadata = excluded.metadata,
                           updated_at = excluded.updated_at,
                           synced_at = excluded.synced_at
                       WHERE advice.updated_at < excluded.updated_at"#,
                    params![
                        id,
                        user_id,
                        project,
                        text,
                        severity,
                        paths.to_string(),
                        // SQLite has no native bool; store 0/1.
                        verifiable as i32,
                        state,
                        confirmed_at.map(|d| d.to_rfc3339()),
                        confirmed_by,
                        metadata.to_string(),
                        created.to_rfc3339(),
                        updated_s,
                    ],
                )?;

                if updated > max_updated {
                    max_updated = updated;
                }
            }
            sync_state::set_last_pulled(&tx, "advice", &max_updated.to_rfc3339())?;
            tx.commit()?;
        }
        Ok(rows.len())
    }

    /// Pull remote `projects` rows newer than the stored cutoff into brain.db.
    ///
    /// Mirrors `pull_events`, except the conflict target is the composite
    /// natural key `(user_id, name)` instead of a single `id` column.
    ///
    /// Returns the number of rows fetched from the cloud (older duplicates
    /// are skipped by the upsert's WHERE guard but still counted).
    pub async fn pull_projects(&self) -> Result<usize> {
        let cutoff = self.read_cutoff("projects")?;
        // Consistency fix: pull_events names this failure site; do the same here.
        let client = self.pg.get().await.context("get client")?;
        let rows = client
            .query(
                "SELECT user_id, name, path, metadata, created_at, updated_at
                 FROM asurada.projects
                 WHERE user_id = $1 AND updated_at > $2
                 ORDER BY updated_at ASC
                 LIMIT $3",
                &[&self.user_id, &cutoff, &PULL_BATCH],
            )
            .await
            .context("pull projects query")?;

        if rows.is_empty() {
            return Ok(0);
        }

        // Newest updated_at seen in this batch; becomes the next cutoff.
        let mut max_updated = cutoff;
        {
            let conn = self.brain.lock().unwrap();
            let tx = conn.unchecked_transaction()?;
            for row in &rows {
                let user_id: String = row.get("user_id");
                let name: String = row.get("name");
                let path: String = row.get("path");
                let metadata: serde_json::Value = row.get("metadata");
                let created: DateTime<Utc> = row.get("created_at");
                let updated: DateTime<Utc> = row.get("updated_at");

                let updated_s = updated.to_rfc3339();

                // ?6 is bound twice on purpose: updated_at doubles as synced_at.
                tx.execute(
                    r#"INSERT INTO projects
                        (user_id, name, path, metadata, created_at, updated_at, synced_at)
                       VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?6)
                       ON CONFLICT(user_id, name) DO UPDATE SET
                           path = excluded.path,
                           metadata = excluded.metadata,
                           updated_at = excluded.updated_at,
                           synced_at = excluded.synced_at
                       WHERE projects.updated_at < excluded.updated_at"#,
                    params![
                        user_id,
                        name,
                        path,
                        metadata.to_string(),
                        created.to_rfc3339(),
                        updated_s,
                    ],
                )?;

                if updated > max_updated {
                    max_updated = updated;
                }
            }
            sync_state::set_last_pulled(&tx, "projects", &max_updated.to_rfc3339())?;
            tx.commit()?;
        }
        Ok(rows.len())
    }

    /// Run one pull pass over every synced table, in order, and report
    /// per-table fetched-row counts. Stops at the first failing table.
    pub async fn pull_all(&self) -> Result<PullSummary> {
        let events = self.pull_events().await?;
        let memories = self.pull_memories().await?;
        let advice = self.pull_advice().await?;
        let projects = self.pull_projects().await?;
        Ok(PullSummary { events, memories, advice, projects })
    }

    /// Read `last_pulled_at` for `table` from `_sync_state` and convert it
    /// into a UTC cutoff. On first run (no stored value) the Unix epoch is
    /// used so everything gets pulled.
    fn read_cutoff(&self, table: &str) -> Result<DateTime<Utc>> {
        let stored = {
            let conn = self.brain.lock().unwrap();
            sync_state::get_last_pulled(&conn, table)?
        };
        let raw = stored.unwrap_or_else(|| "1970-01-01T00:00:00Z".to_string());
        let parsed = DateTime::parse_from_rfc3339(&raw)
            .with_context(|| format!("parse last_pulled_at '{}'", raw))?;
        Ok(parsed.with_timezone(&Utc))
    }
}

/// Per-table fetched-row counts for a single `pull_all` pass.
#[derive(Debug, Clone, serde::Serialize)]
pub struct PullSummary {
    // Counts are rows fetched from the cloud, not rows actually upserted.
    pub events: usize,
    pub memories: usize,
    pub advice: usize,
    pub projects: usize,
}