// systemprompt-sync 0.1.22
//
// Sync services for systemprompt.io - file, database, and crate deployment synchronization
// Documentation
use sqlx::PgPool;

use super::{ContextExport, SkillExport, UserExport};
use crate::error::SyncResult;

/// Inserts `user` into the `users` table, or overwrites the row that already has its `id`.
///
/// Before writing, the table is probed for a *different* row (another `id`) that already
/// holds the same `name` or `email`. When such a row exists the user is skipped: a debug
/// event is logged and `(0, 0)` is returned without touching the table.
///
/// # Returns
///
/// A pair of counters — presumably `(created, updated)`; confirm against the caller:
/// * `(1, 0)` when a row was affected and the export's `created_at` equals its `updated_at`.
/// * `(0, 1)` when a row was affected and the two timestamps differ.
/// * `(0, 0)` when the user was skipped or the statement affected no rows.
///
/// The split between the first two cases is derived from the exported timestamps, not from
/// anything the database reports about the statement.
///
/// # Errors
///
/// A failure of either query is propagated to the caller via `?`.
pub(super) async fn upsert_user(pool: &PgPool, user: &UserExport) -> SyncResult<(usize, usize)> {
    let name_or_email_taken: Option<bool> = sqlx::query_scalar!(
        "SELECT EXISTS(SELECT 1 FROM users WHERE (name = $1 OR email = $2) AND id != $3)",
        user.name,
        user.email,
        user.id
    )
    .fetch_one(pool)
    .await?;

    // A missing (NULL) answer is treated the same as "no conflict".
    if name_or_email_taken.unwrap_or(false) {
        tracing::debug!(
            user_id = %user.id,
            name = %user.name,
            email = %user.email,
            "User with same name or email exists with different id, skipping"
        );
        return Ok((0, 0));
    }

    let outcome = sqlx::query!(
        r#"INSERT INTO users (id, name, email, full_name, display_name, status, email_verified,
                              roles, is_bot, is_scanner, avatar_url, created_at, updated_at)
           VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
           ON CONFLICT (id) DO UPDATE SET
             name = EXCLUDED.name,
             email = EXCLUDED.email,
             full_name = EXCLUDED.full_name,
             display_name = EXCLUDED.display_name,
             status = EXCLUDED.status,
             email_verified = EXCLUDED.email_verified,
             roles = EXCLUDED.roles,
             is_bot = EXCLUDED.is_bot,
             is_scanner = EXCLUDED.is_scanner,
             avatar_url = EXCLUDED.avatar_url,
             updated_at = EXCLUDED.updated_at"#,
        user.id,
        user.name,
        user.email,
        user.full_name,
        user.display_name,
        user.status,
        user.email_verified,
        &user.roles,
        user.is_bot,
        user.is_scanner,
        user.avatar_url,
        user.created_at,
        user.updated_at
    )
    .execute(pool)
    .await?;

    // Nothing written: neither counter moves.
    if outcome.rows_affected() == 0 {
        return Ok((0, 0));
    }

    // Equal timestamps on the export put the row in the first slot, otherwise the second.
    Ok(if user.created_at == user.updated_at {
        (1, 0)
    } else {
        (0, 1)
    })
}

/// Inserts `skill` into `agent_skills`, or overwrites the row that already has its `skill_id`.
///
/// On a `skill_id` conflict every column except `skill_id` and `created_at` is replaced with
/// the exported value.
///
/// # Returns
///
/// A pair of counters — presumably `(created, updated)`; confirm against the caller:
/// * `(1, 0)` when a row was affected and the export's `created_at` equals its `updated_at`.
/// * `(0, 1)` when a row was affected and the two timestamps differ.
/// * `(0, 0)` when the statement affected no rows.
///
/// The split between the first two cases is derived from the exported timestamps, not from
/// anything the database reports about the statement.
///
/// # Errors
///
/// A query failure is propagated to the caller via `?`.
pub(super) async fn upsert_skill(pool: &PgPool, skill: &SkillExport) -> SyncResult<(usize, usize)> {
    let outcome = sqlx::query!(
        r#"INSERT INTO agent_skills (skill_id, file_path, name, description, instructions,
                                     enabled, tags, category_id, source_id, created_at, updated_at)
           VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
           ON CONFLICT (skill_id) DO UPDATE SET
             file_path = EXCLUDED.file_path,
             name = EXCLUDED.name,
             description = EXCLUDED.description,
             instructions = EXCLUDED.instructions,
             enabled = EXCLUDED.enabled,
             tags = EXCLUDED.tags,
             category_id = EXCLUDED.category_id,
             source_id = EXCLUDED.source_id,
             updated_at = EXCLUDED.updated_at"#,
        skill.skill_id,
        skill.file_path,
        skill.name,
        skill.description,
        skill.instructions,
        skill.enabled,
        skill.tags.as_deref(),
        skill.category_id,
        skill.source_id,
        skill.created_at,
        skill.updated_at
    )
    .execute(pool)
    .await?;

    // Nothing written: neither counter moves.
    if outcome.rows_affected() == 0 {
        return Ok((0, 0));
    }

    // Equal timestamps on the export put the row in the first slot, otherwise the second.
    Ok(if skill.created_at == skill.updated_at {
        (1, 0)
    } else {
        (0, 1)
    })
}

/// Inserts `context` into `user_contexts`, or overwrites the row that already has its
/// `context_id`.
///
/// Two lookups run before the write:
/// 1. The `users` table must contain `context.user_id`; otherwise the context is skipped
///    with a debug event and `(0, 0)` is returned.
/// 2. If the export carries a `session_id`, it is kept only when `user_sessions` contains
///    it; otherwise a debug event is logged and the context is written with a `NULL`
///    session.
///
/// # Returns
///
/// A pair of counters — presumably `(created, updated)`; confirm against the caller:
/// * `(1, 0)` when a row was affected and the export's `created_at` equals its `updated_at`.
/// * `(0, 1)` when a row was affected and the two timestamps differ.
/// * `(0, 0)` when the context was skipped or the statement affected no rows.
///
/// The split between the first two cases is derived from the exported timestamps, not from
/// anything the database reports about the statement.
///
/// # Errors
///
/// A failure of any of the queries is propagated to the caller via `?`.
pub(super) async fn upsert_context(
    pool: &PgPool,
    context: &ContextExport,
) -> SyncResult<(usize, usize)> {
    let owner_present: Option<bool> = sqlx::query_scalar!(
        "SELECT EXISTS(SELECT 1 FROM users WHERE id = $1)",
        context.user_id
    )
    .fetch_one(pool)
    .await?;

    // Anything other than a definite `true` counts as "user missing".
    if !owner_present.unwrap_or(false) {
        tracing::debug!(
            user_id = %context.user_id,
            context_id = %context.context_id,
            "User not found in target database, skipping context"
        );
        return Ok((0, 0));
    }

    // Keep the exported session reference only if that session exists in the target.
    let session_id = if let Some(sid) = &context.session_id {
        let session_present: Option<bool> = sqlx::query_scalar!(
            "SELECT EXISTS(SELECT 1 FROM user_sessions WHERE session_id = $1)",
            sid
        )
        .fetch_one(pool)
        .await?;

        if session_present.unwrap_or(false) {
            Some(sid.clone())
        } else {
            tracing::debug!(
                session_id = %sid,
                context_id = %context.context_id,
                "Session not found in target database, setting session_id to NULL"
            );
            None
        }
    } else {
        None
    };

    let outcome = sqlx::query!(
        r#"INSERT INTO user_contexts (context_id, user_id, session_id, name, created_at, updated_at)
           VALUES ($1, $2, $3, $4, $5, $6)
           ON CONFLICT (context_id) DO UPDATE SET
             user_id = EXCLUDED.user_id,
             session_id = EXCLUDED.session_id,
             name = EXCLUDED.name,
             updated_at = EXCLUDED.updated_at"#,
        context.context_id,
        context.user_id,
        session_id,
        context.name,
        context.created_at,
        context.updated_at
    )
    .execute(pool)
    .await?;

    // Nothing written: neither counter moves.
    if outcome.rows_affected() == 0 {
        return Ok((0, 0));
    }

    // Equal timestamps on the export put the row in the first slot, otherwise the second.
    Ok(if context.created_at == context.updated_at {
        (1, 0)
    } else {
        (0, 1)
    })
}