asurada 0.2.2

Asurada — a memory + cognition daemon that grows with the user. Local-first, BYOK, shared by Devist/Webchemist Core/etc.
// brain.db ↔ Supabase Postgres 양방향 sync.

pub mod pull;
pub mod push;

use std::time::Duration;

use anyhow::{Context, Result};
use deadpool_postgres::{Config, Manager, ManagerConfig, Pool, RecyclingMethod};
use rusqlite::Connection;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use tokio_postgres::{Config as PgConfig, NoTls};

use crate::credentials::Credentials;

#[derive(Clone)]
pub struct Sync {
    pub brain: Arc<Mutex<Connection>>,
    pub pg: Pool,
    pub user_id: String,
}

impl Sync {
    pub async fn connect(brain: Arc<Mutex<Connection>>, creds: &Credentials) -> Result<Self> {
        // tokio-postgres Config 파싱.
        let pg_cfg =
            PgConfig::from_str(&creds.database_url).context("parse ASURADA_DATABASE_URL")?;

        let mgr_cfg = ManagerConfig {
            recycling_method: RecyclingMethod::Fast,
        };
        let mgr = Manager::from_config(pg_cfg, NoTls, mgr_cfg);
        let pool = Pool::builder(mgr)
            .max_size(3)
            .build()
            .context("build pg pool")?;

        // 헬스체크: 스키마가 적용됐는지 확인.
        let client = pool.get().await.context("get pg client")?;
        let row = client
            .query_one(
                "SELECT EXISTS (
                   SELECT 1 FROM information_schema.tables
                   WHERE table_schema = 'asurada' AND table_name = 'events'
                 )",
                &[],
            )
            .await
            .context("check asurada schema")?;
        let exists: bool = row.get(0);
        if !exists {
            anyhow::bail!(
                "asurada 스키마가 Postgres 에 없습니다.\n\
                 다음을 한 번 실행해야 합니다:\n\
                 \tpsql \"$ASURADA_DATABASE_URL\" -f migrations/supabase/0001_init.sql"
            );
        }

        // unused 보다 명시: Config struct 가 deadpool 에서 활용되는데 import 필요.
        let _ = Config::new();

        Ok(Self {
            brain,
            pg: pool,
            user_id: creds.user_id.clone(),
        })
    }

    /// 백그라운드 sync 루프. `serve` 가 spawn 해서 데몬 라이프타임 동안 돌린다.
    /// 매 interval 마다 push 후 pull 한 번씩.
    /// 일시적 에러는 로그만 남기고 다음 라운드 진행 (네트워크 끊김 등).
    pub async fn run_loop(self, interval: Duration) {
        tracing::info!("sync loop started (interval={}s)", interval.as_secs());
        let mut ticker = tokio::time::interval(interval);
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        // 첫 번째 tick 은 즉시 발생하므로 한 번 소비.
        ticker.tick().await;

        loop {
            ticker.tick().await;
            // Heartbeat — 가장 가벼운 호출. push/pull 실패와 무관하게 alive 신호 먼저.
            if let Err(e) = self.heartbeat().await {
                tracing::warn!("[sync] heartbeat err: {}", e);
            }
            match self.push_all().await {
                Ok(p)
                    if p.events
                        + p.memories
                        + p.advice
                        + p.projects
                        + p.intents
                        + p.harnesses
                        + p.issues
                        > 0 =>
                {
                    tracing::info!(
                        "[sync] push events={} memories={} advice={} projects={} intents={} harnesses={} issues={}",
                        p.events, p.memories, p.advice, p.projects,
                        p.intents, p.harnesses, p.issues
                    );
                }
                Ok(_) => { /* nothing changed */ }
                Err(e) => tracing::warn!("[sync] push err: {}", e),
            }
            match self.pull_all().await {
                Ok(p)
                    if p.events
                        + p.memories
                        + p.advice
                        + p.projects
                        + p.intents
                        + p.harnesses
                        + p.issues
                        > 0 =>
                {
                    tracing::info!(
                        "[sync] pull events={} memories={} advice={} projects={} intents={} harnesses={} issues={}",
                        p.events, p.memories, p.advice, p.projects,
                        p.intents, p.harnesses, p.issues
                    );
                }
                Ok(_) => {}
                Err(e) => tracing::warn!("[sync] pull err: {}", e),
            }
        }
    }

    /// `asurada.heartbeat` 테이블에 alive 신호 UPSERT — 대시보드가 이걸 읽음.
    async fn heartbeat(&self) -> anyhow::Result<()> {
        let client = self.pg.get().await?;
        client
            .execute(
                r#"INSERT INTO asurada.heartbeat (user_id, last_seen, version)
                   VALUES ($1, NOW(), $2)
                   ON CONFLICT (user_id) DO UPDATE SET
                       last_seen = EXCLUDED.last_seen,
                       version = EXCLUDED.version"#,
                &[&self.user_id, &env!("CARGO_PKG_VERSION")],
            )
            .await?;
        Ok(())
    }
}