use nix_uri::{FlakeRef, RefKind};
use ropey::Rope;
use std::cmp::Ordering;
use std::collections::HashSet;
use std::sync::Mutex;
use super::api::{BatchLookup, ForgeClient};
use super::channel::{
UpdateStrategy, channel_probe_candidates, detect_strategy, find_latest_channel,
parse_channel_ref,
};
use super::version::{is_downgrade, parse_ref};
use crate::edit::InputMap;
use crate::input::Input;
use crate::uri::is_git_url;
const FETCH_CONCURRENCY: usize = 4;
#[derive(Debug)]
pub struct Updater {
text: Rope,
inputs: Vec<UpdateInput>,
offset: i32,
client: ForgeClient,
}
struct UpdatePlan {
previous_ref: String,
final_change: String,
updated_uri: String,
}
impl Updater {
fn print_update_status(id: &str, previous_version: &str, final_change: &str) -> bool {
let is_up_to_date = previous_version == final_change;
let initialized = previous_version.is_empty();
if is_up_to_date {
println!(
"{} is already on the latest version: {previous_version}.",
id
);
return false;
}
if initialized {
println!("Initialized {} version pin at {final_change}.", id);
} else {
println!("Updated {} from {previous_version} to {final_change}.", id);
}
true
}
pub fn new(text: Rope, map: InputMap) -> Self {
let client = ForgeClient::new();
let mut inputs = vec![];
for (_id, input) in map {
if !input.has_editable_url() {
continue;
}
let url_start = text.byte_to_char(input.range.start) + 1;
let url_end = text.byte_to_char(input.range.end) - 1;
inputs.push(UpdateInput {
input,
url_start,
url_end,
});
}
Self {
inputs,
text,
offset: 0,
client,
}
}
fn url_char_range(&self, input: &UpdateInput) -> (usize, usize) {
let start = (input.url_start as i32 + self.offset) as usize;
let end = (input.url_end as i32 + self.offset) as usize;
(start, end)
}
fn get_index(&self, id: &str) -> Option<usize> {
let bare = id
.strip_prefix('"')
.and_then(|s| s.strip_suffix('"'))
.unwrap_or(id);
self.inputs
.iter()
.position(|n| n.input.id().as_str() == bare)
}
pub fn pin_input_to_ref(&mut self, id: &str, rev: &str) -> Result<(), String> {
self.sort();
let idx = self.get_index(id).ok_or_else(|| id.to_string())?;
let input = self.inputs[idx].clone();
tracing::debug!("Input: {:?}", input);
self.change_input_to_rev(&input, rev);
Ok(())
}
pub fn unpin_input(&mut self, id: &str) -> Result<(), String> {
self.sort();
let idx = self.get_index(id).ok_or_else(|| id.to_string())?;
let input = self.inputs[idx].clone();
tracing::debug!("Input: {:?}", input);
self.remove_ref_and_rev(&input);
Ok(())
}
pub fn update_all_to_latest_semver(&mut self, init: bool) {
self.update_matching(|_| true, init);
}
pub fn update_inputs_to_latest_semver(&mut self, ids: &[&str], init: bool) {
if ids.is_empty() {
return;
}
let set: HashSet<&str> = ids.iter().copied().collect();
self.update_matching(|id| set.contains(id), init);
}
fn update_matching<F: Fn(&str) -> bool>(&mut self, keep: F, init: bool) {
self.sort();
let pending: Vec<(UpdateInput, String)> = self
.inputs
.iter()
.filter(|i| keep(i.input.id.as_str()))
.map(|i| {
let uri = self.get_input_text(i);
(i.clone(), uri)
})
.collect();
if pending.is_empty() {
return;
}
let github_lookups = build_github_batch_lookups(&pending);
if github_lookups.len() >= 2
&& let Err(e) = self.client.batch_warm_github(&github_lookups)
{
tracing::debug!(
"GraphQL batch warm failed; falling back to REST per input: {}",
e
);
}
let results = parallel_fetch(&self.client, pending, init);
for (input, plan) in results {
let Some(plan) = plan else { continue };
if Self::print_update_status(
input.input.id.as_str(),
&plan.previous_ref,
&plan.final_change,
) {
self.update_input(input, &plan.updated_uri);
}
}
}
pub fn get_changes(&self) -> String {
self.text.to_string()
}
fn get_input_text(&self, input: &UpdateInput) -> String {
let (start, end) = self.url_char_range(input);
self.text.slice(start..end).to_string()
}
pub(crate) fn change_input_to_rev(&mut self, input: &UpdateInput, rev: &str) {
let uri = self.get_input_text(input);
match uri.parse::<FlakeRef>() {
Ok(parsed) => {
let updated = parsed.pin_to_rev(rev.into()).into_uri();
self.update_input(input.clone(), &updated);
}
Err(e) => {
tracing::error!("Error while changing input: {}", e);
}
}
}
fn remove_ref_and_rev(&mut self, input: &UpdateInput) {
let uri = self.get_input_text(input);
match uri.parse::<FlakeRef>() {
Ok(mut parsed) => {
if parsed.ref_kind() == RefKind::None {
return;
}
parsed.set_ref(None);
parsed.set_rev(None);
self.update_input(input.clone(), &parsed.into_uri());
}
Err(e) => {
tracing::error!("Error while changing input: {}", e);
}
}
}
fn sort(&mut self) {
self.inputs.sort();
}
fn update_input(&mut self, input: UpdateInput, change: &str) {
let (start, end) = self.url_char_range(&input);
let previous_len = (end - start) as i32;
self.text.remove(start..end);
self.text.insert(start, change);
self.offset += change.chars().count() as i32 - previous_len;
}
}
#[derive(Debug, Clone)]
pub(crate) struct UpdateInput {
input: Input,
url_start: usize,
url_end: usize,
}
impl Ord for UpdateInput {
fn cmp(&self, other: &Self) -> Ordering {
self.url_start.cmp(&other.url_start)
}
}
impl PartialEq for UpdateInput {
fn eq(&self, other: &Self) -> bool {
self.url_start == other.url_start
}
}
impl PartialOrd for UpdateInput {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Eq for UpdateInput {}
fn parallel_fetch(
client: &ForgeClient,
pending: Vec<(UpdateInput, String)>,
init: bool,
) -> Vec<(UpdateInput, Option<UpdatePlan>)> {
let n = pending.len();
if n == 0 {
return Vec::new();
}
let cap = std::cmp::min(n, FETCH_CONCURRENCY);
if cap <= 1 {
let mut results = Vec::with_capacity(n);
for (input, uri) in pending {
let plan = compute_change(client, &uri, init);
results.push((input, plan));
}
return results;
}
type WorkItem = (usize, UpdateInput, String);
type ResultSlot = Mutex<Option<(UpdateInput, Option<UpdatePlan>)>>;
let work: Mutex<Vec<WorkItem>> = Mutex::new(
pending
.into_iter()
.enumerate()
.map(|(i, (u, s))| (i, u, s))
.collect(),
);
let slots: Vec<ResultSlot> = (0..n).map(|_| Mutex::new(None)).collect();
std::thread::scope(|s| {
for _ in 0..cap {
let work = &work;
let slots = &slots;
s.spawn(move || {
loop {
let next = work.lock().expect("fetch work queue poisoned").pop();
let Some((idx, input, uri)) = next else { break };
let plan = compute_change(client, &uri, init);
*slots[idx].lock().expect("fetch result slot poisoned") = Some((input, plan));
}
});
}
});
slots
.into_iter()
.map(|m| {
m.into_inner()
.expect("fetch result slot poisoned")
.expect("scope returned with an unfilled fetch slot")
})
.collect()
}
fn build_github_batch_lookups(pending: &[(UpdateInput, String)]) -> Vec<BatchLookup> {
let mut lookups = Vec::new();
for (_, uri) in pending {
let Ok(parsed) = uri.parse::<FlakeRef>() else {
continue;
};
let canonical = match parsed.domain() {
None => "github.com",
Some(d) => d,
};
if canonical != "github.com" {
continue;
}
let (Some(owner), Some(repo)) = (parsed.owner(), parsed.repo()) else {
continue;
};
match detect_strategy(owner, repo) {
UpdateStrategy::SemverTags => {
lookups.push(BatchLookup::Tags {
owner: owner.to_string(),
repo: repo.to_string(),
});
}
UpdateStrategy::NixpkgsChannel
| UpdateStrategy::HomeManagerChannel
| UpdateStrategy::NixDarwinChannel => {
let current_ref = parsed.ref_or_rev().unwrap_or_default();
if current_ref.is_empty() {
continue;
}
let channel = parse_channel_ref(current_ref);
let (Some(prefix), Some(current_version)) = (channel.prefix(), channel.version())
else {
continue;
};
let candidates = channel_probe_candidates(prefix, current_version);
lookups.push(BatchLookup::ChannelCandidates {
owner: owner.to_string(),
repo: repo.to_string(),
prefix: prefix.to_string(),
candidates,
});
}
}
}
lookups
}
fn compute_change(client: &ForgeClient, uri: &str, init: bool) -> Option<UpdatePlan> {
let parsed = match uri.parse::<FlakeRef>() {
Ok(p) => p,
Err(e) => {
tracing::error!("Failed to parse URI: {}", e);
return None;
}
};
let owner = match parsed.owner() {
Some(o) => o.to_owned(),
None => {
tracing::debug!("Skipping input without owner");
return None;
}
};
let repo = match parsed.repo() {
Some(r) => r.to_owned(),
None => {
tracing::debug!("Skipping input without repo");
return None;
}
};
let strategy = detect_strategy(&owner, &repo);
tracing::debug!("Update strategy for {}/{}: {:?}", owner, repo, strategy);
match strategy {
UpdateStrategy::NixpkgsChannel
| UpdateStrategy::HomeManagerChannel
| UpdateStrategy::NixDarwinChannel => {
compute_channel_change(client, &parsed, &owner, &repo)
}
UpdateStrategy::SemverTags => {
compute_semver_change(client, uri, &parsed, &owner, &repo, init)
}
}
}
fn compute_channel_change(
client: &ForgeClient,
parsed: &FlakeRef,
owner: &str,
repo: &str,
) -> Option<UpdatePlan> {
let domain = parsed.domain();
let current_ref = parsed.ref_or_rev().unwrap_or_default().to_owned();
if current_ref.is_empty() {
tracing::debug!("Skipping unpinned channel input: {}/{}", owner, repo);
return None;
}
let has_refs_heads_prefix = current_ref.starts_with("refs/heads/");
let latest = match find_latest_channel(client, ¤t_ref, owner, repo, domain) {
Ok(Some(latest)) => latest,
Ok(None) => return None,
Err(e) => {
tracing::error!(
"Failed to resolve latest channel for {}/{}: {}",
owner,
repo,
e
);
return None;
}
};
let final_ref = if has_refs_heads_prefix {
format!("refs/heads/{}", latest)
} else {
latest.clone()
};
let updated_uri = parsed.clone().with_ref(Some(final_ref.clone())).into_uri();
Some(UpdatePlan {
previous_ref: current_ref,
final_change: final_ref,
updated_uri,
})
}
fn compute_semver_change(
client: &ForgeClient,
uri: &str,
parsed: &FlakeRef,
owner: &str,
repo: &str,
init: bool,
) -> Option<UpdatePlan> {
let is_git = is_git_url(uri);
let maybe_version = parsed.ref_or_rev().unwrap_or_default();
let parsed_ref = parse_ref(maybe_version, init);
if !init && let Err(e) = semver::Version::parse(&parsed_ref.normalized_for_semver) {
tracing::debug!("Skip non semver version: {}: {}", maybe_version, e);
return None;
}
let tags = if is_git {
let domain = parsed.domain()?.to_owned();
match client.list_tags(owner, repo, Some(&domain)) {
Ok(t) => t,
Err(_) => {
tracing::error!("Failed to fetch tags for {}/{} on {}", owner, repo, domain);
return None;
}
}
} else {
match client.list_tags(owner, repo, None) {
Ok(t) => t,
Err(_) => {
tracing::error!("Failed to fetch tags for {}/{}", owner, repo);
return None;
}
}
};
let change = match tags.get_latest_tag() {
Some(c) => c,
None => {
tracing::error!("Could not find latest version for {}/{}", owner, repo);
return None;
}
};
if !init && is_downgrade(maybe_version, &change) {
tracing::warn!(
"Refusing to downgrade {}/{} from {} to {}",
owner,
repo,
maybe_version,
change
);
eprintln!(
"Warning: skipping {}/{}: latest tag {} is older than the current pin {}.",
owner, repo, change, maybe_version
);
return None;
}
let final_change = if parsed_ref.has_refs_tags_prefix {
format!("refs/tags/{}", change)
} else {
change.clone()
};
let updated_uri = parsed
.clone()
.with_ref(Some(final_change.clone()))
.into_uri();
Some(UpdatePlan {
previous_ref: parsed_ref.previous_ref,
final_change,
updated_uri,
})
}