use crate::runtime::CommandContext;
use crate::style;
use crate::support::util::{exit_code, exit_err};
/// Upper bound on the number of `flush_to_cloud` rounds attempted by
/// `run_observations_phase` in a single sync.
const MAX_OBSERVATION_SYNC_FLUSHES: usize = 10;
/// Total item budget shared by every kind drained in `run_cloud_outbox_phase`.
const MAX_CLOUD_OUTBOX_SYNC_ITEMS: usize = 512;
/// Kinds drained regardless of the raw-upload flags (currently none).
/// Each entry is `(kind, optional per-kind item limit)`.
const DEFAULT_OUTBOX_SYNC_PRIORITY_KINDS: &[(&str, Option<usize>)] = &[];
/// Priority entries appended when observation uploads are opted in.
const OBSERVATION_OUTBOX_SYNC_PRIORITY_KINDS: &[(&str, Option<usize>)] =
&[(difflore_core::cloud::outbox::kind::OBSERVATION, None)];
/// Priority entries appended when session-mined candidate uploads are opted in.
const CANDIDATE_OUTBOX_SYNC_PRIORITY_KINDS: &[(&str, Option<usize>)] = &[(
difflore_core::cloud::outbox::kind::SESSION_MINED_CANDIDATE,
None,
)];
/// Outbox kinds whose pending rows are reported under "observations".
const OBSERVATION_OUTBOX_KINDS: &[&str] = &[difflore_core::cloud::outbox::kind::OBSERVATION];
/// Outbox kinds whose pending rows are reported under "memory candidates".
const CANDIDATE_OUTBOX_KINDS: &[&str] =
&[difflore_core::cloud::outbox::kind::SESSION_MINED_CANDIDATE];
/// Outbox kinds whose pending rows are reported under "telemetry".
const TELEMETRY_OUTBOX_KINDS: &[&str] = &[
difflore_core::cloud::outbox::kind::ACCEPTED_EDIT,
difflore_core::cloud::outbox::kind::IMPORTED_REVIEWS,
difflore_core::cloud::outbox::kind::REVIEW_METRICS,
difflore_core::cloud::outbox::kind::TRAJECTORY,
difflore_core::cloud::outbox::kind::MCP_QUERY,
];
/// Local shorthand for the accepted-edit attribution summary accumulated while draining the outbox.
type AcceptedEditAttributionSummary = difflore_core::cloud::outbox::AcceptedEditAttributionSummary;
/// Options accepted by the cloud sync command handler, independent of the CLI
/// argument type they are converted from.
pub(crate) struct SyncArgs {
    /// Request the pull direction (combined with `push` by `SyncDirection::from_flags`).
    pub pull: bool,
    /// Request the push direction (combined with `pull` by `SyncDirection::from_flags`).
    pub push: bool,
    /// Report what would happen without running the sync phases.
    pub dry_run: bool,
    /// Opt in to uploading observations.
    pub include_observations: bool,
    /// Opt in to uploading session-mined candidates.
    pub include_candidates: bool,
    /// Opt in to uploading telemetry outbox rows.
    pub include_telemetry: bool,
    /// Emit JSON on stdout instead of the human-readable summary.
    pub json: bool,
}

impl From<crate::cli::SyncCliArgs> for SyncArgs {
    /// Carries each CLI flag over unchanged.
    fn from(args: crate::cli::SyncCliArgs) -> Self {
        let crate::cli::SyncCliArgs {
            pull,
            push,
            dry_run,
            include_observations,
            include_candidates,
            include_telemetry,
            json,
            ..
        } = args;
        Self {
            pull,
            push,
            dry_run,
            include_observations,
            include_candidates,
            include_telemetry,
            json,
        }
    }
}
/// Which categories of raw local data the user opted in to uploading.
/// The derived `Default` leaves every category disabled.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
struct RawUploadPolicy {
// Upload observations (emitter events and OBSERVATION outbox rows).
include_observations: bool,
// Upload SESSION_MINED_CANDIDATE outbox rows.
include_candidates: bool,
// Upload the telemetry outbox kinds.
include_telemetry: bool,
}
impl RawUploadPolicy {
/// Builds a policy from the three opt-in flags, in declaration order.
const fn new(
include_observations: bool,
include_candidates: bool,
include_telemetry: bool,
) -> Self {
Self {
include_observations,
include_candidates,
include_telemetry,
}
}
}
/// Number of pending rows left un-uploaded because their category was not
/// opted in, broken down by category.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
struct RawUploadSkipCounts {
    /// Skipped observation rows.
    observations: usize,
    /// Skipped session-mined candidate rows.
    memory_candidates: usize,
    /// Skipped telemetry rows.
    telemetry: usize,
}

impl RawUploadSkipCounts {
    /// Sum of the skipped rows across all three categories.
    const fn total(self) -> usize {
        let Self {
            observations,
            memory_candidates,
            telemetry,
        } = self;
        observations + memory_candidates + telemetry
    }
}
/// Direction(s) a sync run operates in.
#[derive(Clone, Copy, Debug)]
pub(crate) enum SyncDirection {
    /// Only fetch from the cloud.
    Pull,
    /// Only send to the cloud.
    Push,
    /// Fetch and send.
    Both,
}

impl SyncDirection {
    /// Maps the two CLI flags to a direction. Exactly one flag set selects
    /// that direction; both set or both unset selects `Both`.
    const fn from_flags(pull: bool, push: bool) -> Self {
        if pull && !push {
            Self::Pull
        } else if push && !pull {
            Self::Push
        } else {
            Self::Both
        }
    }

    /// True unless the direction is push-only.
    const fn do_pull(self) -> bool {
        !matches!(self, Self::Push)
    }

    /// True unless the direction is pull-only.
    const fn do_push(self) -> bool {
        !matches!(self, Self::Pull)
    }
}
/// Aggregated result of one non-dry-run sync, consumed by the JSON and
/// human-readable summary emitters.
struct SyncOutcome {
// Counts taken from the local rule sync result (`created_count` etc.).
created: usize,
updated: usize,
deleted: usize,
// `visible_count` reported by the team rule sync.
team_count: i32,
// `created_count` of the team sync result.
team_synced: usize,
// Number of top-level fields in the cloud settings object that was applied.
settings_pull_applied: usize,
// Providers newly added locally from the cloud list.
providers_added: usize,
// Observation figures: emitter phase plus OBSERVATION outbox rows.
observations_attempted: usize,
observations_uploaded: usize,
observations_queued: usize,
observations_skipped: usize,
// Session-mined candidate outbox figures.
memory_candidates_attempted: usize,
memory_candidates_uploaded: usize,
memory_candidates_queued: usize,
memory_candidates_skipped: usize,
// Telemetry outbox figures.
telemetry_attempted: usize,
telemetry_uploaded: usize,
telemetry_queued: usize,
telemetry_skipped: usize,
// Attribution summary accumulated while draining the outbox.
accepted_edit_attribution: AcceptedEditAttributionSummary,
}
/// Per-category counters produced by `run_cloud_outbox_phase`.
/// `*_attempted` / `*_uploaded` accumulate from drain reports; `*_queued` and
/// `*_skipped` are filled in by `with_cloud_outbox_pending_counts`.
#[derive(Default)]
struct CloudOutboxPhaseOutcome {
// OBSERVATION outbox rows.
observations_attempted: usize,
observations_uploaded: usize,
observations_queued: usize,
observations_skipped: usize,
// SESSION_MINED_CANDIDATE outbox rows.
memory_candidates_attempted: usize,
memory_candidates_uploaded: usize,
memory_candidates_queued: usize,
memory_candidates_skipped: usize,
// Telemetry outbox rows (see TELEMETRY_OUTBOX_KINDS).
telemetry_attempted: usize,
telemetry_uploaded: usize,
telemetry_queued: usize,
telemetry_skipped: usize,
// Attribution summary merged from drain reports.
accepted_edit_attribution: AcceptedEditAttributionSummary,
}
/// Entry point for the cloud sync command.
///
/// Runs, in order: the login check, the optional dry-run report, the
/// observation upload phase, the cloud outbox phase, the local rule sync, the
/// team rule sync, the settings phase and the providers phase, then prints a
/// JSON or human-readable summary. Failures to list local rules or to sync
/// local/team rules terminate the process through `exit_err`; failures to
/// apply a sync result are reported as warnings only.
pub(crate) async fn handle_sync(ctx: &CommandContext, args: SyncArgs) {
let SyncArgs {
pull,
push,
dry_run,
include_observations,
include_candidates,
include_telemetry,
json,
} = args;
let direction = SyncDirection::from_flags(pull, push);
let raw_upload_policy =
RawUploadPolicy::new(include_observations, include_candidates, include_telemetry);
let client = ctx.cloud().await;
// `emit_not_logged_in` never returns, so everything below runs logged in.
if !client.is_logged_in() {
emit_not_logged_in(json);
}
let db = &ctx.db;
// Dry run: report the direction and skipped raw uploads, then stop.
if dry_run {
emit_dry_run(db, json, direction, raw_upload_policy).await;
return;
}
// Snapshot used after the sync to detect newly pending candidates; a failed
// count is treated as zero.
let pending_candidates_before_sync = difflore_core::skills::count_pending_candidates(db, None)
.await
.unwrap_or(0);
// `sync_spinner` yields `None` in JSON mode, so the spinner helpers become no-ops.
let mut spinner = sync_spinner(json, "Uploading local memory activity");
// Without the observations opt-in nothing is uploaded; the pending count is
// reported as both queued and skipped.
let (
observation_events_attempted,
observation_events_uploaded,
observation_events_queued,
observation_events_skipped,
) = if raw_upload_policy.include_observations {
let (attempted, uploaded, queued) = run_observations_phase(client).await;
(attempted, uploaded, queued, 0)
} else {
let queued = pending_observation_events_count().await;
(0, 0, queued, queued)
};
sync_spinner_tick(spinner.as_ref());
sync_spinner_set_message(&mut spinner, "Uploading local activity");
let cloud_outbox = run_cloud_outbox_phase(db, client, raw_upload_policy).await;
sync_spinner_tick(spinner.as_ref());
// Listing local rules is required to compute exclusions; failure is fatal.
let local_skills = match difflore_core::skills::list(db).await {
Ok(skills) => skills,
Err(e) => exit_err(&format!("Failed to list local rules: {e}")),
};
let excluded_ids = prepare_excluded_ids(db, &local_skills).await;
sync_spinner_set_message(&mut spinner, "Syncing rules from cloud");
// The rule sync only runs when pulling; `Ok(None)` and the non-pull case both
// yield an empty result.
let synced_local = if direction.do_pull() {
match difflore_core::cloud::sync::sync_skills_filtered(client, &local_skills, &excluded_ids)
.await
{
Ok(Some(result)) => result,
Ok(None) => difflore_core::cloud::sync::SyncResult {
created: vec![],
updated: vec![],
deleted: vec![],
},
Err(e) => {
sync_spinner_finish_err(spinner.take(), "Cloud sync failed");
exit_err(&format_cloud_err(
"Failed to sync local rules",
&e.to_string(),
));
}
}
} else {
difflore_core::cloud::sync::SyncResult {
created: vec![],
updated: vec![],
deleted: vec![],
}
};
sync_spinner_tick(spinner.as_ref());
let apply_pull = direction.do_pull();
// A failure to persist the pulled changes is only warned about; the sync continues.
if apply_pull && let Err(e) = difflore_core::skills::apply_sync_result(db, &synced_local).await
{
eprintln!(
"{} Failed to apply sync changes: {e}",
style::amber(style::sym::WARN),
);
}
sync_spinner_set_message(&mut spinner, "Syncing team rules");
// Team rules are pull-only; a failed fetch is fatal, a failed apply is a warning.
let (team_count, team_synced) = if direction.do_pull() {
match difflore_core::cloud::sync::sync_team_skills(client).await {
Ok(team) => {
if let Err(e) = difflore_core::skills::apply_sync_result(db, &team.synced).await {
eprintln!(
"{} Failed to apply team rule sync changes: {e}",
style::amber(style::sym::WARN),
);
}
(team.visible_count, team.synced.created_count())
}
Err(e) => {
sync_spinner_finish_err(spinner.take(), "Cloud sync failed");
exit_err(&format_cloud_err(
"Failed to sync team rules",
&e.to_string(),
));
}
}
} else {
(0, 0)
};
sync_spinner_tick(spinner.as_ref());
sync_spinner_set_message(&mut spinner, "Syncing settings");
let settings_pull_applied = run_settings_phase(client, direction).await;
sync_spinner_tick(spinner.as_ref());
sync_spinner_set_message(&mut spinner, "Syncing providers");
let providers_added = run_providers_phase(client, db, direction).await;
sync_spinner_tick(spinner.as_ref());
sync_spinner_finish_ok(spinner.take(), "Cloud sync completed.");
// Observation figures combine the emitter phase with the OBSERVATION outbox rows.
let outcome = SyncOutcome {
created: synced_local.created_count(),
updated: synced_local.updated_count(),
deleted: synced_local.deleted_count(),
team_count,
team_synced,
settings_pull_applied,
providers_added,
observations_attempted: observation_events_attempted + cloud_outbox.observations_attempted,
observations_uploaded: observation_events_uploaded + cloud_outbox.observations_uploaded,
observations_queued: observation_events_queued + cloud_outbox.observations_queued,
observations_skipped: observation_events_skipped + cloud_outbox.observations_skipped,
memory_candidates_attempted: cloud_outbox.memory_candidates_attempted,
memory_candidates_uploaded: cloud_outbox.memory_candidates_uploaded,
memory_candidates_queued: cloud_outbox.memory_candidates_queued,
memory_candidates_skipped: cloud_outbox.memory_candidates_skipped,
telemetry_attempted: cloud_outbox.telemetry_attempted,
telemetry_uploaded: cloud_outbox.telemetry_uploaded,
telemetry_queued: cloud_outbox.telemetry_queued,
telemetry_skipped: cloud_outbox.telemetry_skipped,
accepted_edit_attribution: cloud_outbox.accepted_edit_attribution,
};
// Both output modes run the autopilot check before printing their summary.
if json {
maybe_schedule_autopilot_after_sync(db, pending_candidates_before_sync).await;
emit_summary_json(&outcome);
return;
}
maybe_schedule_autopilot_after_sync(db, pending_candidates_before_sync).await;
emit_summary_human(&outcome, db).await;
}
/// Marks the memory autopilot dirty and schedules it when the sync increased
/// the number of pending candidates.
///
/// Does nothing when the post-sync count cannot be read or has not grown
/// beyond `pending_candidates_before_sync`.
async fn maybe_schedule_autopilot_after_sync(
    db: &difflore_core::SqlitePool,
    pending_candidates_before_sync: u64,
) {
    let candidates_grew = match difflore_core::skills::count_pending_candidates(db, None).await {
        Ok(pending_candidates_after_sync) => {
            pending_candidates_after_sync > pending_candidates_before_sync
        }
        Err(_) => false,
    };
    if !candidates_grew {
        return;
    }

    crate::commands::memory::mark_memory_autopilot_dirty_best_effort(db, "cloud_sync").await;
    crate::commands::memory::schedule_memory_autopilot_best_effort(
        db,
        "cloud_sync",
        difflore_core::memory_autopilot_schedule::EXPLICIT_AUTOPILOT_COOLDOWN_SECS,
    )
    .await;
}
/// Creates the progress spinner for interactive runs; JSON runs get `None`.
fn sync_spinner(json: bool, label: &str) -> Option<style::Spinner> {
    (!json).then(|| style::Spinner::new(label))
}
/// Ticks the spinner when one is present.
fn sync_spinner_tick(spinner: Option<&style::Spinner>) {
    let Some(active) = spinner else {
        return;
    };
    active.tick();
}
/// Replaces the spinner's message when one is present.
fn sync_spinner_set_message(spinner: &mut Option<style::Spinner>, message: &str) {
    if let Some(active) = spinner.as_mut() {
        active.set_message(message);
    }
}
/// Finishes the spinner with a success message when one is present.
fn sync_spinner_finish_ok(spinner: Option<style::Spinner>, message: &str) {
    let Some(active) = spinner else {
        return;
    };
    active.finish_ok(message);
}
/// Finishes the spinner with an error message when one is present.
fn sync_spinner_finish_err(spinner: Option<style::Spinner>, message: &str) {
    let Some(active) = spinner else {
        return;
    };
    active.finish_err(message);
}
fn emit_not_logged_in(json: bool) -> ! {
if json {
let payload = serde_json::json!({
"ok": false,
"reason": "not_logged_in",
});
println!("{}", crate::support::util::json_compact_or(&payload, "{}"));
exit_code(1);
}
exit_err(
"not logged in.\n\n > run `difflore cloud login` to sync source-backed team rules into local agents",
);
}
/// Prints the dry-run report: the direction the sync would take and how many
/// pending raw uploads would be skipped under the current opt-in flags.
///
/// JSON mode prints the `dry_run_payload` object; otherwise a short
/// human-readable report followed by a "next" hint.
async fn emit_dry_run(
    db: &difflore_core::SqlitePool,
    json: bool,
    direction: SyncDirection,
    raw_upload_policy: RawUploadPolicy,
) {
    let skipped = skipped_raw_upload_counts(db, raw_upload_policy).await;

    if !json {
        let action = match direction {
            SyncDirection::Pull => "pull from",
            SyncDirection::Push => "push to",
            SyncDirection::Both => "pull from + push to",
        };
        println!(
            "{} dry-run: would {} cloud (no changes written).",
            style::ok(style::sym::OK),
            action,
        );
        emit_skipped_raw_upload_lines(skipped);
        println!();
        println!(
            "next: {} {}",
            style::cmd("difflore cloud sync"),
            style::pewter("then `difflore recall --diff` to see what agents would receive"),
        );
        return;
    }

    println!(
        "{}",
        crate::support::util::json_compact_or(
            &dry_run_payload(direction, raw_upload_policy, skipped),
            "{}",
        )
    );
}
fn dry_run_payload(
direction: SyncDirection,
raw_upload_policy: RawUploadPolicy,
skipped: RawUploadSkipCounts,
) -> serde_json::Value {
serde_json::json!({
"ok": true,
"dryRun": true,
"pullOnly": matches!(direction, SyncDirection::Pull),
"pushOnly": matches!(direction, SyncDirection::Push),
"rawUploadFlags": {
"includeObservations": raw_upload_policy.include_observations,
"includeCandidates": raw_upload_policy.include_candidates,
"includeTelemetry": raw_upload_policy.include_telemetry,
},
"skippedRawUploads": skipped_raw_upload_counts_value(skipped),
})
}
/// Computes the sorted, de-duplicated list of local rule ids that must be
/// excluded from the rule sync.
///
/// Excluded are: every pending candidate id, every rule whose `source` is not
/// `"cloud"`, and every cloud rule that is expected to carry a source repo
/// (origin `"extracted"`, or tagged `origin:review-extraction` /
/// `auto-from-accept`) but has none recorded or only a blank one.
///
/// Terminates the process via `exit_err` when either lookup fails.
async fn prepare_excluded_ids(
    db: &difflore_core::SqlitePool,
    local_skills: &[difflore_core::domain::models::SkillRecord],
) -> Vec<String> {
    let mut excluded_ids = difflore_core::skills::list_candidate_ids(db)
        .await
        .unwrap_or_else(|e| {
            exit_err(&format!(
                "Failed to load pending memory drafts (would risk syncing them as active): {e}"
            ))
        });
    let source_repos = difflore_core::skills::list_source_repos(db)
        .await
        .unwrap_or_else(|e| {
            exit_err(&format!(
                "Failed to load rule source-repo metadata (cannot safely compute exclusions): {e}"
            ))
        });

    let must_exclude = |skill: &difflore_core::domain::models::SkillRecord| -> bool {
        if skill.source != "cloud" {
            return true;
        }
        let expects_source_repo = skill.origin == "extracted"
            || skill
                .tags
                .iter()
                .any(|tag| tag == "origin:review-extraction" || tag == "auto-from-accept");
        if !expects_source_repo {
            return false;
        }
        let has_source_repo = source_repos
            .get(&skill.id)
            .and_then(|repo| repo.as_deref())
            .is_some_and(|repo| !repo.trim().is_empty());
        !has_source_repo
    };

    excluded_ids.extend(
        local_skills
            .iter()
            .filter(|skill| must_exclude(skill))
            .map(|skill| skill.id.clone()),
    );
    excluded_ids.sort();
    excluded_ids.dedup();
    excluded_ids
}
/// Pulls and/or pushes application settings according to `direction`.
///
/// Returns the number of top-level fields in the cloud settings object that
/// was applied locally (0 when nothing was pulled or applying failed).
///
/// Pull failures, an unexpected cloud payload shape, a failed local save, an
/// unreadable local settings store and push failures are all reported as
/// warnings on stderr. A local settings value that cannot be serialized
/// terminates the process via `exit_err`.
async fn run_settings_phase(
    client: &difflore_core::cloud::client::CloudClient,
    direction: SyncDirection,
) -> usize {
    let do_pull = direction.do_pull();
    let do_push = direction.do_push();
    let mut applied = 0usize;
    if do_pull {
        match difflore_core::cloud::sync::pull_settings(client).await {
            Ok(Some((cloud_settings, _updated_at))) => {
                // Count the fields first so the payload can be moved into the
                // deserializer instead of being cloned.
                let field_count = cloud_settings.as_object().map_or(0, serde_json::Map::len);
                match serde_json::from_value::<difflore_core::domain::models::AppSettingsRecord>(
                    cloud_settings,
                ) {
                    Ok(merged_input) => {
                        match difflore_core::infra::settings::update(merged_input).await {
                            Ok(_) => applied = field_count,
                            // Previously this error was dropped silently, leaving the
                            // user with no hint that cloud settings were not saved.
                            Err(e) => eprintln!(
                                "{} Settings pull failed: could not save cloud settings locally: {e}",
                                style::amber(style::sym::WARN),
                            ),
                        }
                    }
                    Err(e) => eprintln!(
                        "{} Settings pull skipped: unexpected cloud settings shape: {e}",
                        style::amber(style::sym::WARN),
                    ),
                }
            }
            Ok(None) => {}
            Err(e) => eprintln!(
                "{} Settings pull failed: {e}",
                style::amber(style::sym::WARN),
            ),
        }
    }
    if do_push {
        let settings_value = match difflore_core::infra::settings::get().await {
            Ok(s) => match serde_json::to_value(&s) {
                Ok(v) => v,
                Err(e) => {
                    exit_err(&format!(
                        "unexpected settings shape: failed to serialize local settings ({e})"
                    ));
                }
            },
            Err(e) => {
                eprintln!(
                    "{} Settings push skipped: failed to read local settings: {e}",
                    style::amber(style::sym::WARN),
                );
                return applied;
            }
        };
        if let Err(e) = difflore_core::cloud::sync::sync_settings(client, &settings_value).await {
            eprintln!(
                "{} Settings push failed: {e}",
                style::amber(style::sym::WARN),
            );
        }
    }
    applied
}
/// Pulls and/or pushes provider definitions according to `direction`.
///
/// Returns the number of providers newly added locally from the cloud list.
/// Pull and push failures are reported as warnings on stderr.
///
/// When the local provider list cannot be read, the push is skipped with a
/// warning rather than sending an empty list, which would misrepresent the
/// local state to the cloud. This matches how the settings phase treats an
/// unreadable local store.
async fn run_providers_phase(
    client: &difflore_core::cloud::client::CloudClient,
    db: &difflore_core::SqlitePool,
    direction: SyncDirection,
) -> usize {
    let do_pull = direction.do_pull();
    let do_push = direction.do_push();
    let mut providers_added = 0usize;
    if do_pull {
        match difflore_core::cloud::sync::pull_providers(client).await {
            Ok(Some((cloud_providers, _updated_at))) => {
                providers_added += apply_cloud_providers(db, &cloud_providers).await;
            }
            Ok(None) => {}
            Err(e) => eprintln!(
                "{} Providers pull failed: {e}",
                style::amber(style::sym::WARN),
            ),
        }
    }
    if do_push {
        match difflore_core::infra::providers::list(db).await {
            Ok(providers) => {
                let provider_entries =
                    difflore_core::cloud::sync::build_provider_sync_entries(&providers);
                if let Err(e) =
                    difflore_core::cloud::sync::sync_providers(client, &provider_entries).await
                {
                    eprintln!(
                        "{} Providers push failed: {e}",
                        style::amber(style::sym::WARN),
                    );
                }
            }
            Err(e) => eprintln!(
                "{} Providers push skipped: failed to read local providers: {e}",
                style::amber(style::sym::WARN),
            ),
        }
    }
    providers_added
}
/// Uploads locally recorded observation events.
///
/// Returns `(attempted, uploaded, queued)`. Flushing repeats for at most
/// `MAX_OBSERVATION_SYNC_FLUSHES` rounds and stops early when a round has
/// nothing to send, uploads fewer events than it attempted, or fails (the
/// failure is reported as a warning). `queued` is the pending upload count
/// read after flushing; it is 0 when that count cannot be read.
///
/// Returns `(0, 0, 0)` when the default emitter cannot be opened.
async fn run_observations_phase(
    client: &difflore_core::cloud::client::CloudClient,
) -> (usize, usize, usize) {
    let Ok(emitter) = difflore_core::cloud::observations::ObservationEmitter::open_default().await
    else {
        return (0, 0, 0);
    };
    // Best effort: the result of the retry request is intentionally ignored.
    let _ = emitter.retry_pending_uploads_now().await;
    let mut attempted = 0usize;
    let mut uploaded = 0usize;
    for _ in 0..MAX_OBSERVATION_SYNC_FLUSHES {
        match emitter.flush_to_cloud(client).await {
            Ok((batch_attempted, batch_uploaded)) => {
                if batch_attempted == 0 {
                    break;
                }
                attempted += batch_attempted;
                uploaded += batch_uploaded;
                // A partially uploaded batch means further rounds would retry
                // the same failing events; stop here.
                if batch_uploaded < batch_attempted {
                    break;
                }
            }
            Err(e) => {
                eprintln!(
                    "{} Local memory activity upload skipped: {e}",
                    style::amber(style::sym::WARN),
                );
                break;
            }
        }
    }
    // Checked conversion instead of an `as` cast, matching
    // `pending_observation_events_count`; `as` would truncate on targets where
    // `usize` is narrower than the count type.
    let queued = emitter
        .pending_upload_count()
        .await
        .map_or(0, |count| usize::try_from(count.max(0)).unwrap_or_default());
    (attempted, uploaded, queued)
}
/// Drains the cloud outbox for every kind enabled by `raw_upload_policy`, in
/// priority order, within a shared budget of `MAX_CLOUD_OUTBOX_SYNC_ITEMS`.
///
/// Each drain report is folded into the matching counters (candidates,
/// observations or telemetry). Accepted-edit reports also contribute their
/// attribution summary. The first drain error is reported as a warning and
/// stops the loop. Queued and skipped counts are filled in at the end by
/// `with_cloud_outbox_pending_counts`.
async fn run_cloud_outbox_phase(
    db: &difflore_core::SqlitePool,
    client: &difflore_core::cloud::client::CloudClient,
    raw_upload_policy: RawUploadPolicy,
) -> CloudOutboxPhaseOutcome {
    use difflore_core::cloud::outbox::kind as outbox_kind;

    let queue = difflore_core::cloud::outbox::OutboxQueue::new(db.clone());
    let mut outcome = CloudOutboxPhaseOutcome::default();
    let mut remaining = MAX_CLOUD_OUTBOX_SYNC_ITEMS;

    for (kind, per_kind_limit) in outbox_sync_priority_kinds(raw_upload_policy) {
        if remaining == 0 {
            break;
        }
        let limit = per_kind_limit.map_or(remaining, |cap| cap.min(remaining));
        if limit == 0 {
            continue;
        }

        let drained =
            difflore_core::cloud::outbox::drain_outbox_kind_report(&queue, client, kind, limit)
                .await;
        let report = match drained {
            Ok(report) => report,
            Err(e) => {
                eprintln!(
                    "{} Local activity upload skipped: {e}",
                    style::amber(style::sym::WARN),
                );
                break;
            }
        };

        match kind {
            outbox_kind::SESSION_MINED_CANDIDATE => {
                outcome.memory_candidates_attempted += report.attempted;
                outcome.memory_candidates_uploaded += report.confirmed;
            }
            outbox_kind::OBSERVATION => {
                outcome.observations_attempted += report.attempted;
                outcome.observations_uploaded += report.confirmed;
            }
            other => {
                // Everything else is counted as telemetry. Kinds outside the
                // known telemetry set trip the debug assertion.
                let is_accepted_edit = other == outbox_kind::ACCEPTED_EDIT;
                let is_plain_telemetry = matches!(
                    other,
                    outbox_kind::IMPORTED_REVIEWS
                        | outbox_kind::REVIEW_METRICS
                        | outbox_kind::TRAJECTORY
                        | outbox_kind::MCP_QUERY
                );
                debug_assert!(
                    is_accepted_edit || is_plain_telemetry,
                    "unexpected outbox sync kind: {kind}"
                );
                // Attribution is merged for accepted edits and for unexpected kinds.
                if !is_plain_telemetry {
                    outcome
                        .accepted_edit_attribution
                        .add(report.accepted_edit_attribution);
                }
                outcome.telemetry_attempted += report.attempted;
                outcome.telemetry_uploaded += report.confirmed;
            }
        }
        remaining = remaining.saturating_sub(report.attempted);
    }

    with_cloud_outbox_pending_counts(db, raw_upload_policy, outcome).await
}
async fn with_cloud_outbox_pending_counts(
db: &difflore_core::SqlitePool,
raw_upload_policy: RawUploadPolicy,
mut outcome: CloudOutboxPhaseOutcome,
) -> CloudOutboxPhaseOutcome {
if let Some(counts) = pending_cloud_outbox_kind_counts(db).await {
outcome.observations_queued = pending_count_for_kinds(&counts, OBSERVATION_OUTBOX_KINDS);
outcome.memory_candidates_queued = pending_count_for_kinds(&counts, CANDIDATE_OUTBOX_KINDS);
outcome.telemetry_queued = pending_count_for_kinds(&counts, TELEMETRY_OUTBOX_KINDS);
} else {
outcome.observations_queued = outcome
.observations_attempted
.saturating_sub(outcome.observations_uploaded);
outcome.memory_candidates_queued = outcome
.memory_candidates_attempted
.saturating_sub(outcome.memory_candidates_uploaded);
outcome.telemetry_queued = outcome
.telemetry_attempted
.saturating_sub(outcome.telemetry_uploaded);
}
if !raw_upload_policy.include_observations {
outcome.observations_skipped = outcome.observations_queued;
}
if !raw_upload_policy.include_candidates {
outcome.memory_candidates_skipped = outcome.memory_candidates_queued;
}
if !raw_upload_policy.include_telemetry {
outcome.telemetry_skipped = outcome.telemetry_queued;
}
outcome
}
/// Reads the number of pending outbox rows per kind.
///
/// Negative counts are clamped to 0 and counts that do not fit in `usize`
/// become 0. Returns `None`, after printing a warning, when the query fails.
async fn pending_cloud_outbox_kind_counts(
    db: &difflore_core::SqlitePool,
) -> Option<Vec<(String, usize)>> {
    let queue = difflore_core::cloud::outbox::OutboxQueue::new(db.clone());
    let rows = match queue.pending_counts_by_kind().await {
        Ok(rows) => rows,
        Err(e) => {
            eprintln!(
                "{} failed to count pending cloud outbox rows: {e}",
                style::amber(style::sym::WARN),
            );
            return None;
        }
    };

    let mut counts = Vec::new();
    for (kind, count) in rows {
        counts.push((kind, usize::try_from(count.max(0)).unwrap_or_default()));
    }
    Some(counts)
}
/// Sums the counts of every `(kind, count)` row whose kind appears in `kinds`.
fn pending_count_for_kinds(counts: &[(String, usize)], kinds: &[&str]) -> usize {
    let mut total = 0usize;
    for (row_kind, row_count) in counts {
        if kinds.iter().any(|wanted| *wanted == row_kind.as_str()) {
            total += *row_count;
        }
    }
    total
}
fn outbox_sync_priority_kinds(
raw_upload_policy: RawUploadPolicy,
) -> Vec<(&'static str, Option<usize>)> {
let mut kinds = DEFAULT_OUTBOX_SYNC_PRIORITY_KINDS.to_vec();
if raw_upload_policy.include_telemetry {
kinds.push((difflore_core::cloud::outbox::kind::ACCEPTED_EDIT, None));
kinds.push((difflore_core::cloud::outbox::kind::IMPORTED_REVIEWS, None));
}
if raw_upload_policy.include_candidates {
kinds.extend_from_slice(CANDIDATE_OUTBOX_SYNC_PRIORITY_KINDS);
}
if raw_upload_policy.include_telemetry {
kinds.extend_from_slice(&[
(difflore_core::cloud::outbox::kind::REVIEW_METRICS, None),
(difflore_core::cloud::outbox::kind::TRAJECTORY, None),
(difflore_core::cloud::outbox::kind::MCP_QUERY, None),
]);
}
if raw_upload_policy.include_observations {
kinds.extend_from_slice(OBSERVATION_OUTBOX_SYNC_PRIORITY_KINDS);
}
kinds
}
/// Number of observation events still waiting to be uploaded.
///
/// Returns 0 when the default emitter cannot be opened, and 0 with a warning
/// when the count cannot be read. Negative counts are clamped to 0.
async fn pending_observation_events_count() -> usize {
    let emitter =
        match difflore_core::cloud::observations::ObservationEmitter::open_default().await {
            Ok(emitter) => emitter,
            Err(_) => return 0,
        };
    emitter.pending_upload_count().await.map_or_else(
        |e| {
            eprintln!(
                "{} failed to count pending observation events: {e}",
                style::amber(style::sym::WARN),
            );
            0
        },
        |count| usize::try_from(count.max(0)).unwrap_or_default(),
    )
}
async fn skipped_raw_upload_counts(
db: &difflore_core::SqlitePool,
raw_upload_policy: RawUploadPolicy,
) -> RawUploadSkipCounts {
let mut skipped = RawUploadSkipCounts::default();
if !raw_upload_policy.include_observations {
skipped.observations += pending_observation_events_count().await;
}
let Some(outbox_counts) = pending_cloud_outbox_kind_counts(db).await else {
return skipped;
};
if !raw_upload_policy.include_observations {
skipped.observations += pending_count_for_kinds(&outbox_counts, OBSERVATION_OUTBOX_KINDS);
}
if !raw_upload_policy.include_candidates {
skipped.memory_candidates +=
pending_count_for_kinds(&outbox_counts, CANDIDATE_OUTBOX_KINDS);
}
if !raw_upload_policy.include_telemetry {
skipped.telemetry += pending_count_for_kinds(&outbox_counts, TELEMETRY_OUTBOX_KINDS);
}
skipped
}
/// Reconciles the cloud provider list with the local providers.
///
/// Providers are matched by name. A cloud entry with an unknown name is added
/// locally; a known one is updated when its base URL or model mapping differs.
/// Entries with an empty name or base URL are ignored, and entries with an
/// unparsable model mapping are skipped with a warning. Add and update
/// failures are warned about and do not stop the loop.
///
/// Returns the number of providers added. Returns 0 when the local providers
/// cannot be listed or the cloud payload is not a JSON array.
async fn apply_cloud_providers(
    db: &difflore_core::SqlitePool,
    cloud_providers: &serde_json::Value,
) -> usize {
    let Ok(local_providers) = difflore_core::infra::providers::list(db).await else {
        return 0;
    };
    let mut existing_by_name: std::collections::HashMap<_, _> = local_providers
        .into_iter()
        .map(|p| (p.name.clone(), p))
        .collect();
    let Some(list) = cloud_providers.as_array() else {
        return 0;
    };

    let mut added = 0usize;
    for item in list {
        // First present key wins; a non-string value yields an empty string.
        let text_field = |keys: &[&str]| -> String {
            keys.iter()
                .find_map(|key| item.get(*key))
                .and_then(|v| v.as_str())
                .unwrap_or("")
                .to_owned()
        };
        let name = text_field(&["name"]);
        let base_url = text_field(&["baseUrl", "base_url"]);
        if name.is_empty() || base_url.is_empty() {
            continue;
        }

        let model_mapping = match parse_provider_model_mapping(item) {
            Ok(mapping) => mapping,
            Err(e) => {
                eprintln!(
                    "{} skipping cloud provider {name:?}: unexpected modelMapping shape ({e})",
                    style::amber(style::sym::WARN),
                );
                continue;
            }
        };

        if let Some(current) = existing_by_name.get(&name) {
            let base_url_changed = current.base_url != base_url;
            let model_mapping_changed = current.model_mapping != model_mapping;
            if !(base_url_changed || model_mapping_changed) {
                continue;
            }
            // Only the fields that actually differ are sent in the update.
            let patch = difflore_core::domain::models::ProviderUpdateInput {
                id: current.id.clone(),
                name: None,
                base_url: base_url_changed.then(|| base_url.clone()),
                model_mapping: model_mapping_changed.then(|| model_mapping.clone()),
            };
            match difflore_core::infra::providers::update(db, patch).await {
                Ok(updated) => {
                    existing_by_name.insert(name.clone(), updated);
                }
                Err(e) => eprintln!(
                    "{} failed to update cloud provider {name:?}: {e}",
                    style::amber(style::sym::WARN),
                ),
            }
            continue;
        }

        let input = difflore_core::domain::models::ProviderAddInput {
            name: name.clone(),
            base_url,
            model_mapping,
        };
        match difflore_core::infra::providers::add(db, input).await {
            Ok(provider) => {
                // Track the new provider so a repeated name in the payload is
                // handled as an update rather than a second add.
                existing_by_name.insert(name.clone(), provider);
                added += 1;
            }
            Err(e) => eprintln!(
                "{} failed to add cloud provider {name:?}: {e}",
                style::amber(style::sym::WARN),
            ),
        }
    }
    added
}
/// Extracts a provider's model mapping from its cloud JSON entry.
///
/// Reads `modelMapping`, falling back to `model_mapping` only when the former
/// is absent. A missing or `null` value yields an empty map; any other value
/// must deserialize into a string-to-string map, otherwise the deserializer's
/// error message is returned.
fn parse_provider_model_mapping(
    item: &serde_json::Value,
) -> Result<std::collections::HashMap<String, String>, String> {
    let present = item
        .get("modelMapping")
        .or_else(|| item.get("model_mapping"))
        .filter(|value| !value.is_null());
    let Some(value) = present else {
        return Ok(std::collections::HashMap::new());
    };
    serde_json::from_value::<std::collections::HashMap<String, String>>(value.clone())
        .map_err(|e| e.to_string())
}
/// Builds the JSON summary object for a completed (non-dry-run) sync.
///
/// The `failed` figures are derived as `attempted - uploaded` (saturating).
fn sync_summary_payload(outcome: &SyncOutcome) -> serde_json::Value {
    let proof = &outcome.accepted_edit_attribution;
    let candidates_failed = outcome
        .memory_candidates_attempted
        .saturating_sub(outcome.memory_candidates_uploaded);
    let telemetry_failed = outcome
        .telemetry_attempted
        .saturating_sub(outcome.telemetry_uploaded);
    let warning_count = proof.warning_count();

    serde_json::json!({
        "ok": true,
        "dryRun": false,
        "memory": {
            "created": outcome.created,
            "updated": outcome.updated,
            "deleted": outcome.deleted,
        },
        "team": {
            "visible": outcome.team_count,
            "synced": outcome.team_synced,
        },
        "settings": { "fieldsMerged": outcome.settings_pull_applied },
        "providers": { "added": outcome.providers_added },
        "observations": {
            "attempted": outcome.observations_attempted,
            "uploaded": outcome.observations_uploaded,
            "queued": outcome.observations_queued,
            "skipped": outcome.observations_skipped,
        },
        "memoryCandidates": {
            "attempted": outcome.memory_candidates_attempted,
            "uploaded": outcome.memory_candidates_uploaded,
            "queued": outcome.memory_candidates_queued,
            "failed": candidates_failed,
            "skipped": outcome.memory_candidates_skipped,
        },
        "telemetryOutbox": {
            "attempted": outcome.telemetry_attempted,
            "uploaded": outcome.telemetry_uploaded,
            "queued": outcome.telemetry_queued,
            "failed": telemetry_failed,
            "skipped": outcome.telemetry_skipped,
        },
        "acceptedEditProof": {
            "uploaded": proof.uploaded,
            "launchGrade": proof.launch_grade,
            "missingTeamWorkspace": proof.missing_team_workspace,
            "missingRuleIds": proof.missing_rule_ids,
            "unlinkedRuleObservations": proof.unlinked_rule_observations,
            "warningCount": warning_count,
        },
    })
}
/// Prints the sync summary as compact JSON on stdout.
fn emit_summary_json(outcome: &SyncOutcome) {
    println!(
        "{}",
        crate::support::util::json_compact_or(&sync_summary_payload(outcome), "{}")
    );
}
fn skipped_raw_upload_counts_value(skipped: RawUploadSkipCounts) -> serde_json::Value {
serde_json::json!({
"observations": skipped.observations,
"memoryCandidates": skipped.memory_candidates,
"telemetryOutbox": skipped.telemetry,
"total": skipped.total(),
})
}
fn emit_skipped_raw_upload_lines(skipped: RawUploadSkipCounts) {
if skipped.observations > 0 {
println!(
" observations {} pending raw event{} skipped (pass --include-observations to upload)",
skipped.observations,
if skipped.observations == 1 { "" } else { "s" },
);
}
if skipped.memory_candidates > 0 {
println!(
" session candidates {} pending session-mined candidate{} skipped (pass --include-candidates to upload)",
skipped.memory_candidates,
if skipped.memory_candidates == 1 {
""
} else {
"s"
},
);
}
if skipped.telemetry > 0 {
println!(
" raw telemetry {} pending row{} skipped (pass --include-telemetry to upload)",
skipped.telemetry,
if skipped.telemetry == 1 { "" } else { "s" },
);
}
}
/// Formats the "session candidates" line of the human-readable summary.
///
/// * Nothing attempted and nothing queued: reports "current".
/// * Nothing attempted but rows skipped: reports the skipped count and the
///   opt-in flag.
/// * Otherwise: reports uploaded and pending counts, plus the number that
///   failed this run (`attempted - uploaded`) when it is non-zero.
fn memory_candidate_summary_line(
    attempted: usize,
    uploaded: usize,
    queued: usize,
    skipped: usize,
) -> String {
    let nothing_attempted = attempted == 0;
    if nothing_attempted && queued == 0 {
        return String::from(" session candidates current");
    }
    if nothing_attempted && skipped > 0 {
        let plural = match skipped {
            1 => "",
            _ => "s",
        };
        return format!(
            " session candidates {skipped} pending session-mined candidate{} skipped (--include-candidates to upload)",
            plural,
        );
    }

    let mut line = format!(" session candidates {uploaded} uploaded | {queued} pending");
    let failed = attempted.saturating_sub(uploaded);
    if failed > 0 {
        line += &format!(" | {failed} failed this run");
    }
    line
}
/// Formats the "activity" line of the human-readable summary.
///
/// * Nothing attempted but events skipped: reports the skipped count and the
///   opt-in flag.
/// * Something attempted or still queued: reports uploaded and pending counts.
/// * Otherwise: reports "current".
fn proof_summary_line(
    observations_attempted: usize,
    observations_uploaded: usize,
    observations_queued: usize,
    observations_skipped: usize,
) -> String {
    let plural = |count: usize| if count == 1 { "" } else { "s" };

    if observations_attempted == 0 && observations_skipped > 0 {
        return format!(
            " activity {observations_skipped} pending raw event{} skipped (--include-observations to upload)",
            plural(observations_skipped),
        );
    }
    if observations_attempted == 0 && observations_queued == 0 {
        return String::from(" activity current");
    }
    format!(
        " activity {observations_uploaded} local event{} uploaded | {} pending",
        plural(observations_uploaded),
        observations_queued,
    )
}
/// Formats the optional "raw telemetry" line of the human-readable summary.
///
/// Returns `None` when nothing was attempted and nothing is queued. When
/// nothing was attempted but rows were skipped, reports the skipped count and
/// the opt-in flag. Otherwise reports uploaded and pending counts, plus the
/// number that failed this run (`attempted - uploaded`) when it is non-zero.
fn telemetry_summary_line(
    attempted: usize,
    uploaded: usize,
    queued: usize,
    skipped: usize,
) -> Option<String> {
    match (attempted, queued, skipped) {
        (0, 0, _) => None,
        (0, _, 1..) => {
            let plural = if skipped == 1 { "" } else { "s" };
            Some(format!(
                " raw telemetry {skipped} pending row{} skipped (--include-telemetry to upload)",
                plural,
            ))
        }
        _ => {
            let mut line = format!(" raw telemetry {uploaded} uploaded | {queued} pending");
            let failed = attempted.saturating_sub(uploaded);
            if failed > 0 {
                line += &format!(" | {failed} failed this run");
            }
            Some(line)
        }
    }
}
/// Formats the optional "accepted edits" line of the human-readable summary.
///
/// Returns `None` when `summary.uploaded` is 0. Otherwise prints
/// `launch_grade` as the uploaded figure and `uploaded - launch_grade`
/// (saturating) as the pending figure.
fn accepted_edit_proof_summary_line(summary: AcceptedEditAttributionSummary) -> Option<String> {
    (summary.uploaded != 0).then(|| {
        let remainder = summary.uploaded.saturating_sub(summary.launch_grade);
        format!(
            " accepted edits {} uploaded | {} pending",
            summary.launch_grade, remainder,
        )
    })
}
/// Prints the human-readable summary of a completed sync to stdout.
///
/// The lines, in order, cover: rules, session candidates, settings, providers,
/// team rules, activity, optional raw telemetry, optional accepted edits and an
/// optional accepted-edit warning. A "cold start" (no rules created or
/// updated, no team rules visible or synced) ends with the cold-start hint;
/// every other run ends with a "next" hint, preceded by a tip when new rules
/// were pulled.
async fn emit_summary_human(outcome: &SyncOutcome, db: &difflore_core::SqlitePool) {
    let created = outcome.created;
    let updated = outcome.updated;
    let deleted = outcome.deleted;
    let team_count = outcome.team_count;
    let team_synced = outcome.team_synced;
    let settings_pull_applied = outcome.settings_pull_applied;
    let providers_added = outcome.providers_added;
    let accepted_edit_attribution = outcome.accepted_edit_attribution;

    println!("{} Sync complete", style::ok(style::sym::OK));
    println!(" memory {created} created | {updated} updated | {deleted} deleted");
    println!(
        "{}",
        memory_candidate_summary_line(
            outcome.memory_candidates_attempted,
            outcome.memory_candidates_uploaded,
            outcome.memory_candidates_queued,
            outcome.memory_candidates_skipped,
        )
    );

    if settings_pull_applied == 0 {
        println!(" settings pushed (no cloud updates to merge)");
    } else {
        println!(" settings {settings_pull_applied} fields merged");
    }

    if providers_added == 0 {
        println!(" providers current");
    } else {
        println!(
            " providers {providers_added} added from cloud (API keys still need to be set locally)"
        );
    }

    if team_count > 0 {
        println!(
            " team {team_count} published memories visible | {team_synced} synced locally"
        );
    } else {
        println!(" team 0 published memories visible");
    }

    println!(
        "{}",
        proof_summary_line(
            outcome.observations_attempted,
            outcome.observations_uploaded,
            outcome.observations_queued,
            outcome.observations_skipped,
        )
    );

    let telemetry_line = telemetry_summary_line(
        outcome.telemetry_attempted,
        outcome.telemetry_uploaded,
        outcome.telemetry_queued,
        outcome.telemetry_skipped,
    );
    if let Some(line) = telemetry_line {
        println!("{line}");
    }
    if let Some(line) = accepted_edit_proof_summary_line(accepted_edit_attribution) {
        println!("{line}");
    }

    let warnings = accepted_edit_attribution.warning_count();
    if warnings > 0 {
        println!(
            " {} {} accepted edit upload{} need review: {} missing team workspace | {} missing recalled memory ids | {} missing linked memory activity",
            style::amber(style::sym::WARN),
            warnings,
            if warnings == 1 { "" } else { "s" },
            accepted_edit_attribution.missing_team_workspace,
            accepted_edit_attribution.missing_rule_ids,
            accepted_edit_attribution.unlinked_rule_observations,
        );
    }

    // A cold start implies `created == 0`, so it can never coincide with the
    // "new rules pulled" tip below.
    let cold_start = created == 0 && updated == 0 && team_count == 0 && team_synced == 0;
    if cold_start {
        emit_cold_start_hint(db).await;
        return;
    }

    if created > 0 {
        println!();
        println!(
            " {} {} new rule{} pulled. {}",
            style::emerald(style::sym::TIP),
            created,
            if created == 1 { "" } else { "s" },
            style::pewter("Run `difflore recall --diff` to preview them."),
        );
    }
    println!();
    println!(
        "next: {} {}",
        style::cmd("difflore recall --diff"),
        style::pewter("see what Claude/Codex/Cursor would receive"),
    );
}
/// Prints guidance for a sync that produced no rules.
///
/// When reviews from the `"github"` source are stored locally, the hint says
/// the cloud is still extracting and links to progress. Otherwise it explains
/// how to import reviews and links to the dashboard. A failed review lookup is
/// treated as zero imported reviews.
async fn emit_cold_start_hint(db: &difflore_core::SqlitePool) {
    let github_reviews = difflore_core::review_store::list_by_source(
        db,
        difflore_core::review_store::ReviewSourceInput {
            source: "github".into(),
        },
    )
    .await;
    let imported_review_count = match github_reviews {
        Ok(reviews) => reviews.len(),
        Err(_) => 0,
    };

    println!();
    if imported_review_count == 0 {
        println!(
            " {} {}",
            style::emerald(style::sym::TIP),
            style::pewter(
                "No new rules yet. Import PR reviews, extract team rules in cloud, then sync again."
            ),
        );
        println!(
            " {} import: {}",
            style::pewter(style::sym::BULLET),
            style::cmd("difflore import-reviews --max-prs 50 --upload"),
        );
        println!(
            " {} dashboard: {}",
            style::pewter(style::sym::BULLET),
            style::cmd(&difflore_core::cloud::endpoints::web_link(
                "?from=cli-sync&intent=memory-empty"
            )),
        );
        return;
    }

    println!(
        " {} {}; cloud is still extracting rules. Try {} in ~30s.",
        style::emerald(style::sym::TIP),
        cold_start_extracting_line(imported_review_count),
        style::cmd("difflore cloud sync"),
    );
    println!(
        " {} watch progress: {}",
        style::pewter(style::sym::BULLET),
        style::cmd(&difflore_core::cloud::endpoints::web_link(
            "?from=cli-sync&intent=extracting"
        )),
    );
}
/// Builds the "N review(s) imported across all your repos" fragment used by the
/// cold-start hint.
///
/// The noun is singular only when `imported_review_count` is exactly 1; every
/// other count (including 0) uses the plural.
pub(crate) fn cold_start_extracting_line(imported_review_count: usize) -> String {
    let noun = match imported_review_count {
        1 => "review",
        _ => "reviews",
    };
    format!("{imported_review_count} {noun} imported across all your repos")
}
/// Turns a raw cloud error string into a user-facing message prefixed by `label`.
///
/// Three error shapes are recognised here by substring, checked in this order:
/// 1. `LlmNotConfigured` / `llmNotConfigured`: no LLM API key on the cloud side;
/// 2. `not_logged_in`: the user has to log in first;
/// 3. `Input validation failed` together with `"format":"uuid"`: the cloud
///    rejected an id that was not a full UUID.
///
/// Anything else is delegated to
/// `difflore_core::domain::origins::format_api_error`.
pub(crate) fn format_cloud_err(label: &str, e: &str) -> String {
    match e {
        raw if raw.contains("LlmNotConfigured") || raw.contains("llmNotConfigured") => format!(
            "{label}: no LLM API key configured on the cloud side.\n Set one at cloud `/settings` (BYOK) before querying corpora."
        ),
        raw if raw.contains("not_logged_in") => {
            format!("{label}: not logged in to cloud.\n Run `difflore cloud login` first.")
        }
        raw if raw.contains("Input validation failed")
            && raw.contains("\"format\":\"uuid\"") =>
        {
            format!(
                "{label}: cloud rejected the id - short prefixes aren't supported here.\n Pass the full UUID from DiffLore Cloud, then retry."
            )
        }
        // Not one of the locally classified shapes: use the shared API formatter.
        raw => difflore_core::domain::origins::format_api_error(label, raw),
    }
}
#[cfg(test)]
mod tests {
    // Unit tests for the sync command's summary lines, JSON payloads, outbox
    // priority ordering, provider-mapping parsing and cloud error formatting.
    use super::{
        AcceptedEditAttributionSummary, RawUploadPolicy, RawUploadSkipCounts, SyncDirection,
        SyncOutcome, accepted_edit_proof_summary_line, cold_start_extracting_line, dry_run_payload,
        format_cloud_err, memory_candidate_summary_line, outbox_sync_priority_kinds,
        parse_provider_model_mapping, proof_summary_line, sync_summary_payload,
        telemetry_summary_line,
    };

    /// The cold-start fragment must pluralise correctly and always state that
    /// the count spans all repos.
    #[test]
    fn cold_start_extracting_line_disambiguates_per_repo_vs_global() {
        let one = cold_start_extracting_line(1);
        assert!(one.contains("1 review "), "msg: {one}");
        assert!(one.contains("across all your repos"), "msg: {one}");
        // This check fires when the plural noun shows up for a count of one.
        assert!(
            !one.contains("reviews "),
            "plural form leaked into singular message: {one}"
        );
        let many = cold_start_extracting_line(155);
        assert!(many.contains("155 reviews "), "msg: {many}");
        assert!(many.contains("across all your repos"), "msg: {many}");
    }

    /// The JSON summary exposes per-queue attempted/uploaded/queued/skipped
    /// counts, a `failed` count for candidates and telemetry, and the
    /// accepted-edit attribution breakdown under camelCase keys.
    #[test]
    fn sync_summary_json_includes_local_proof_upload_counts() {
        let payload = sync_summary_payload(&SyncOutcome {
            created: 0,
            updated: 0,
            deleted: 0,
            team_count: 0,
            team_synced: 0,
            settings_pull_applied: 0,
            providers_added: 0,
            observations_attempted: 5,
            observations_uploaded: 3,
            observations_queued: 2,
            observations_skipped: 2,
            memory_candidates_attempted: 6,
            memory_candidates_uploaded: 4,
            memory_candidates_queued: 2,
            memory_candidates_skipped: 2,
            telemetry_attempted: 4,
            telemetry_uploaded: 4,
            telemetry_queued: 3,
            telemetry_skipped: 3,
            accepted_edit_attribution: AcceptedEditAttributionSummary {
                uploaded: 3,
                launch_grade: 1,
                missing_team_workspace: 1,
                missing_rule_ids: 1,
                unlinked_rule_observations: 0,
            },
        });
        assert_eq!(payload["observations"]["attempted"], 5);
        assert_eq!(payload["observations"]["uploaded"], 3);
        assert_eq!(payload["observations"]["queued"], 2);
        assert_eq!(payload["observations"]["skipped"], 2);
        assert_eq!(payload["memoryCandidates"]["attempted"], 6);
        assert_eq!(payload["memoryCandidates"]["uploaded"], 4);
        assert_eq!(payload["memoryCandidates"]["queued"], 2);
        // 6 attempted, 4 uploaded -> 2 failed (presumably attempted - uploaded;
        // confirm against sync_summary_payload).
        assert_eq!(payload["memoryCandidates"]["failed"], 2);
        assert_eq!(payload["memoryCandidates"]["skipped"], 2);
        assert_eq!(payload["telemetryOutbox"]["attempted"], 4);
        assert_eq!(payload["telemetryOutbox"]["uploaded"], 4);
        assert_eq!(payload["telemetryOutbox"]["queued"], 3);
        assert_eq!(payload["telemetryOutbox"]["failed"], 0);
        assert_eq!(payload["telemetryOutbox"]["skipped"], 3);
        assert_eq!(payload["acceptedEditProof"]["uploaded"], 3);
        assert_eq!(payload["acceptedEditProof"]["launchGrade"], 1);
        assert_eq!(payload["acceptedEditProof"]["missingTeamWorkspace"], 1);
        assert_eq!(payload["acceptedEditProof"]["missingRuleIds"], 1);
        assert_eq!(payload["acceptedEditProof"]["warningCount"], 2);
    }

    /// A non-empty attribution summary renders a line with uploaded and
    /// pending counts; the default (all-zero) summary renders nothing.
    #[test]
    fn accepted_edit_summary_line_distinguishes_uploaded_and_pending() {
        let line = accepted_edit_proof_summary_line(AcceptedEditAttributionSummary {
            uploaded: 4,
            launch_grade: 1,
            missing_team_workspace: 2,
            missing_rule_ids: 1,
            unlinked_rule_observations: 0,
        })
        .expect("uploaded accepted edits render a summary");
        assert!(line.contains("1 uploaded"), "{line}");
        assert!(line.contains("3 pending"), "{line}");
        assert!(
            accepted_edit_proof_summary_line(AcceptedEditAttributionSummary::default()).is_none()
        );
    }

    /// Covers the activity line: uploaded with pending, singular wording,
    /// queued-only, explicitly skipped (mentions the opt-in flag), and idle.
    #[test]
    fn proof_summary_line_explains_uploaded_and_pending_events() {
        let uploaded = proof_summary_line(5, 3, 2, 0);
        assert!(uploaded.contains("3 local events uploaded"), "{uploaded}");
        assert!(uploaded.contains("2 pending"), "{uploaded}");
        let singular = proof_summary_line(1, 1, 0, 0);
        assert!(singular.contains("1 local event uploaded"), "{singular}");
        assert!(!singular.contains("local events uploaded"), "{singular}");
        let queued_only = proof_summary_line(0, 0, 7, 0);
        assert!(
            queued_only.contains("0 local events uploaded"),
            "{queued_only}"
        );
        assert!(queued_only.contains("7 pending"), "{queued_only}");
        let skipped = proof_summary_line(0, 0, 7, 7);
        assert!(
            skipped.contains("7 pending raw events skipped"),
            "{skipped}"
        );
        assert!(skipped.contains("--include-observations"), "{skipped}");
        assert_eq!(proof_summary_line(0, 0, 0, 0), " activity current");
    }

    /// Covers the session-candidate line: idle, partial upload with failures,
    /// and explicitly skipped (mentions the opt-in flag).
    #[test]
    fn memory_candidate_summary_line_explains_uploads_pending_and_failures() {
        assert_eq!(
            memory_candidate_summary_line(0, 0, 0, 0),
            " session candidates current"
        );
        let uploaded = memory_candidate_summary_line(6, 4, 2, 0);
        assert!(uploaded.contains("4 uploaded"), "{uploaded}");
        assert!(uploaded.contains("2 pending"), "{uploaded}");
        assert!(uploaded.contains("2 failed this run"), "{uploaded}");
        let skipped = memory_candidate_summary_line(0, 0, 3, 3);
        assert!(
            skipped.contains("3 pending session-mined candidates skipped"),
            "{skipped}"
        );
        assert!(skipped.contains("--include-candidates"), "{skipped}");
    }

    /// The telemetry line is omitted when everything is zero, names the
    /// opt-in flag when rows were skipped, and reports uploads and failures.
    #[test]
    fn telemetry_summary_line_reports_explicit_skips() {
        assert!(telemetry_summary_line(0, 0, 0, 0).is_none());
        let skipped = telemetry_summary_line(0, 0, 4, 4).expect("skipped telemetry renders");
        assert!(skipped.contains("4 pending rows skipped"), "{skipped}");
        assert!(skipped.contains("--include-telemetry"), "{skipped}");
        let uploaded = telemetry_summary_line(6, 4, 2, 0).expect("uploads render");
        assert!(uploaded.contains("4 uploaded"), "{uploaded}");
        assert!(uploaded.contains("2 pending"), "{uploaded}");
        assert!(uploaded.contains("2 failed this run"), "{uploaded}");
    }

    /// With the default policy (no include flags) no outbox kind is prioritised.
    #[test]
    fn default_outbox_priority_excludes_raw_outbox_queues() {
        let kinds = outbox_sync_priority_kinds(RawUploadPolicy::default());
        assert!(kinds.is_empty());
    }

    /// With every include flag set, all raw kinds are present in the expected
    /// relative order and the literal kind `fix_acceptance` is not listed.
    #[test]
    fn raw_outbox_priority_requires_explicit_include_flags() {
        let kinds = outbox_sync_priority_kinds(RawUploadPolicy::new(true, true, true));
        // Index of a kind in the priority list; panics if the kind is absent.
        let pos = |needle: &str| {
            kinds
                .iter()
                .position(|(kind, _)| *kind == needle)
                .unwrap_or_else(|| panic!("missing outbox kind {needle}"))
        };
        assert!(
            pos(difflore_core::cloud::outbox::kind::ACCEPTED_EDIT)
                < pos(difflore_core::cloud::outbox::kind::OBSERVATION)
        );
        assert!(
            pos(difflore_core::cloud::outbox::kind::IMPORTED_REVIEWS)
                < pos(difflore_core::cloud::outbox::kind::SESSION_MINED_CANDIDATE)
        );
        assert!(
            pos(difflore_core::cloud::outbox::kind::SESSION_MINED_CANDIDATE)
                < pos(difflore_core::cloud::outbox::kind::REVIEW_METRICS)
        );
        assert!(
            pos(difflore_core::cloud::outbox::kind::REVIEW_METRICS)
                < pos(difflore_core::cloud::outbox::kind::TRAJECTORY)
        );
        assert!(
            pos(difflore_core::cloud::outbox::kind::TRAJECTORY)
                < pos(difflore_core::cloud::outbox::kind::MCP_QUERY)
        );
        assert!(
            pos(difflore_core::cloud::outbox::kind::MCP_QUERY)
                < pos(difflore_core::cloud::outbox::kind::OBSERVATION)
        );
        assert!(!kinds.iter().any(|(kind, _)| *kind == "fix_acceptance"));
    }

    /// The dry-run payload echoes the include flags and the per-queue skip
    /// counts, plus their total (2 + 0 + 5 = 7).
    #[test]
    fn dry_run_payload_reports_skipped_raw_queues_and_flags() {
        let payload = dry_run_payload(
            SyncDirection::Both,
            RawUploadPolicy::new(false, true, false),
            RawUploadSkipCounts {
                observations: 2,
                memory_candidates: 0,
                telemetry: 5,
            },
        );
        assert_eq!(payload["rawUploadFlags"]["includeObservations"], false);
        assert_eq!(payload["rawUploadFlags"]["includeCandidates"], true);
        assert_eq!(payload["rawUploadFlags"]["includeTelemetry"], false);
        assert_eq!(payload["skippedRawUploads"]["observations"], 2);
        assert_eq!(payload["skippedRawUploads"]["memoryCandidates"], 0);
        assert_eq!(payload["skippedRawUploads"]["telemetryOutbox"], 5);
        assert_eq!(payload["skippedRawUploads"]["total"], 7);
    }

    /// A provider item without `modelMapping` parses to an empty mapping.
    #[test]
    fn parse_provider_model_mapping_accepts_missing_field() {
        let item = serde_json::json!({"name": "openai", "baseUrl": "https://x"});
        let m = parse_provider_model_mapping(&item).expect("missing field is ok");
        assert!(m.is_empty());
    }

    /// An explicit `null` `modelMapping` parses to an empty mapping.
    #[test]
    fn parse_provider_model_mapping_accepts_null() {
        let item = serde_json::json!({"modelMapping": null});
        let m = parse_provider_model_mapping(&item).expect("null is ok");
        assert!(m.is_empty());
    }

    /// A `modelMapping` that is not an object, or whose values are not
    /// strings, is rejected with a non-empty error.
    #[test]
    fn parse_provider_model_mapping_errors_on_wrong_shape() {
        let item = serde_json::json!({"modelMapping": ["not", "an", "object"]});
        let err = parse_provider_model_mapping(&item).expect_err("wrong shape must error");
        assert!(!err.is_empty(), "error must carry context");
        let nested_wrong = serde_json::json!({"modelMapping": {"gpt-4": 42}});
        assert!(
            parse_provider_model_mapping(&nested_wrong).is_err(),
            "non-string values must error"
        );
    }

    /// Checks three things about `format_cloud_err`: each known error shape
    /// maps to its hint, raw 4xx/5xx text is preserved in the output, and an
    /// unrecognised error falls through as `label: raw`.
    #[test]
    fn format_cloud_err_classifies_known_errors_and_falls_through_unknown() {
        let cases: &[(&str, &str)] = &[
            ("LlmNotConfigured: ...", "BYOK"),
            // Lower-camel spelling is matched by the same branch.
            ("llmNotConfigured: ...", "BYOK"),
            ("received not_logged_in from cloud", "difflore cloud login"),
            // Validation failure on a uuid-formatted field asks for the full id.
            (
                r#"Input validation failed: {"format":"uuid"}"#,
                "full UUID",
            ),
            ("API error 401: token revoked", "session expired"),
            ("API error 403: plan_required", "rejected"),
            ("API error 429: too many requests", "rate-limited"),
            (
                r#"API error: API error 500: {"code":"INTERNAL_SERVER_ERROR"}"#,
                "server error",
            ),
            ("request failed: connection refused", "unreachable"),
            ("DNS error: no such host", "unreachable"),
            ("connection reset by peer", "unreachable"),
            ("Network is unreachable", "unreachable"),
            ("request timed out after 30s", "timed out"),
            (
                "error sending request for url (http://localhost:3017/api/rules/sync): error trying to connect: tcp connect error: No connection could be made because the target machine actively refused it. (os error 10061)",
                "unreachable",
            ),
            ("os error 10061", "unreachable"),
        ];
        for (raw, expect) in cases {
            let out = format_cloud_err("Action", raw);
            assert!(
                out.contains(expect),
                "want {expect:?} for {raw:?}, got: {out}"
            );
        }
        // The original server text must remain visible for these status codes.
        let raw_4xx_5xx: &[&str] = &[
            "API error 401: token revoked because team_seat_revoked",
            "API error 403: plan_required",
            "API error 429: retry-after=60",
            r#"API error 500: {"code":"INTERNAL_SERVER_ERROR"}"#,
        ];
        for raw in raw_4xx_5xx {
            let out = format_cloud_err("L", raw);
            assert!(
                out.contains(raw),
                "raw input {raw:?} missing from output: {out}"
            );
        }
        assert_eq!(
            format_cloud_err("Custom action", "totally novel error xyz123"),
            "Custom action: totally novel error xyz123"
        );
    }
}