use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use rusqlite::params;
use crate::db::sync_state;
const PULL_BATCH: i64 = 200;
impl super::Sync {
/// Pull event rows changed on the server since the last per-table sync
/// cutoff and upsert them into the local SQLite `events` table.
///
/// Conflict policy is last-writer-wins on `updated_at`: the upsert only
/// overwrites a local row when the incoming `updated_at` is strictly newer.
/// On success the `events` cutoff is advanced to the largest `updated_at`
/// seen, inside the same transaction as the row writes.
///
/// Returns the number of rows fetched (at most `PULL_BATCH`); 0 means the
/// local copy is already caught up to the cutoff.
///
/// NOTE(review): the query uses a strict `updated_at > $2` and the cutoff
/// advances to the max `updated_at` seen, so if a batch boundary ever falls
/// inside a run of rows sharing one `updated_at`, the tail of that run is
/// skipped on the next pull — confirm server timestamps are effectively
/// unique (e.g. microsecond precision) or consider `>=` (the upsert is
/// idempotent).
pub async fn pull_events(&self) -> Result<usize> {
let cutoff = self.read_cutoff("events")?;
let client = self.pg.get().await.context("get client")?;
// Oldest-first so max_updated is a safe resume point even mid-batch.
let rows = client
.query(
"SELECT id, user_id, project, event_type, path, payload, created_at, updated_at
FROM asurada.events
WHERE user_id = $1 AND updated_at > $2
ORDER BY updated_at ASC
LIMIT $3",
&[&self.user_id, &cutoff, &PULL_BATCH],
)
.await
.context("pull events query")?;
if rows.is_empty() {
return Ok(0);
}
let mut max_updated = cutoff;
{
// SQLite lock scope: held only while applying the batch; no .await
// occurs while the std::sync::Mutex guard is alive.
let conn = self.brain.lock().unwrap();
let tx = conn.unchecked_transaction()?;
for row in &rows {
// row.get panics on column type mismatch (tokio-postgres); the
// declared Rust types here must match the server schema.
let id: String = row.get("id");
let user_id: String = row.get("user_id");
let project: String = row.get("project");
let event_type: String = row.get("event_type");
let path: Option<String> = row.get("path");
let payload: serde_json::Value = row.get("payload");
let created: DateTime<Utc> = row.get("created_at");
let updated: DateTime<Utc> = row.get("updated_at");
// Timestamps are stored locally as RFC 3339 strings.
let created_s = created.to_rfc3339();
let updated_s = updated.to_rfc3339();
// synced_at reuses ?8 (the server updated_at), so a row is
// "synced" as of the server version we copied.
tx.execute(
r#"INSERT INTO events
(id, user_id, project, event_type, path, payload,
created_at, updated_at, synced_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8)
ON CONFLICT(id) DO UPDATE SET
event_type = excluded.event_type,
path = excluded.path,
payload = excluded.payload,
updated_at = excluded.updated_at,
synced_at = excluded.synced_at
WHERE events.updated_at < excluded.updated_at"#,
params![
id,
user_id,
project,
event_type,
path,
payload.to_string(),
created_s,
updated_s,
],
)?;
if updated > max_updated {
max_updated = updated;
}
}
// Cutoff update commits atomically with the row writes: a crash
// before commit re-pulls the whole batch rather than losing rows.
sync_state::set_last_pulled(&tx, "events", &max_updated.to_rfc3339())?;
tx.commit()?;
}
Ok(rows.len())
}
/// Pull memory rows changed on the server since the last sync cutoff and
/// upsert them into the local SQLite `memories` table.
///
/// Conflict policy is last-writer-wins on `updated_at`: the upsert only
/// overwrites a local row when the incoming `updated_at` is strictly newer.
/// The `memories` cutoff advances to the largest `updated_at` seen, in the
/// same transaction as the row writes.
///
/// Returns the number of rows fetched (at most `PULL_BATCH`); 0 means the
/// local copy is already caught up.
pub async fn pull_memories(&self) -> Result<usize> {
    let cutoff = self.read_cutoff("memories")?;
    // Context added for parity with pull_events so pool checkout failures
    // are attributable to this sync path.
    let client = self.pg.get().await.context("get client")?;
    // Oldest-first so max_updated is a safe resume point even mid-batch.
    let rows = client
        .query(
            "SELECT id, user_id, text, scope, priority, source, project, tech, metadata,
                    status, created_at, updated_at, deleted_at
             FROM asurada.memories
             WHERE user_id = $1 AND updated_at > $2
             ORDER BY updated_at ASC
             LIMIT $3",
            &[&self.user_id, &cutoff, &PULL_BATCH],
        )
        .await
        .context("pull memories query")?;
    if rows.is_empty() {
        return Ok(0);
    }
    let mut max_updated = cutoff;
    {
        // SQLite lock scope: held only while applying the batch; no .await
        // occurs while the std::sync::Mutex guard is alive.
        let conn = self.brain.lock().unwrap();
        let tx = conn.unchecked_transaction()?;
        for row in &rows {
            let id: String = row.get("id");
            let user_id: String = row.get("user_id");
            let text: String = row.get("text");
            let scope: String = row.get("scope");
            let priority: String = row.get("priority");
            let source: String = row.get("source");
            let project: Option<String> = row.get("project");
            let tech: Option<String> = row.get("tech");
            let metadata: serde_json::Value = row.get("metadata");
            let status: String = row.get("status");
            let created: DateTime<Utc> = row.get("created_at");
            let updated: DateTime<Utc> = row.get("updated_at");
            let deleted: Option<DateTime<Utc>> = row.get("deleted_at");
            let updated_s = updated.to_rfc3339();
            // synced_at reuses ?12 (the server updated_at); deletions sync
            // as tombstones via deleted_at rather than local DELETEs.
            tx.execute(
                r#"INSERT INTO memories
(id, user_id, text, scope, priority, source, project, tech, metadata,
status, created_at, updated_at, deleted_at, synced_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?12)
ON CONFLICT(id) DO UPDATE SET
text = excluded.text,
scope = excluded.scope,
priority = excluded.priority,
source = excluded.source,
project = excluded.project,
tech = excluded.tech,
metadata = excluded.metadata,
status = excluded.status,
updated_at = excluded.updated_at,
deleted_at = excluded.deleted_at,
synced_at = excluded.synced_at
WHERE memories.updated_at < excluded.updated_at"#,
                params![
                    id,
                    user_id,
                    text,
                    scope,
                    priority,
                    source,
                    project,
                    tech,
                    metadata.to_string(),
                    status,
                    created.to_rfc3339(),
                    updated_s,
                    deleted.map(|d| d.to_rfc3339()),
                ],
            )?;
            if updated > max_updated {
                max_updated = updated;
            }
        }
        // Cutoff commits atomically with the rows: a crash before commit
        // re-pulls the batch instead of losing it.
        sync_state::set_last_pulled(&tx, "memories", &max_updated.to_rfc3339())?;
        tx.commit()?;
    }
    Ok(rows.len())
}
/// Pull advice rows changed on the server since the last sync cutoff and
/// upsert them into the local SQLite `advice` table.
///
/// Conflict policy is last-writer-wins on `updated_at`: the upsert only
/// overwrites a local row when the incoming `updated_at` is strictly newer.
/// The `advice` cutoff advances to the largest `updated_at` seen, in the
/// same transaction as the row writes.
///
/// Returns the number of rows fetched (at most `PULL_BATCH`); 0 means the
/// local copy is already caught up.
pub async fn pull_advice(&self) -> Result<usize> {
    let cutoff = self.read_cutoff("advice")?;
    // Context added for parity with pull_events so pool checkout failures
    // are attributable to this sync path.
    let client = self.pg.get().await.context("get client")?;
    // Oldest-first so max_updated is a safe resume point even mid-batch.
    let rows = client
        .query(
            "SELECT id, user_id, project, text, severity, paths, verifiable, state,
                    confirmed_at, confirmed_by, metadata, created_at, updated_at
             FROM asurada.advice
             WHERE user_id = $1 AND updated_at > $2
             ORDER BY updated_at ASC
             LIMIT $3",
            &[&self.user_id, &cutoff, &PULL_BATCH],
        )
        .await
        .context("pull advice query")?;
    if rows.is_empty() {
        return Ok(0);
    }
    let mut max_updated = cutoff;
    {
        // SQLite lock scope: held only while applying the batch; no .await
        // occurs while the std::sync::Mutex guard is alive.
        let conn = self.brain.lock().unwrap();
        let tx = conn.unchecked_transaction()?;
        for row in &rows {
            let id: String = row.get("id");
            let user_id: String = row.get("user_id");
            let project: String = row.get("project");
            let text: String = row.get("text");
            let severity: String = row.get("severity");
            let paths: serde_json::Value = row.get("paths");
            let verifiable: bool = row.get("verifiable");
            let state: String = row.get("state");
            let confirmed_at: Option<DateTime<Utc>> = row.get("confirmed_at");
            let confirmed_by: Option<String> = row.get("confirmed_by");
            let metadata: serde_json::Value = row.get("metadata");
            let created: DateTime<Utc> = row.get("created_at");
            let updated: DateTime<Utc> = row.get("updated_at");
            let updated_s = updated.to_rfc3339();
            // synced_at reuses ?13 (the server updated_at).
            tx.execute(
                r#"INSERT INTO advice
(id, user_id, project, text, severity, paths, verifiable, state,
confirmed_at, confirmed_by, metadata, created_at, updated_at, synced_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?13)
ON CONFLICT(id) DO UPDATE SET
text = excluded.text,
severity = excluded.severity,
paths = excluded.paths,
verifiable = excluded.verifiable,
state = excluded.state,
confirmed_at = excluded.confirmed_at,
confirmed_by = excluded.confirmed_by,
metadata = excluded.metadata,
updated_at = excluded.updated_at,
synced_at = excluded.synced_at
WHERE advice.updated_at < excluded.updated_at"#,
                params![
                    id,
                    user_id,
                    project,
                    text,
                    severity,
                    paths.to_string(),
                    // Stored as 0/1 in SQLite; explicit cast keeps the
                    // encoding unambiguous.
                    verifiable as i32,
                    state,
                    confirmed_at.map(|d| d.to_rfc3339()),
                    confirmed_by,
                    metadata.to_string(),
                    created.to_rfc3339(),
                    updated_s,
                ],
            )?;
            if updated > max_updated {
                max_updated = updated;
            }
        }
        // Cutoff commits atomically with the rows: a crash before commit
        // re-pulls the batch instead of losing it.
        sync_state::set_last_pulled(&tx, "advice", &max_updated.to_rfc3339())?;
        tx.commit()?;
    }
    Ok(rows.len())
}
/// Pull project rows changed on the server since the last sync cutoff and
/// upsert them into the local SQLite `projects` table.
///
/// Unlike the other tables, the conflict target is the composite key
/// `(user_id, name)` — projects have no surrogate id. Last-writer-wins on
/// `updated_at`, and the `projects` cutoff advances to the largest
/// `updated_at` seen, in the same transaction as the row writes.
///
/// Returns the number of rows fetched (at most `PULL_BATCH`); 0 means the
/// local copy is already caught up.
pub async fn pull_projects(&self) -> Result<usize> {
    let cutoff = self.read_cutoff("projects")?;
    // Context added for parity with pull_events so pool checkout failures
    // are attributable to this sync path.
    let client = self.pg.get().await.context("get client")?;
    // Oldest-first so max_updated is a safe resume point even mid-batch.
    let rows = client
        .query(
            "SELECT user_id, name, path, metadata, created_at, updated_at
             FROM asurada.projects
             WHERE user_id = $1 AND updated_at > $2
             ORDER BY updated_at ASC
             LIMIT $3",
            &[&self.user_id, &cutoff, &PULL_BATCH],
        )
        .await
        .context("pull projects query")?;
    if rows.is_empty() {
        return Ok(0);
    }
    let mut max_updated = cutoff;
    {
        // SQLite lock scope: held only while applying the batch; no .await
        // occurs while the std::sync::Mutex guard is alive.
        let conn = self.brain.lock().unwrap();
        let tx = conn.unchecked_transaction()?;
        for row in &rows {
            let user_id: String = row.get("user_id");
            let name: String = row.get("name");
            let path: String = row.get("path");
            let metadata: serde_json::Value = row.get("metadata");
            let created: DateTime<Utc> = row.get("created_at");
            let updated: DateTime<Utc> = row.get("updated_at");
            let updated_s = updated.to_rfc3339();
            // synced_at reuses ?6 (the server updated_at).
            tx.execute(
                r#"INSERT INTO projects
(user_id, name, path, metadata, created_at, updated_at, synced_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?6)
ON CONFLICT(user_id, name) DO UPDATE SET
path = excluded.path,
metadata = excluded.metadata,
updated_at = excluded.updated_at,
synced_at = excluded.synced_at
WHERE projects.updated_at < excluded.updated_at"#,
                params![
                    user_id,
                    name,
                    path,
                    metadata.to_string(),
                    created.to_rfc3339(),
                    updated_s,
                ],
            )?;
            if updated > max_updated {
                max_updated = updated;
            }
        }
        // Cutoff commits atomically with the rows: a crash before commit
        // re-pulls the batch instead of losing it.
        sync_state::set_last_pulled(&tx, "projects", &max_updated.to_rfc3339())?;
        tx.commit()?;
    }
    Ok(rows.len())
}
/// Pull intent rows changed on the server since the last sync cutoff and
/// upsert them into the local SQLite `intents` table.
///
/// Conflict policy is last-writer-wins on `updated_at`: the upsert only
/// overwrites a local row when the incoming `updated_at` is strictly newer.
/// The `intents` cutoff advances to the largest `updated_at` seen, in the
/// same transaction as the row writes.
///
/// Returns the number of rows fetched (at most `PULL_BATCH`); 0 means the
/// local copy is already caught up.
pub async fn pull_intents(&self) -> Result<usize> {
    let cutoff = self.read_cutoff("intents")?;
    // Context added for parity with pull_events so pool checkout failures
    // are attributable to this sync path.
    let client = self.pg.get().await.context("get client")?;
    // Oldest-first so max_updated is a safe resume point even mid-batch.
    let rows = client
        .query(
            "SELECT id, user_id, project, strength, intent_text, source,
                    source_signal_ids, status, metadata, created_at, updated_at
             FROM asurada.intents
             WHERE user_id = $1 AND updated_at > $2
             ORDER BY updated_at ASC
             LIMIT $3",
            &[&self.user_id, &cutoff, &PULL_BATCH],
        )
        .await
        .context("pull intents query")?;
    if rows.is_empty() {
        return Ok(0);
    }
    let mut max_updated = cutoff;
    {
        // SQLite lock scope: held only while applying the batch; no .await
        // occurs while the std::sync::Mutex guard is alive.
        let conn = self.brain.lock().unwrap();
        let tx = conn.unchecked_transaction()?;
        for row in &rows {
            let id: String = row.get("id");
            let user_id: String = row.get("user_id");
            let project: Option<String> = row.get("project");
            let strength: String = row.get("strength");
            let intent_text: String = row.get("intent_text");
            let source: String = row.get("source");
            let source_signal_ids: serde_json::Value = row.get("source_signal_ids");
            let status: String = row.get("status");
            let metadata: serde_json::Value = row.get("metadata");
            let created: DateTime<Utc> = row.get("created_at");
            let updated: DateTime<Utc> = row.get("updated_at");
            let updated_s = updated.to_rfc3339();
            // synced_at reuses ?11 (the server updated_at); JSON columns
            // are serialized to text for SQLite.
            tx.execute(
                r#"INSERT INTO intents
(id, user_id, project, strength, intent_text, source,
source_signal_ids, status, metadata,
created_at, updated_at, synced_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?11)
ON CONFLICT(id) DO UPDATE SET
project = excluded.project,
strength = excluded.strength,
intent_text = excluded.intent_text,
source = excluded.source,
source_signal_ids = excluded.source_signal_ids,
status = excluded.status,
metadata = excluded.metadata,
updated_at = excluded.updated_at,
synced_at = excluded.synced_at
WHERE intents.updated_at < excluded.updated_at"#,
                params![
                    id,
                    user_id,
                    project,
                    strength,
                    intent_text,
                    source,
                    source_signal_ids.to_string(),
                    status,
                    metadata.to_string(),
                    created.to_rfc3339(),
                    updated_s,
                ],
            )?;
            if updated > max_updated {
                max_updated = updated;
            }
        }
        // Cutoff commits atomically with the rows: a crash before commit
        // re-pulls the batch instead of losing it.
        sync_state::set_last_pulled(&tx, "intents", &max_updated.to_rfc3339())?;
        tx.commit()?;
    }
    Ok(rows.len())
}
/// Pull harness rows changed on the server since the last sync cutoff and
/// upsert them into the local SQLite `harnesses` table.
///
/// Conflict policy is last-writer-wins on `updated_at`: the upsert only
/// overwrites a local row when the incoming `updated_at` is strictly newer.
/// The `harnesses` cutoff advances to the largest `updated_at` seen, in the
/// same transaction as the row writes.
///
/// Returns the number of rows fetched (at most `PULL_BATCH`); 0 means the
/// local copy is already caught up.
pub async fn pull_harnesses(&self) -> Result<usize> {
    let cutoff = self.read_cutoff("harnesses")?;
    // Context added for parity with pull_events so pool checkout failures
    // are attributable to this sync path.
    let client = self.pg.get().await.context("get client")?;
    // Oldest-first so max_updated is a safe resume point even mid-batch.
    let rows = client
        .query(
            "SELECT id, user_id, project, slug, title, description, reason,
                    file_paths, usage_count, last_used_at, evolution_log,
                    source_signal_ids, source_cluster_signature, status, metadata,
                    created_at, updated_at
             FROM asurada.harnesses
             WHERE user_id = $1 AND updated_at > $2
             ORDER BY updated_at ASC
             LIMIT $3",
            &[&self.user_id, &cutoff, &PULL_BATCH],
        )
        .await
        .context("pull harnesses query")?;
    if rows.is_empty() {
        return Ok(0);
    }
    let mut max_updated = cutoff;
    {
        // SQLite lock scope: held only while applying the batch; no .await
        // occurs while the std::sync::Mutex guard is alive.
        let conn = self.brain.lock().unwrap();
        let tx = conn.unchecked_transaction()?;
        for row in &rows {
            let id: String = row.get("id");
            let user_id: String = row.get("user_id");
            let project: String = row.get("project");
            let slug: String = row.get("slug");
            let title: String = row.get("title");
            let description: String = row.get("description");
            let reason: String = row.get("reason");
            let file_paths: serde_json::Value = row.get("file_paths");
            let usage_count: i64 = row.get("usage_count");
            let last_used_at: Option<DateTime<Utc>> = row.get("last_used_at");
            let evolution_log: serde_json::Value = row.get("evolution_log");
            let source_signal_ids: serde_json::Value = row.get("source_signal_ids");
            let source_cluster_signature: Option<i64> = row.get("source_cluster_signature");
            let status: String = row.get("status");
            let metadata: serde_json::Value = row.get("metadata");
            let created: DateTime<Utc> = row.get("created_at");
            let updated: DateTime<Utc> = row.get("updated_at");
            let updated_s = updated.to_rfc3339();
            // synced_at reuses ?17 (the server updated_at); JSON columns
            // are serialized to text for SQLite.
            tx.execute(
                r#"INSERT INTO harnesses
(id, user_id, project, slug, title, description, reason,
file_paths, usage_count, last_used_at, evolution_log,
source_signal_ids, source_cluster_signature, status, metadata,
created_at, updated_at, synced_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11,
?12, ?13, ?14, ?15, ?16, ?17, ?17)
ON CONFLICT(id) DO UPDATE SET
project = excluded.project,
slug = excluded.slug,
title = excluded.title,
description = excluded.description,
reason = excluded.reason,
file_paths = excluded.file_paths,
usage_count = excluded.usage_count,
last_used_at = excluded.last_used_at,
evolution_log = excluded.evolution_log,
source_signal_ids = excluded.source_signal_ids,
source_cluster_signature = excluded.source_cluster_signature,
status = excluded.status,
metadata = excluded.metadata,
updated_at = excluded.updated_at,
synced_at = excluded.synced_at
WHERE harnesses.updated_at < excluded.updated_at"#,
                params![
                    id,
                    user_id,
                    project,
                    slug,
                    title,
                    description,
                    reason,
                    file_paths.to_string(),
                    usage_count,
                    last_used_at.map(|d| d.to_rfc3339()),
                    evolution_log.to_string(),
                    source_signal_ids.to_string(),
                    source_cluster_signature,
                    status,
                    metadata.to_string(),
                    created.to_rfc3339(),
                    updated_s,
                ],
            )?;
            if updated > max_updated {
                max_updated = updated;
            }
        }
        // Cutoff commits atomically with the rows: a crash before commit
        // re-pulls the batch instead of losing it.
        sync_state::set_last_pulled(&tx, "harnesses", &max_updated.to_rfc3339())?;
        tx.commit()?;
    }
    Ok(rows.len())
}
/// Pull issue rows changed on the server since the last sync cutoff and
/// upsert them into the local SQLite `issues` table.
///
/// Conflict policy is last-writer-wins on `updated_at`: the upsert only
/// overwrites a local row when the incoming `updated_at` is strictly newer.
/// The `issues` cutoff advances to the largest `updated_at` seen, in the
/// same transaction as the row writes.
///
/// Returns the number of rows fetched (at most `PULL_BATCH`); 0 means the
/// local copy is already caught up.
pub async fn pull_issues(&self) -> Result<usize> {
    let cutoff = self.read_cutoff("issues")?;
    // Context added for parity with pull_events so pool checkout failures
    // are attributable to this sync path.
    let client = self.pg.get().await.context("get client")?;
    // Oldest-first so max_updated is a safe resume point even mid-batch.
    let rows = client
        .query(
            "SELECT id, user_id, title, summary, projects, status,
                    started_at, ended_at, event_count, metadata,
                    created_at, updated_at
             FROM asurada.issues
             WHERE user_id = $1 AND updated_at > $2
             ORDER BY updated_at ASC
             LIMIT $3",
            &[&self.user_id, &cutoff, &PULL_BATCH],
        )
        .await
        .context("pull issues query")?;
    if rows.is_empty() {
        return Ok(0);
    }
    let mut max_updated = cutoff;
    {
        // SQLite lock scope: held only while applying the batch; no .await
        // occurs while the std::sync::Mutex guard is alive.
        let conn = self.brain.lock().unwrap();
        let tx = conn.unchecked_transaction()?;
        for row in &rows {
            let id: String = row.get("id");
            let user_id: String = row.get("user_id");
            let title: String = row.get("title");
            let summary: String = row.get("summary");
            let projects: serde_json::Value = row.get("projects");
            let status: String = row.get("status");
            let started: DateTime<Utc> = row.get("started_at");
            let ended: Option<DateTime<Utc>> = row.get("ended_at");
            let event_count: i64 = row.get("event_count");
            let metadata: serde_json::Value = row.get("metadata");
            let created: DateTime<Utc> = row.get("created_at");
            let updated: DateTime<Utc> = row.get("updated_at");
            let updated_s = updated.to_rfc3339();
            // synced_at reuses ?12 (the server updated_at).
            tx.execute(
                r#"INSERT INTO issues
(id, user_id, title, summary, projects, status,
started_at, ended_at, event_count, metadata,
created_at, updated_at, synced_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?12)
ON CONFLICT(id) DO UPDATE SET
title = excluded.title,
summary = excluded.summary,
projects = excluded.projects,
status = excluded.status,
started_at = excluded.started_at,
ended_at = excluded.ended_at,
event_count = excluded.event_count,
metadata = excluded.metadata,
updated_at = excluded.updated_at,
synced_at = excluded.synced_at
WHERE issues.updated_at < excluded.updated_at"#,
                params![
                    id,
                    user_id,
                    title,
                    summary,
                    projects.to_string(),
                    status,
                    started.to_rfc3339(),
                    ended.map(|d| d.to_rfc3339()),
                    event_count,
                    metadata.to_string(),
                    created.to_rfc3339(),
                    updated_s,
                ],
            )?;
            if updated > max_updated {
                max_updated = updated;
            }
        }
        // Cutoff commits atomically with the rows: a crash before commit
        // re-pulls the batch instead of losing it.
        sync_state::set_last_pulled(&tx, "issues", &max_updated.to_rfc3339())?;
        tx.commit()?;
    }
    Ok(rows.len())
}
/// Pull one batch for every synced table, sequentially, and report the
/// per-table row counts. Stops at the first table whose pull fails.
pub async fn pull_all(&self) -> Result<PullSummary> {
    // Same order as the struct fields; each await completes before the
    // next pull starts, exactly as the original field-initializer order.
    let events = self.pull_events().await?;
    let memories = self.pull_memories().await?;
    let advice = self.pull_advice().await?;
    let projects = self.pull_projects().await?;
    let intents = self.pull_intents().await?;
    let harnesses = self.pull_harnesses().await?;
    let issues = self.pull_issues().await?;
    Ok(PullSummary {
        events,
        memories,
        advice,
        projects,
        intents,
        harnesses,
        issues,
    })
}
/// Resolve the incremental-sync cutoff for `table`: the last pulled
/// server `updated_at` recorded in sync_state, or the Unix epoch when the
/// table has never been pulled. Errors if the stored value is not RFC 3339.
fn read_cutoff(&self, table: &str) -> Result<DateTime<Utc>> {
    let guard = self.brain.lock().unwrap();
    let stored = match sync_state::get_last_pulled(&guard, table)? {
        Some(ts) => ts,
        None => "1970-01-01T00:00:00Z".to_string(),
    };
    let parsed = DateTime::parse_from_rfc3339(&stored)
        .with_context(|| format!("parse last_pulled_at '{}'", stored))?;
    Ok(parsed.with_timezone(&Utc))
}
}
/// Per-table row counts from a single `pull_all` pass. Each field is the
/// number of rows fetched for that table on this pass (at most the pull
/// batch size); 0 means that table was already caught up.
#[derive(Debug, Clone, serde::Serialize)]
pub struct PullSummary {
pub events: usize,
pub memories: usize,
pub advice: usize,
pub projects: usize,
pub intents: usize,
pub harnesses: usize,
pub issues: usize,
}