kopuz-server 0.7.0

A modern, lightweight music player built with Rust and Dioxus.
//! Favorites reconciler (issue #347, step 9): push-before-pull sync between
//! the DB's per-server favorites and the remote, dispatched through the
//! [`MediaSource`](crate::source::MediaSource) backend so it's service-agnostic.
//!
//! Push first so a just-toggled like isn't reverted by the pull; pending rows
//! that fail to push stay pending and are retried next cycle. The pull replaces
//! the clean set (dirty rows survive — see `replace_favorites_clean`).

use crate::source::{AuthOutcome, MediaSource, SourceError};

/// Minimum age of the last remote pull before a non-Manual reconcile pulls
/// again. Pushes are never gated — only the expensive full fetch is.
const PULL_MIN_SECS: u64 = 30 * 60;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncReason {
    Activate,
    Interval,
    AfterMutation,
    Manual,
}

#[derive(Debug, Default, Clone)]
pub struct SyncReport {
    pub pushed_likes: usize,
    pub pushed_unlikes: usize,
    pub failed_pushes: usize,
    pub pulled: usize,
    /// Whether a pull APPLIED this cycle — `pulled` is the remote set's size,
    /// so a pull that only removed rows reports `pulled == 0` and the caller
    /// still needs to refresh the UI.
    pub did_pull: bool,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SyncError {
    /// Real auth rejection — the caller should mark the server expired and
    /// surface re-auth UI. Distinct from a network blip.
    Expired,
    /// Transient (network/server) failure — back off and retry later.
    Unreachable(String),
}

/// One reconcile cycle for the source's server against its remote.
#[tracing::instrument(name = "favorites.reconcile", skip(source), fields(server = %source.source().as_str(), ?reason))]
pub async fn reconcile_favorites(
    source: &dyn MediaSource,
    reason: SyncReason,
) -> Result<SyncReport, SyncError> {
    let db = source.db();
    let server_id = source.source().as_str();
    // Decide what there is to do BEFORE any network call — a reconcile with no
    // pending pushes and a fresh pull must be a complete no-op (not even the
    // validate request). The DB is the only thing consulted on the quiet path.
    let likes = db
        .dirty_favorites(server_id)
        .await
        .map_err(|e| SyncError::Unreachable(e.to_string()))?;
    let unlikes = db
        .dirty_unlikes(server_id)
        .await
        .map_err(|e| SyncError::Unreachable(e.to_string()))?;
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);
    let last_pull: u64 = db
        .meta_get("fav_pull", server_id)
        .await
        .ok()
        .flatten()
        .and_then(|s| s.parse().ok())
        .unwrap_or(0);
    // A stamp in the future (backward clock step) counts as stale, otherwise
    // pulls would be suppressed until real time catches up to it.
    let should_pull =
        matches!(reason, SyncReason::Manual) || last_pull > now || now - last_pull >= PULL_MIN_SECS;
    if likes.is_empty() && unlikes.is_empty() && !should_pull {
        return Ok(SyncReport::default());
    }

    match source.validate().await {
        AuthOutcome::Valid => {}
        AuthOutcome::Expired => return Err(SyncError::Expired),
        AuthOutcome::Unreachable => {
            return Err(SyncError::Unreachable("server unreachable".into()));
        }
    }

    let mut report = SyncReport::default();

    // Push pending likes, then pending unlikes (each resolved on success only,
    // so a failure is retried next cycle). The pushed refs are remembered: a
    // pull in the SAME cycle can see a stale remote listing (YT's liked
    // browse is eventually consistent), and a just-pushed like — now clean —
    // would otherwise be deleted by the pull, or a just-pushed unlike
    // resurrected.
    let mut pushed_like_refs: Vec<String> = Vec::new();
    let mut pushed_unlike_refs: std::collections::HashSet<String> =
        std::collections::HashSet::new();
    for r in likes {
        match source.push_favorite(&r, true).await {
            Ok(()) => {
                let _ = db.clear_favorite_dirty(server_id, &r).await;
                report.pushed_likes += 1;
                pushed_like_refs.push(r);
            }
            Err(e) => {
                tracing::warn!(error = %e, item = %r, "favorite like push failed");
                report.failed_pushes += 1;
            }
        }
    }
    for r in unlikes {
        match source.push_favorite(&r, false).await {
            Ok(()) => {
                let _ = db.clear_favorite_dirty(server_id, &r).await;
                report.pushed_unlikes += 1;
                pushed_unlike_refs.insert(r);
            }
            Err(e) => {
                tracing::warn!(error = %e, item = %r, "favorite unlike push failed");
                report.failed_pushes += 1;
            }
        }
    }

    // Pull: the remote set becomes the clean baseline; still-pending local rows
    // survive. fetch_favorites is EXPENSIVE for YT (a full liked-library browse
    // stream), so the pull is staleness-gated (computed up top): Manual always
    // pulls; everything else only when the last pull is old.
    if should_pull {
        let mut remote = source.fetch_favorites().await.map_err(|e| match e {
            SourceError::Auth => SyncError::Expired,
            other => SyncError::Unreachable(other.to_string()),
        })?;
        report.pulled = remote.len();
        // Overlay this cycle's pushes on the (possibly stale) remote listing.
        for r in pushed_like_refs {
            if !remote.contains(&r) {
                remote.push(r);
            }
        }
        remote.retain(|r| !pushed_unlike_refs.contains(r));
        db.replace_favorites_clean(server_id, &remote)
            .await
            .map_err(|e| SyncError::Unreachable(e.to_string()))?;
        let _ = db.meta_put("fav_pull", server_id, &now.to_string()).await;
        report.did_pull = true;
    }

    tracing::info!(
        pushed_likes = report.pushed_likes,
        pushed_unlikes = report.pushed_unlikes,
        failed = report.failed_pushes,
        pulled = report.pulled,
        "favorites reconciled"
    );
    Ok(report)
}