Skip to main content

flake_edit/forge/
update.rs

1use nix_uri::{FlakeRef, RefKind};
2use ropey::Rope;
3use std::cmp::Ordering;
4use std::collections::HashSet;
5use std::sync::Mutex;
6
7use super::api::{BatchLookup, ForgeClient};
8use super::channel::{
9    UpdateStrategy, channel_probe_candidates, detect_strategy, find_latest_channel,
10    parse_channel_ref,
11};
12use super::version::{is_downgrade, parse_ref};
13use crate::edit::InputMap;
14use crate::input::Input;
15use crate::uri::is_git_url;
16
/// Upper bound on the number of worker threads `parallel_fetch` spawns
/// for forge lookups.
///
/// Enough to overlap per-request latency, small enough not to trip
/// anonymous rate limits on the forges.
const FETCH_CONCURRENCY: usize = 4;
20
21/// Rewrites flake.nix URIs for `update`, `pin`, and `unpin`.
22///
23/// Owns one [`ForgeClient`] for the lifetime of the updater so the
24/// inner forge fetches share a single HTTP agent and memoize repeats
25/// across all inputs in one pass.
26#[derive(Debug)]
27pub struct Updater {
28    text: Rope,
29    inputs: Vec<UpdateInput>,
30    /// Cumulative char delta from edits applied earlier in the current pass.
31    /// Measured in *characters*, since ropey indexes by char.
32    offset: i32,
33    client: ForgeClient,
34}
35
/// What the fetch phase decided for a single input.
///
/// Fetching and editing are deliberately split: fetches for several
/// inputs may run concurrently against the forge, while the text edits
/// derived from these plans are applied one after another in source order.
struct UpdatePlan {
    /// The input's `ref_or_rev` before the update.
    ///
    /// An empty string means the input had no pin and `init` was set;
    /// the edit phase reports such inputs as `Initialized ...`.
    previous_ref: String,
    /// The ref the input will point at, with any `refs/tags/` or
    /// `refs/heads/` prefix restored when the original ref had one.
    /// When it equals `previous_ref`, the input is reported as already
    /// on the latest version.
    final_change: String,
    /// The complete rewritten URI to splice into the source.
    updated_uri: String,
}
54
55impl Updater {
56    fn print_update_status(id: &str, previous_version: &str, final_change: &str) -> bool {
57        let is_up_to_date = previous_version == final_change;
58        let initialized = previous_version.is_empty();
59
60        if is_up_to_date {
61            println!(
62                "{} is already on the latest version: {previous_version}.",
63                id
64            );
65            return false;
66        }
67
68        if initialized {
69            println!("Initialized {} version pin at {final_change}.", id);
70        } else {
71            println!("Updated {} from {previous_version} to {final_change}.", id);
72        }
73
74        true
75    }
76
77    /// Build an [`Updater`] from `text` and the inputs in `map`. Inputs
78    /// without an editable URL are skipped.
79    pub fn new(text: Rope, map: InputMap) -> Self {
80        let client = ForgeClient::new();
81        let mut inputs = vec![];
82        for (_id, input) in map {
83            if !input.has_editable_url() {
84                continue;
85            }
86            // `Input::range` carries rnix `TextRange` *byte* offsets, but ropey
87            // indexes by *character*. Convert once against the pristine text so
88            // later in-place edits can use simple char-offset arithmetic.
89            let url_start = text.byte_to_char(input.range.start) + 1;
90            let url_end = text.byte_to_char(input.range.end) - 1;
91            inputs.push(UpdateInput {
92                input,
93                url_start,
94                url_end,
95            });
96        }
97        Self {
98            inputs,
99            text,
100            offset: 0,
101            client,
102        }
103    }
104
105    /// Char-index range of the URL string *contents* (without the surrounding `"`),
106    /// adjusted for earlier in-place edits.
107    fn url_char_range(&self, input: &UpdateInput) -> (usize, usize) {
108        let start = (input.url_start as i32 + self.offset) as usize;
109        let end = (input.url_end as i32 + self.offset) as usize;
110        (start, end)
111    }
112    fn get_index(&self, id: &str) -> Option<usize> {
113        let bare = id
114            .strip_prefix('"')
115            .and_then(|s| s.strip_suffix('"'))
116            .unwrap_or(id);
117        self.inputs
118            .iter()
119            .position(|n| n.input.id().as_str() == bare)
120    }
121    /// Pin the input named `id` to `rev`.
122    ///
123    /// # Errors
124    ///
125    /// Returns the requested `id` if no such input exists.
126    pub fn pin_input_to_ref(&mut self, id: &str, rev: &str) -> Result<(), String> {
127        self.sort();
128        let idx = self.get_index(id).ok_or_else(|| id.to_string())?;
129        let input = self.inputs[idx].clone();
130        tracing::debug!("Input: {:?}", input);
131        self.change_input_to_rev(&input, rev);
132        Ok(())
133    }
134
135    /// Remove any `?ref=` or `?rev=` pin from `id`.
136    ///
137    /// # Errors
138    ///
139    /// Returns the requested `id` if no such input exists.
140    pub fn unpin_input(&mut self, id: &str) -> Result<(), String> {
141        self.sort();
142        let idx = self.get_index(id).ok_or_else(|| id.to_string())?;
143        let input = self.inputs[idx].clone();
144        tracing::debug!("Input: {:?}", input);
145        self.remove_ref_and_rev(&input);
146        Ok(())
147    }
148
149    pub fn update_all_to_latest_semver(&mut self, init: bool) {
150        self.update_matching(|_| true, init);
151    }
152
153    /// Update only the inputs whose id appears in `ids`.
154    ///
155    /// IDs that do not name an editable input are silently skipped, the
156    /// same way the all-inputs path skips inputs without an editable URL.
157    /// Duplicates collapse: each matching input is processed at most once.
158    pub fn update_inputs_to_latest_semver(&mut self, ids: &[&str], init: bool) {
159        if ids.is_empty() {
160            return;
161        }
162        let set: HashSet<&str> = ids.iter().copied().collect();
163        self.update_matching(|id| set.contains(id), init);
164    }
165
166    /// Two-phase update over the inputs whose id satisfies `keep`.
167    ///
168    /// Phase 1 fans the fetch out across a bounded worker pool so the
169    /// per-input forge round-trips overlap. Phase 2 walks the results
170    /// in source order and applies each rewrite serially, because
171    /// [`Updater::update_input`]'s cumulative `offset` arithmetic is
172    /// only valid when edits land left-to-right in the source text.
173    ///
174    /// Status prints (`Updated X ...`, `Initialized X ...`, `X is
175    /// already on the latest version`) are emitted by the edit phase
176    /// so they too appear in source order, regardless of the order
177    /// in which workers actually finished their fetches.
178    fn update_matching<F: Fn(&str) -> bool>(&mut self, keep: F, init: bool) {
179        self.sort();
180
181        // Snapshot URIs against the pristine source text. `self.offset`
182        // is zero on entry, so [`Self::get_input_text`] returns exactly
183        // what's in the original source; later edit-phase rewrites
184        // shift the offset and would otherwise corrupt these slices.
185        let pending: Vec<(UpdateInput, String)> = self
186            .inputs
187            .iter()
188            .filter(|i| keep(i.input.id.as_str()))
189            .map(|i| {
190                let uri = self.get_input_text(i);
191                (i.clone(), uri)
192            })
193            .collect();
194
195        if pending.is_empty() {
196            return;
197        }
198
199        // One GraphQL POST resolves every github.com lookup in
200        // `pending`. The REST path stays intact for non-github
201        // forges and as a fallback if the warm step fails (anonymous
202        // run, partial errors, transport hiccup), so worst case is a
203        // wasted POST plus the existing per-input REST round trips.
204        let github_lookups = build_github_batch_lookups(&pending);
205        // Threshold of 2: a single-input batch trades a REST GET for
206        // a GraphQL POST of the same wall-clock cost without any
207        // overlap dividend, so the parallel-fetch path keeps it.
208        // Two or more inputs are where the round-trip count actually
209        // collapses.
210        if github_lookups.len() >= 2
211            && let Err(e) = self.client.batch_warm_github(&github_lookups)
212        {
213            tracing::debug!(
214                "GraphQL batch warm failed; falling back to REST per input: {}",
215                e
216            );
217        }
218
219        let results = parallel_fetch(&self.client, pending, init);
220
221        for (input, plan) in results {
222            let Some(plan) = plan else { continue };
223            if Self::print_update_status(
224                input.input.id.as_str(),
225                &plan.previous_ref,
226                &plan.final_change,
227            ) {
228                self.update_input(input, &plan.updated_uri);
229            }
230        }
231    }
232
233    /// Current source after all queued edits.
234    pub fn get_changes(&self) -> String {
235        self.text.to_string()
236    }
237
238    fn get_input_text(&self, input: &UpdateInput) -> String {
239        let (start, end) = self.url_char_range(input);
240        self.text.slice(start..end).to_string()
241    }
242
243    /// Rewrite `input`'s URL to pin it to `rev`.
244    pub(crate) fn change_input_to_rev(&mut self, input: &UpdateInput, rev: &str) {
245        let uri = self.get_input_text(input);
246        match uri.parse::<FlakeRef>() {
247            Ok(parsed) => {
248                let updated = parsed.pin_to_rev(rev.into()).into_uri();
249                self.update_input(input.clone(), &updated);
250            }
251            Err(e) => {
252                tracing::error!("Error while changing input: {}", e);
253            }
254        }
255    }
256    fn remove_ref_and_rev(&mut self, input: &UpdateInput) {
257        let uri = self.get_input_text(input);
258        match uri.parse::<FlakeRef>() {
259            Ok(mut parsed) => {
260                if parsed.ref_kind() == RefKind::None {
261                    return;
262                }
263                parsed.set_ref(None);
264                parsed.set_rev(None);
265                self.update_input(input.clone(), &parsed.into_uri());
266            }
267            Err(e) => {
268                tracing::error!("Error while changing input: {}", e);
269            }
270        }
271    }
272    // Sort by source range so multi-edit passes stay aligned with `offset`.
273    fn sort(&mut self) {
274        self.inputs.sort();
275    }
276    fn update_input(&mut self, input: UpdateInput, change: &str) {
277        let (start, end) = self.url_char_range(&input);
278        let previous_len = (end - start) as i32;
279        self.text.remove(start..end);
280        self.text.insert(start, change);
281        self.offset += change.chars().count() as i32 - previous_len;
282    }
283}
284
285/// Wrapper that lets [`Updater`] sort inputs by source position.
286#[derive(Debug, Clone)]
287pub(crate) struct UpdateInput {
288    input: Input,
289    /// Char index of the first URL character (inside the quotes) in the
290    /// original, unmodified text.
291    url_start: usize,
292    /// Char index one past the last URL character in the original text.
293    url_end: usize,
294}
295
296impl Ord for UpdateInput {
297    fn cmp(&self, other: &Self) -> Ordering {
298        self.url_start.cmp(&other.url_start)
299    }
300}
301
302impl PartialEq for UpdateInput {
303    fn eq(&self, other: &Self) -> bool {
304        self.url_start == other.url_start
305    }
306}
307
308impl PartialOrd for UpdateInput {
309    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
310        Some(self.cmp(other))
311    }
312}
313
314impl Eq for UpdateInput {}
315
316/// Results come back in the same order as `pending`, which the
317/// caller is expected to keep sorted by source position so the
318/// subsequent edit phase can walk results left-to-right without a
319/// second sort.
320///
321/// Workers re-use one [`ForgeClient`], so fan-out also amortises
322/// TCP / TLS over its connection pool rather than running N
323/// independent handshakes.
324fn parallel_fetch(
325    client: &ForgeClient,
326    pending: Vec<(UpdateInput, String)>,
327    init: bool,
328) -> Vec<(UpdateInput, Option<UpdatePlan>)> {
329    let n = pending.len();
330    if n == 0 {
331        return Vec::new();
332    }
333    let cap = std::cmp::min(n, FETCH_CONCURRENCY);
334
335    // With nothing to overlap, skip the pool entirely; keeps the
336    // single-input `update <id>` path on the calling thread.
337    if cap <= 1 {
338        let mut results = Vec::with_capacity(n);
339        for (input, uri) in pending {
340            let plan = compute_change(client, &uri, init);
341            results.push((input, plan));
342        }
343        return results;
344    }
345
346    // Pop order is irrelevant: each work item carries its index so
347    // results land in source-order `slots` regardless of which
348    // worker happens to claim it.
349    type WorkItem = (usize, UpdateInput, String);
350    type ResultSlot = Mutex<Option<(UpdateInput, Option<UpdatePlan>)>>;
351    let work: Mutex<Vec<WorkItem>> = Mutex::new(
352        pending
353            .into_iter()
354            .enumerate()
355            .map(|(i, (u, s))| (i, u, s))
356            .collect(),
357    );
358    let slots: Vec<ResultSlot> = (0..n).map(|_| Mutex::new(None)).collect();
359
360    std::thread::scope(|s| {
361        for _ in 0..cap {
362            let work = &work;
363            let slots = &slots;
364            s.spawn(move || {
365                loop {
366                    let next = work.lock().expect("fetch work queue poisoned").pop();
367                    let Some((idx, input, uri)) = next else { break };
368                    let plan = compute_change(client, &uri, init);
369                    *slots[idx].lock().expect("fetch result slot poisoned") = Some((input, plan));
370                }
371            });
372        }
373    });
374
375    slots
376        .into_iter()
377        .map(|m| {
378            m.into_inner()
379                .expect("fetch result slot poisoned")
380                .expect("scope returned with an unfilled fetch slot")
381        })
382        .collect()
383}
384
385/// Plan one [`BatchLookup`] per github.com input in `pending`.
386///
387/// Inputs whose canonical domain is not `github.com`, or whose ref
388/// is unpinned, unstable, or unrecognized, are skipped: those inputs
389/// fall through to the REST path uncached. The returned list keeps
390/// `pending`'s order so aliases in the GraphQL document line up
391/// 1:1 with source-ordered inputs, which keeps debug logs and
392/// breakpoints readable.
393fn build_github_batch_lookups(pending: &[(UpdateInput, String)]) -> Vec<BatchLookup> {
394    let mut lookups = Vec::new();
395    for (_, uri) in pending {
396        let Ok(parsed) = uri.parse::<FlakeRef>() else {
397            continue;
398        };
399        // canonical_domain(None) == "github.com": shorthand inputs
400        // route through the github cache slot, so they batch the same
401        // way explicit `https://github.com/...` inputs do.
402        let canonical = match parsed.domain() {
403            None => "github.com",
404            Some(d) => d,
405        };
406        if canonical != "github.com" {
407            continue;
408        }
409        let (Some(owner), Some(repo)) = (parsed.owner(), parsed.repo()) else {
410            continue;
411        };
412        match detect_strategy(owner, repo) {
413            UpdateStrategy::SemverTags => {
414                lookups.push(BatchLookup::Tags {
415                    owner: owner.to_string(),
416                    repo: repo.to_string(),
417                });
418            }
419            UpdateStrategy::NixpkgsChannel
420            | UpdateStrategy::HomeManagerChannel
421            | UpdateStrategy::NixDarwinChannel => {
422                let current_ref = parsed.ref_or_rev().unwrap_or_default();
423                if current_ref.is_empty() {
424                    continue;
425                }
426                let channel = parse_channel_ref(current_ref);
427                let (Some(prefix), Some(current_version)) = (channel.prefix(), channel.version())
428                else {
429                    continue;
430                };
431                let candidates = channel_probe_candidates(prefix, current_version);
432                lookups.push(BatchLookup::ChannelCandidates {
433                    owner: owner.to_string(),
434                    repo: repo.to_string(),
435                    prefix: prefix.to_string(),
436                    candidates,
437                });
438            }
439        }
440    }
441    lookups
442}
443
444/// Resolve the new URI for a single input.
445///
446/// Pure with respect to the [`Updater`] state: takes only
447/// `&ForgeClient` plus the snapshotted URI text, so it is safe to
448/// run on a worker thread without aliasing `Updater::text` /
449/// `Updater::offset`. Per-input forge errors are logged via
450/// `tracing` and returned as `None`, so one flaky input never
451/// aborts the rest of the update run.
452fn compute_change(client: &ForgeClient, uri: &str, init: bool) -> Option<UpdatePlan> {
453    let parsed = match uri.parse::<FlakeRef>() {
454        Ok(p) => p,
455        Err(e) => {
456            tracing::error!("Failed to parse URI: {}", e);
457            return None;
458        }
459    };
460
461    let owner = match parsed.owner() {
462        Some(o) => o.to_owned(),
463        None => {
464            tracing::debug!("Skipping input without owner");
465            return None;
466        }
467    };
468    let repo = match parsed.repo() {
469        Some(r) => r.to_owned(),
470        None => {
471            tracing::debug!("Skipping input without repo");
472            return None;
473        }
474    };
475
476    let strategy = detect_strategy(&owner, &repo);
477    tracing::debug!("Update strategy for {}/{}: {:?}", owner, repo, strategy);
478
479    match strategy {
480        UpdateStrategy::NixpkgsChannel
481        | UpdateStrategy::HomeManagerChannel
482        | UpdateStrategy::NixDarwinChannel => {
483            compute_channel_change(client, &parsed, &owner, &repo)
484        }
485        UpdateStrategy::SemverTags => {
486            compute_semver_change(client, uri, &parsed, &owner, &repo, init)
487        }
488    }
489}
490
491/// Resolve the new URI for a channel-strategy input (nixpkgs,
492/// home-manager, nix-darwin). Returns `None` when the input is
493/// unpinned, the ref is unstable, or the forge probe fails. The
494/// failure case logs through `tracing` so a flaky channel does not
495/// abort the rest of the run. Preserves a `refs/heads/` prefix on
496/// the final ref iff the input already carried one.
497fn compute_channel_change(
498    client: &ForgeClient,
499    parsed: &FlakeRef,
500    owner: &str,
501    repo: &str,
502) -> Option<UpdatePlan> {
503    let domain = parsed.domain();
504    let current_ref = parsed.ref_or_rev().unwrap_or_default().to_owned();
505
506    if current_ref.is_empty() {
507        tracing::debug!("Skipping unpinned channel input: {}/{}", owner, repo);
508        return None;
509    }
510
511    let has_refs_heads_prefix = current_ref.starts_with("refs/heads/");
512
513    let latest = match find_latest_channel(client, &current_ref, owner, repo, domain) {
514        Ok(Some(latest)) => latest,
515        Ok(None) => return None,
516        Err(e) => {
517            tracing::error!(
518                "Failed to resolve latest channel for {}/{}: {}",
519                owner,
520                repo,
521                e
522            );
523            return None;
524        }
525    };
526
527    let final_ref = if has_refs_heads_prefix {
528        format!("refs/heads/{}", latest)
529    } else {
530        latest.clone()
531    };
532    let updated_uri = parsed.clone().with_ref(Some(final_ref.clone())).into_uri();
533
534    Some(UpdatePlan {
535        previous_ref: current_ref,
536        final_change: final_ref,
537        updated_uri,
538    })
539}
540
541/// Resolve the new URI for a semver-strategy input. Returns `None`
542/// when the current ref does not parse as semver and `init` is off
543/// (so non-semver pins aren't silently overwritten), or when the
544/// tag listing fails. Preserves a `refs/tags/` prefix on the final
545/// ref iff the input already carried one.
546fn compute_semver_change(
547    client: &ForgeClient,
548    uri: &str,
549    parsed: &FlakeRef,
550    owner: &str,
551    repo: &str,
552    init: bool,
553) -> Option<UpdatePlan> {
554    let is_git = is_git_url(uri);
555    let maybe_version = parsed.ref_or_rev().unwrap_or_default();
556    let parsed_ref = parse_ref(maybe_version, init);
557
558    if !init && let Err(e) = semver::Version::parse(&parsed_ref.normalized_for_semver) {
559        tracing::debug!("Skip non semver version: {}: {}", maybe_version, e);
560        return None;
561    }
562
563    let tags = if is_git {
564        let domain = parsed.domain()?.to_owned();
565        match client.list_tags(owner, repo, Some(&domain)) {
566            Ok(t) => t,
567            Err(_) => {
568                tracing::error!("Failed to fetch tags for {}/{} on {}", owner, repo, domain);
569                return None;
570            }
571        }
572    } else {
573        match client.list_tags(owner, repo, None) {
574            Ok(t) => t,
575            Err(_) => {
576                tracing::error!("Failed to fetch tags for {}/{}", owner, repo);
577                return None;
578            }
579        }
580    };
581
582    let change = match tags.get_latest_tag() {
583        Some(c) => c,
584        None => {
585            tracing::error!("Could not find latest version for {}/{}", owner, repo);
586            return None;
587        }
588    };
589
590    if !init && is_downgrade(maybe_version, &change) {
591        tracing::warn!(
592            "Refusing to downgrade {}/{} from {} to {}",
593            owner,
594            repo,
595            maybe_version,
596            change
597        );
598        eprintln!(
599            "Warning: skipping {}/{}: latest tag {} is older than the current pin {}.",
600            owner, repo, change, maybe_version
601        );
602        return None;
603    }
604
605    let final_change = if parsed_ref.has_refs_tags_prefix {
606        format!("refs/tags/{}", change)
607    } else {
608        change.clone()
609    };
610    let updated_uri = parsed
611        .clone()
612        .with_ref(Some(final_change.clone()))
613        .into_uri();
614
615    Some(UpdatePlan {
616        previous_ref: parsed_ref.previous_ref,
617        final_change,
618        updated_uri,
619    })
620}