xbp 10.30.1

XBP is a zero-config build pack that can also interact with proxies, kafka, sockets, synthetic monitors.
Documentation
use crate::codetime::{collect_cursor_history, CursorHistoryWorkspaceSnapshot};
use crate::commands::cli_session::{cli_request_client, require_authenticated_cli_session};
use crate::config::{
    record_cursor_ingest_failure, record_cursor_ingest_started, record_cursor_ingest_success,
    reserve_cursor_ingest_slot, resolve_device_identity, ApiConfig,
};
use chrono::{Duration as ChronoDuration, Utc};
use reqwest::Client;
use serde::Serialize;
use std::env;
use std::process::{Command as ProcessCommand, Stdio};

const INGEST_BATCH_SIZE: usize = 25;
const BACKGROUND_CHILD_ENV: &str = "XBP_CURSOR_BACKGROUND_CHILD";
const BACKGROUND_TRIGGER_ENV: &str = "XBP_CURSOR_BACKGROUND_TRIGGER";
const BACKGROUND_MODE: &str = "background";
const MANUAL_MODE: &str = "manual";
const BACKGROUND_MIN_INTERVAL_MINUTES: i64 = 30;

#[cfg(windows)]
const CREATE_NO_WINDOW: u32 = 0x08000000;

#[derive(Debug, Clone, Copy)]
struct CursorIngestSummary {
    workspaces_uploaded: usize,
    entries_uploaded: usize,
    entries_skipped: usize,
}

#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct CursorIngestDevicePayload {
    hardware_id: String,
    device_name: Option<String>,
    hostname: Option<String>,
    platform: String,
}

#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct CursorIngestBatchPayload {
    device: CursorIngestDevicePayload,
    collected_at: String,
    workspaces: Vec<CursorHistoryWorkspaceSnapshot>,
}

#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct CursorIngestResponse {
    workspaces_upserted: usize,
    entries_upserted: usize,
    entries_skipped: usize,
}

pub fn maybe_start_background_cursor_ingest(trigger: &str) -> Result<bool, String> {
    if !cfg!(windows) || is_background_cursor_ingest_child() {
        return Ok(false);
    }

    if crate::commands::cli_session::resolve_cli_access_token().is_err() {
        return Ok(false);
    }

    let reserved = reserve_cursor_ingest_slot(
        trigger,
        BACKGROUND_MODE,
        ChronoDuration::minutes(BACKGROUND_MIN_INTERVAL_MINUTES),
    )?;
    if !reserved {
        return Ok(false);
    }

    if let Err(error) = spawn_background_cursor_ingest_process(trigger) {
        let _ = record_cursor_ingest_failure(trigger, BACKGROUND_MODE, &error);
        return Err(error);
    }

    Ok(true)
}

pub async fn run_cursor_ingest(dry_run: bool) -> Result<(), String> {
    let trigger = current_cursor_ingest_trigger();
    let mode = current_cursor_ingest_mode();
    if !dry_run {
        let _ = record_cursor_ingest_started(&trigger, mode);
    }

    let result = run_cursor_ingest_once(dry_run).await;
    match result {
        Ok(summary) => {
            if !dry_run {
                let _ = record_cursor_ingest_success(
                    &trigger,
                    mode,
                    summary.workspaces_uploaded,
                    summary.entries_uploaded,
                    summary.entries_skipped,
                );
            }
            Ok(())
        }
        Err(error) => {
            if !dry_run {
                let _ = record_cursor_ingest_failure(&trigger, mode, &error);
            }
            Err(error)
        }
    }
}

async fn run_cursor_ingest_once(dry_run: bool) -> Result<CursorIngestSummary, String> {
    let _session = require_authenticated_cli_session().await?;
    let device = resolve_device_identity()?;
    let collection = collect_cursor_history(None);

    if !collection.supported {
        return Err(collection.note.unwrap_or_else(|| {
            "Cursor history ingestion is not supported on this platform.".to_string()
        }));
    }
    if !collection.exists {
        return Err(collection
            .note
            .unwrap_or_else(|| "Cursor local history directory was not found.".to_string()));
    }

    if collection.workspaces.is_empty() {
        println!("No Cursor local history workspaces found.");
        return Ok(CursorIngestSummary {
            workspaces_uploaded: 0,
            entries_uploaded: 0,
            entries_skipped: 0,
        });
    }

    println!(
        "Collected {} workspaces with {} entries from {}",
        collection.workspace_count, collection.entry_count, collection.history_root
    );

    if dry_run {
        println!("Dry run enabled; skipping dashboard upload.");
        return Ok(CursorIngestSummary {
            workspaces_uploaded: 0,
            entries_uploaded: 0,
            entries_skipped: 0,
        });
    }

    let client = cli_request_client()?;
    let api = ApiConfig::from_env();
    let device_payload = CursorIngestDevicePayload {
        hardware_id: device.hardware_id,
        device_name: build_device_name(),
        hostname: current_hostname(),
        platform: std::env::consts::OS.to_string(),
    };
    let collected_at = collection
        .collected_at
        .unwrap_or_else(Utc::now)
        .to_rfc3339();

    let mut total_workspaces = 0usize;
    let mut total_entries = 0usize;
    let mut total_skipped = 0usize;

    for chunk in collection.workspaces.chunks(INGEST_BATCH_SIZE) {
        let response =
            upload_cursor_history_batch(&client, &api, &device_payload, &collected_at, chunk)
                .await?;
        total_workspaces += response.workspaces_upserted;
        total_entries += response.entries_upserted;
        total_skipped += response.entries_skipped;
    }

    println!(
        "Uploaded Cursor history: {} workspaces, {} entries ({} skipped).",
        total_workspaces, total_entries, total_skipped
    );
    Ok(CursorIngestSummary {
        workspaces_uploaded: total_workspaces,
        entries_uploaded: total_entries,
        entries_skipped: total_skipped,
    })
}

async fn upload_cursor_history_batch(
    client: &Client,
    api: &ApiConfig,
    device: &CursorIngestDevicePayload,
    collected_at: &str,
    workspaces: &[CursorHistoryWorkspaceSnapshot],
) -> Result<CursorIngestResponse, String> {
    let token = crate::commands::cli_session::resolve_cli_access_token()?;
    let payload = CursorIngestBatchPayload {
        device: CursorIngestDevicePayload {
            hardware_id: device.hardware_id.clone(),
            device_name: device.device_name.clone(),
            hostname: device.hostname.clone(),
            platform: device.platform.clone(),
        },
        collected_at: collected_at.to_string(),
        workspaces: workspaces.to_vec(),
    };

    let response = client
        .post(api.cli_cursor_ingest_endpoint())
        .bearer_auth(token)
        .json(&payload)
        .send()
        .await
        .map_err(|e| format!("Failed to upload Cursor history batch: {}", e))?;

    if response.status() == reqwest::StatusCode::UNAUTHORIZED {
        return Err(
            "Your stored CLI session is no longer valid. Run `xbp login` again.".to_string(),
        );
    }

    if !response.status().is_success() {
        let status = response.status();
        let body = response
            .text()
            .await
            .unwrap_or_else(|_| "<empty response>".to_string());
        return Err(format!(
            "Cursor history upload failed with status {}: {}",
            status, body
        ));
    }

    response
        .json::<CursorIngestResponse>()
        .await
        .map_err(|e| format!("Failed to parse Cursor ingest response: {}", e))
}

fn current_hostname() -> Option<String> {
    for key in ["HOSTNAME", "COMPUTERNAME"] {
        if let Ok(value) = std::env::var(key) {
            let trimmed = value.trim();
            if !trimmed.is_empty() {
                return Some(trimmed.to_string());
            }
        }
    }

    None
}

fn build_device_name() -> Option<String> {
    let hostname = current_hostname();
    let username = std::env::var("USERNAME")
        .or_else(|_| std::env::var("USER"))
        .ok()
        .map(|value| value.trim().to_string())
        .filter(|value| !value.is_empty());

    match (username, hostname) {
        (Some(user), Some(host)) => Some(format!("{user}@{host}")),
        (Some(user), None) => Some(user),
        (None, Some(host)) => Some(host),
        (None, None) => None,
    }
}

fn is_background_cursor_ingest_child() -> bool {
    env::var(BACKGROUND_CHILD_ENV)
        .ok()
        .map(|value| value == "1")
        .unwrap_or(false)
}

fn current_cursor_ingest_trigger() -> String {
    env::var(BACKGROUND_TRIGGER_ENV)
        .ok()
        .map(|value| value.trim().to_string())
        .filter(|value| !value.is_empty())
        .unwrap_or_else(|| "manual-cursor-command".to_string())
}

fn current_cursor_ingest_mode() -> &'static str {
    if is_background_cursor_ingest_child() {
        BACKGROUND_MODE
    } else {
        MANUAL_MODE
    }
}

fn spawn_background_cursor_ingest_process(trigger: &str) -> Result<(), String> {
    let executable = env::current_exe()
        .map_err(|error| format!("Failed to resolve current XBP executable: {}", error))?;

    let mut command = ProcessCommand::new(executable);
    command
        .arg("cursor")
        .arg("ingest")
        .env(BACKGROUND_CHILD_ENV, "1")
        .env(BACKGROUND_TRIGGER_ENV, trigger)
        .stdin(Stdio::null())
        .stdout(Stdio::null())
        .stderr(Stdio::null());

    #[cfg(windows)]
    {
        use std::os::windows::process::CommandExt;
        command.creation_flags(CREATE_NO_WINDOW);
    }

    command
        .spawn()
        .map(|_| ())
        .map_err(|error| format!("Failed to spawn background Cursor ingest: {}", error))
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::codetime::CursorHistoryEntrySnapshot;

    #[test]
    fn batch_size_splits_large_collections() {
        let workspaces = (0..60)
            .map(|index| CursorHistoryWorkspaceSnapshot {
                folder_key: format!("folder-{index}"),
                version: 1,
                resource: format!("c:\\tmp\\file-{index}.txt"),
                entries: vec![CursorHistoryEntrySnapshot {
                    entry_id: "entry.txt".to_string(),
                    timestamp: 1,
                    content: Some("demo".to_string()),
                    content_sha256: None,
                    content_encoding: Some("utf-8".to_string()),
                    content_bytes: Some(4),
                }],
            })
            .collect::<Vec<_>>();

        let batches = workspaces
            .chunks(INGEST_BATCH_SIZE)
            .map(|chunk| chunk.len())
            .collect::<Vec<_>>();
        assert_eq!(batches, vec![25, 25, 10]);
    }
}