pub mod pull;
pub mod push;
use std::time::Duration;
use anyhow::{Context, Result};
use deadpool_postgres::{Config, Manager, ManagerConfig, Pool, RecyclingMethod};
use rusqlite::Connection;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use tokio_postgres::{Config as PgConfig, NoTls};
use crate::credentials::Credentials;
#[derive(Clone)]
pub struct Sync {
pub brain: Arc<Mutex<Connection>>,
pub pg: Pool,
pub user_id: String,
}
impl Sync {
pub async fn connect(brain: Arc<Mutex<Connection>>, creds: &Credentials) -> Result<Self> {
let pg_cfg = PgConfig::from_str(&creds.database_url)
.context("parse ASURADA_DATABASE_URL")?;
let mgr_cfg = ManagerConfig {
recycling_method: RecyclingMethod::Fast,
};
let mgr = Manager::from_config(pg_cfg, NoTls, mgr_cfg);
let pool = Pool::builder(mgr)
.max_size(3)
.build()
.context("build pg pool")?;
let client = pool.get().await.context("get pg client")?;
let row = client
.query_one(
"SELECT EXISTS (
SELECT 1 FROM information_schema.tables
WHERE table_schema = 'asurada' AND table_name = 'events'
)",
&[],
)
.await
.context("check asurada schema")?;
let exists: bool = row.get(0);
if !exists {
anyhow::bail!(
"asurada 스키마가 Postgres 에 없습니다.\n\
다음을 한 번 실행해야 합니다:\n\
\tpsql \"$ASURADA_DATABASE_URL\" -f migrations/supabase/0001_init.sql"
);
}
let _ = Config::new();
Ok(Self {
brain,
pg: pool,
user_id: creds.user_id.clone(),
})
}
pub async fn run_loop(self, interval: Duration) {
tracing::info!("sync loop started (interval={}s)", interval.as_secs());
let mut ticker = tokio::time::interval(interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
ticker.tick().await;
loop {
ticker.tick().await;
match self.push_all().await {
Ok(p) if p.events + p.memories + p.advice + p.projects > 0 => {
tracing::info!(
"[sync] push events={} memories={} advice={} projects={}",
p.events, p.memories, p.advice, p.projects
);
}
Ok(_) => { }
Err(e) => tracing::warn!("[sync] push err: {}", e),
}
match self.pull_all().await {
Ok(p) if p.events + p.memories + p.advice + p.projects > 0 => {
tracing::info!(
"[sync] pull events={} memories={} advice={} projects={}",
p.events, p.memories, p.advice, p.projects
);
}
Ok(_) => {}
Err(e) => tracing::warn!("[sync] pull err: {}", e),
}
}
}
}