git-remote-object-store 0.2.4

Git remote helper backed by cloud object stores (S3, Azure Blob Storage)
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
//! Git remote-helper protocol REPL and command dispatcher.
//!
//! [`run`] is generic over its reader and writer so tests can drive it
//! through `tokio::io::duplex`.
//!
//! Stdout is the wire protocol — see `.claude/rules/protocol-stdout.md`.
//! Diagnostics use `tracing` (configured to write to stderr by
//! [`tracing_init::init`]); the only stdout writes happen via the
//! per-command handlers below.

use std::io::ErrorKind;
use std::num::NonZeroU32;
use std::path::PathBuf;
use std::sync::Arc;

use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt};
use tracing::{debug, error};

use crate::object_store::ObjectStore;
use crate::url::{RemoteUrl, StorageEngine};

pub mod backend;
pub(crate) mod bundle_uri;
pub(crate) mod capabilities;
pub mod fetch;
pub(crate) mod list;
pub(crate) mod option;
pub mod push;
pub mod tracing_init;

use self::fetch::{FetchedRefs, fetch_batch};
use self::option::{OptionEffect, handle_option};
use self::push::{PushOutcome, push_batch};
use self::tracing_init::ReloadHandle;

/// Emit the wire line of every [`PushOutcome`] to `writer`, preserving
/// slice order.
///
/// The bundle and packchain `push_batch` implementations both hand back
/// a `Vec<PushOutcome>` that is rendered the same way, so the rendering
/// loop is shared here and each `Mode::Push` arm in [`flush_batch`]
/// only has to call it.
///
/// The writer is not flushed here; that is left to the caller.
///
/// # Errors
///
/// Returns the first I/O error reported by `write_all`. Outcomes after
/// the failing one are not written.
async fn write_push_outcomes<W>(
    writer: &mut W,
    outcomes: &[PushOutcome],
) -> Result<(), std::io::Error>
where
    W: AsyncWrite + Unpin,
{
    for outcome in outcomes {
        let line = outcome.to_protocol_line();
        writer.write_all(line.as_bytes()).await?;
    }
    Ok(())
}

/// Extend `msg` with the `Display` text of every error in `err`'s
/// `source()` chain, **omitting any level whose text already forms the
/// tail of `msg`**.
///
/// `err` itself is never rendered — the caller is expected to have
/// seeded `msg` (typically with `err.to_string()`). Each appended level
/// is preceded by `": "`.
///
/// Why the suffix check: `thiserror`-derived `#[error]` strings often
/// inline `{0}` / `{source}` at the *end* of the format string, and
/// that can nest. For example `PushError::Store("object-store error
/// during push: {0}")` wrapping `ObjectStoreError::Network("network
/// error: {0}")`, which in turn inlines its boxed source. Appending
/// unconditionally would yield `"... network error: dns failure: dns
/// failure"`. Checking the tail of `msg` before each append covers
/// every variant currently in this crate.
///
/// Limitation: a wrapper that inlines its source in the *middle* of
/// its message (e.g. `"network error: {0} (transient)"`) is **not**
/// detected, so the inner text is appended again. No such wrapper
/// exists today; if one appears, prefer moving `{source}` to the tail
/// of its `#[error]` string (or extend this helper) over accepting the
/// duplicate.
///
/// Shared by [`backend::fatal_message`] (operator-facing `fatal:`
/// line) and [`push`] (per-ref `error <ref>` wire line) so both
/// diagnostics render the chain identically.
pub(crate) fn append_source_chain<E: std::error::Error + ?Sized>(msg: &mut String, err: &E) {
    // `successors` yields the first source, then each source's source,
    // and stops at the first `None`.
    for cause in std::iter::successors(err.source(), |&cause| cause.source()) {
        // Render once: the text is needed both for the suffix test and
        // for the append.
        let text = cause.to_string();
        if msg.ends_with(text.as_str()) {
            continue;
        }
        msg.push_str(": ");
        msg.push_str(&text);
    }
}

/// Errors surfaced by the REPL loop.
///
/// Every variant except [`ProtocolError::InvalidCommand`] is marked
/// `#[from]`, so `?` converts the wrapped error type into the matching
/// variant without an explicit `map_err`.
#[derive(Debug, thiserror::Error)]
pub enum ProtocolError {
    /// Stdin / stdout transport failure.
    ///
    /// See [`ProtocolError::is_broken_pipe`] for the kinds that are
    /// treated as a clean exit.
    #[error("protocol I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// Object-store call failed during `list`.
    #[error("list failed: {0}")]
    List(#[from] list::ListError),

    /// `fetch` batch failed.
    #[error("fetch failed: {0}")]
    Fetch(#[from] fetch::FetchError),

    /// `push` batch failed.
    #[error("push failed: {0}")]
    Push(#[from] push::PushError),

    /// An input line did not match any recognised command.
    ///
    /// Carries the offending line; it is rendered with `{:?}` so the
    /// message shows it quoted and escaped.
    #[error("invalid command: {0:?}")]
    InvalidCommand(String),

    /// `FORMAT` validation / engine resolution failed during connect.
    #[error("backend resolution failed: {0}")]
    Backend(#[from] backend::BackendError),

    /// `bundle-uri` command handler failed.
    #[error("bundle-uri failed: {0}")]
    BundleUri(#[from] bundle_uri::BundleUriError),
}

impl ProtocolError {
    /// Reports whether this error means git closed the helper's stdout.
    ///
    /// Only [`ProtocolError::Io`] can qualify, and only when the
    /// underlying kind is `BrokenPipe` or `WriteZero`. Both indicate the
    /// peer went away, which is a clean exit rather than a crash. Every
    /// other variant and every other I/O kind yields `false`.
    #[must_use]
    pub fn is_broken_pipe(&self) -> bool {
        let Self::Io(io_err) = self else {
            return false;
        };
        matches!(
            io_err.kind(),
            ErrorKind::BrokenPipe | ErrorKind::WriteZero
        )
    }
}

/// Single-line command parsed from stdin.
///
/// Produced by [`parse_command`]. The `String` payloads hold whatever
/// followed the verb and its single separating space, passed through
/// verbatim for the per-verb handlers to interpret.
#[derive(Debug, PartialEq, Eq)]
enum Command {
    /// The exact line `capabilities`.
    Capabilities,
    /// The exact line `bundle-uri`.
    BundleUri,
    /// The exact line `list` (`for_push: false`) or `list for-push`
    /// (`for_push: true`).
    List { for_push: bool },
    /// A line starting with `option `; holds the remainder.
    Option(String),
    /// A line starting with `fetch `; holds the remainder.
    Fetch(String),
    /// A line starting with `push `; holds the remainder.
    Push(String),
    /// A blank line (nothing left after trailing `\r` / `\n` are removed).
    Empty,
}

/// Which batched command stream is currently being collected.
///
/// Push and fetch are mutually exclusive within a batch — switching
/// between them discards the commands collected for the previous mode
/// (see `BatchState::accumulate`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
    /// `fetch` commands are being collected.
    Fetch,
    /// `push` commands are being collected.
    Push,
}

/// Session-fixed infrastructure shared by [`fetch_batch`] and [`push_batch`].
///
/// Created once per [`run`] call and passed by shared reference to both
/// batch handlers so the call sites don't repeat the `(store, prefix,
/// repo_dir)` triple.
pub(crate) struct BatchCtx {
    /// Object-store handle used by the batch handlers.
    pub(crate) store: Arc<dyn ObjectStore>,
    /// Optional repository prefix within the bucket / container.
    pub(crate) prefix: Option<Arc<str>>,
    /// Local repository directory the helper is operating against
    /// (the `repo_dir` argument of [`run`]).
    pub(crate) repo_dir: Arc<PathBuf>,
}

/// Accumulates `fetch` / `push` commands until a blank line flushes the batch.
///
/// The REPL protocol delivers commands as a batch separated by a blank
/// line.  A mode switch between fetch and push (rare but spec-allowed)
/// clears the accumulator of the mode being left, so stale commands
/// from the prior mode are discarded.
struct BatchState {
    /// Mode of the batch currently being collected; `None` when no
    /// command has been accumulated since construction or the last drain.
    mode: Option<Mode>,
    /// Argument strings of the pending `fetch` commands, in arrival order.
    fetch_cmds: Vec<String>,
    /// Argument strings of the pending `push` commands, in arrival order.
    push_cmds: Vec<String>,
}

impl BatchState {
    /// Start with no active mode and both accumulators empty.
    fn new() -> Self {
        Self {
            mode: None,
            fetch_cmds: Vec::new(),
            push_cmds: Vec::new(),
        }
    }

    /// Append `cmd` to the accumulator for `incoming`.
    ///
    /// When `incoming` differs from the current mode (including the
    /// "no mode yet" case) the *other* mode's accumulator is cleared
    /// first and `incoming` becomes the current mode.
    fn accumulate(&mut self, incoming: Mode, cmd: String) {
        let mode_changed = self.mode != Some(incoming);
        self.mode = Some(incoming);

        // Pick the accumulator to extend, the one that must stay empty,
        // and the message for the invariant check below.
        let (target, other, invariant) = match incoming {
            Mode::Fetch => (
                &mut self.fetch_cmds,
                &mut self.push_cmds,
                "push_cmds must be empty when accumulating a Fetch command",
            ),
            Mode::Push => (
                &mut self.push_cmds,
                &mut self.fetch_cmds,
                "fetch_cmds must be empty when accumulating a Push command",
            ),
        };

        if mode_changed {
            other.clear();
        }
        // Defense-in-depth: the other accumulator was either just
        // cleared or should already be empty. If a future bug leaves it
        // populated across a drain, fail loudly in debug builds rather
        // than silently mixing modes.
        debug_assert!(other.is_empty(), "{invariant}");
        target.push(cmd);
    }

    /// Hand back the pending batch as `(mode, cmds)` when there is one.
    ///
    /// On success the active accumulator is emptied and the mode is
    /// reset to `None`. When no mode is active, or the active mode's
    /// accumulator is empty, this returns `None` and leaves the state
    /// untouched, so the REPL can still emit the mandatory blank-line
    /// acknowledgement.
    fn take_pending(&mut self) -> Option<(Mode, Vec<String>)> {
        let mode = self.mode?;
        let queue = match mode {
            Mode::Fetch => &mut self.fetch_cmds,
            Mode::Push => &mut self.push_cmds,
        };
        if queue.is_empty() {
            return None;
        }
        let cmds = std::mem::take(queue);
        self.mode = None;
        Some((mode, cmds))
    }
}

/// Classify one input line as a [`Command`].
///
/// Trailing `\r` / `\n` characters are removed first; nothing else is
/// trimmed. The fixed verbs (`capabilities`, `bundle-uri`, `list`,
/// `list for-push`) must match the remaining text exactly. The
/// argument-carrying verbs (`option`, `fetch`, `push`) must be followed
/// by exactly one space; everything after that space is returned
/// verbatim, even when it is empty or starts with another space.
///
/// Returns `None` for any line that fits none of these shapes,
/// including whitespace-only lines.
fn parse_command(line: &str) -> Option<Command> {
    let body = line.trim_end_matches(['\r', '\n']);
    let command = match body {
        "" => Command::Empty,
        "capabilities" => Command::Capabilities,
        "bundle-uri" => Command::BundleUri,
        "list for-push" => Command::List { for_push: true },
        "list" => Command::List { for_push: false },
        _ => {
            // None of the verbs contains a space, so splitting at the
            // first space isolates the verb from its argument text.
            let (verb, args) = body.split_once(' ')?;
            match verb {
                "option" => Command::Option(args.to_owned()),
                "fetch" => Command::Fetch(args.to_owned()),
                "push" => Command::Push(args.to_owned()),
                _ => return None,
            }
        }
    };
    Some(command)
}

/// Session-fixed values [`flush_batch`] needs for engine dispatch.
/// Built once at the top of [`run`] so the flush call site doesn't have
/// to repeat the borrow list.
struct FlushCtx<'a> {
    /// Shared store / prefix / repo-dir bundle handed to every batch handler.
    batch_ctx: &'a BatchCtx,
    /// The remote URL; its `kind()` is passed to the bundle push handler.
    remote: &'a RemoteUrl,
    /// Resolved storage engine; selects the bundle or packchain handlers.
    engine: StorageEngine,
    /// The remote's `zip` flag, forwarded to the bundle push handler.
    zip: bool,
    /// Session-wide [`FetchedRefs`]; cloned into each fetch batch call.
    fetched_refs: &'a FetchedRefs,
}

/// Execute the pending batch, if any, then acknowledge with a blank line.
///
/// A non-empty batch is routed by mode and then by `flush.engine` to
/// the bundle or packchain fetch / push handler. Push outcomes are
/// written to `writer` before the acknowledgement. The trailing `"\n"`
/// and the flush happen even when nothing was pending.
///
/// `depth` is taken (left as `None`) only when a fetch batch runs, so a
/// requested depth applies to *that* batch alone — git re-issues
/// `option depth` for each shallow operation.
///
/// # Errors
///
/// Propagates the selected handler's error and any I/O error from
/// writing to `writer`, converted into [`ProtocolError`]. On error the
/// blank-line acknowledgement is not written.
async fn flush_batch<W>(
    flush: &FlushCtx<'_>,
    batch: &mut BatchState,
    depth: &mut Option<NonZeroU32>,
    writer: &mut W,
) -> Result<(), ProtocolError>
where
    W: AsyncWrite + Unpin,
{
    match batch.take_pending() {
        Some((Mode::Fetch, cmds)) => {
            let refs = flush.fetched_refs.clone();
            // Consumed here so the depth cannot leak into a later batch.
            let shallow_depth = depth.take();
            match flush.engine {
                StorageEngine::Bundle => {
                    fetch_batch(flush.batch_ctx, cmds, refs, shallow_depth).await?;
                }
                StorageEngine::Packchain => {
                    crate::packchain::fetch::fetch_batch(
                        flush.batch_ctx,
                        cmds,
                        refs,
                        shallow_depth,
                    )
                    .await?;
                }
            }
        }
        Some((Mode::Push, cmds)) => {
            let outcomes = match flush.engine {
                StorageEngine::Bundle => {
                    push_batch(
                        flush.batch_ctx,
                        flush.remote.kind(),
                        flush.zip,
                        flush.engine,
                        cmds,
                    )
                    .await?
                }
                StorageEngine::Packchain => {
                    crate::packchain::push::push_batch(flush.batch_ctx, flush.engine, cmds).await?
                }
            };
            write_push_outcomes(writer, &outcomes).await?;
        }
        // Nothing pending: only the acknowledgement below is sent.
        None => {}
    }

    writer.write_all(b"\n").await?;
    writer.flush().await?;
    Ok(())
}

/// Run the helper REPL until stdin closes (clean exit) or the writer
/// breaks (`BrokenPipe` — also a clean exit).
///
/// `repo_dir` is the local repository the helper is operating against;
/// the parallel fetch path uses it as the cwd for `git bundle unbundle`.
///
/// # Errors
///
/// Returns [`ProtocolError::Io`] on transport failure,
/// [`ProtocolError::InvalidCommand`] for an unrecognised command, and
/// [`ProtocolError::List`] / [`ProtocolError::Fetch`] /
/// [`ProtocolError::Push`] for backend errors in the respective
/// operations.
///
/// `engine` is the resolved engine returned by [`backend::build`].
/// Threading it through the call chain (rather than re-reading
/// `FORMAT` here) avoids a duplicate round trip per helper invocation.
pub async fn run<R, W>(
    remote: RemoteUrl,
    store: Arc<dyn ObjectStore>,
    engine: StorageEngine,
    reader: R,
    mut writer: W,
    reload: Option<ReloadHandle>,
    repo_dir: PathBuf,
) -> Result<(), ProtocolError>
where
    R: AsyncBufRead + Unpin,
    W: AsyncWrite + Unpin,
{
    // Per-command routing branches on `engine` below so packchain
    // commands hit `crate::packchain::*` and bundle commands hit the
    // bundle code path; the engine is bucket-authoritative (resolved
    // from `FORMAT` in `backend::build`) so a wrong `?engine=` flag
    // can never cross the wires.
    let mut lines = reader.lines();
    let fetched_refs = FetchedRefs::new();
    let mut batch = BatchState::new();
    // Per-operation `option depth <N>` is set immediately before a
    // fetch batch and reset to `None` once that batch drains. Depth is
    // not session-sticky — git re-issues `option depth` for each
    // shallow operation.
    let mut depth: Option<NonZeroU32> = None;
    let zip = remote.flags().zip;
    // bundle-uri (issue #71) is gated on engine == Packchain AND the
    // operator opting in via `?bundle_uri=1`. The gate is computed
    // once at session start so a `?bundle_uri=1` flag on a bundle
    // remote is silently inert (the issue puts the bundle engine
    // explicitly out of scope: bundle filenames rotate per push, so
    // a stable URL would race the next push).
    let advertise_bundle_uri =
        matches!(engine, StorageEngine::Packchain) && remote.flags().bundle_uri;
    let ctx = BatchCtx {
        store,
        prefix: remote.prefix().map(Arc::from),
        repo_dir: Arc::new(repo_dir),
    };
    let flush = FlushCtx {
        batch_ctx: &ctx,
        remote: &remote,
        engine,
        zip,
        fetched_refs: &fetched_refs,
    };

    while let Some(line) = lines.next_line().await? {
        debug!(cmd = %line, "received protocol command");
        let Some(cmd) = parse_command(&line) else {
            error!(cmd = %line, "fatal: invalid command");
            return Err(ProtocolError::InvalidCommand(line));
        };
        match cmd {
            Command::Capabilities => {
                capabilities::handle_capabilities(&mut writer, advertise_bundle_uri).await?;
            }
            Command::BundleUri => {
                let opts = bundle_uri::BundleUriOpts {
                    presign_ttl_seconds: remote.flags().bundle_uri_presign_ttl,
                };
                bundle_uri::handle_bundle_uri(
                    ctx.store.as_ref(),
                    &remote,
                    opts,
                    advertise_bundle_uri,
                    &mut writer,
                )
                .await?;
            }
            Command::List { for_push } => {
                list::handle_list(
                    ctx.store.as_ref(),
                    ctx.prefix.as_deref(),
                    engine,
                    for_push,
                    &mut writer,
                )
                .await?;
            }
            Command::Option(args) => {
                let effect = handle_option(&args, reload.as_ref(), &mut writer).await?;
                if let OptionEffect::SetDepth(d) = effect {
                    depth = Some(d);
                }
            }
            Command::Fetch(args) => batch.accumulate(Mode::Fetch, args),
            Command::Push(args) => batch.accumulate(Mode::Push, args),
            Command::Empty => {
                flush_batch(&flush, &mut batch, &mut depth, &mut writer).await?;
            }
        }
    }
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_command_recognises_each_form() {
        assert_eq!(parse_command("capabilities\n"), Some(Command::Capabilities));
        // `bundle-uri` is a fixed verb like `capabilities`; pin it so a
        // typo in the literal is caught here.
        assert_eq!(parse_command("bundle-uri\n"), Some(Command::BundleUri));
        assert_eq!(
            parse_command("list\n"),
            Some(Command::List { for_push: false })
        );
        assert_eq!(
            parse_command("list for-push\n"),
            Some(Command::List { for_push: true })
        );
        assert_eq!(
            parse_command("option verbosity 2\n"),
            Some(Command::Option("verbosity 2".into()))
        );
        assert_eq!(
            parse_command("fetch deadbeef refs/heads/main\n"),
            Some(Command::Fetch("deadbeef refs/heads/main".into()))
        );
        assert_eq!(
            parse_command("push refs/heads/main:refs/heads/main\n"),
            Some(Command::Push("refs/heads/main:refs/heads/main".into()))
        );
        assert_eq!(parse_command("\n"), Some(Command::Empty));
    }

    #[test]
    fn parse_command_handles_crlf() {
        assert_eq!(
            parse_command("list\r\n"),
            Some(Command::List { for_push: false })
        );
        assert_eq!(parse_command("\r\n"), Some(Command::Empty));
    }

    #[test]
    fn parse_command_rejects_garbage() {
        assert_eq!(parse_command("nonsense\n"), None);
        // Whitespace-only is treated as garbage; only a literal blank
        // line is the batch separator.
        assert_eq!(parse_command("   \n"), None);
        // Double-space inside a recognised command is rejected. Pinning
        // strict byte-exact matching on the protocol verbs against any
        // future "be lenient with whitespace" regression — `"list  for-push"`
        // (two spaces) must NOT collapse to `Command::List { for_push: true }`.
        assert_eq!(parse_command("list  for-push\n"), None);
        // Trailing space after a verb is also rejected.
        assert_eq!(parse_command("list \n"), None);
    }

    /// `parse_command` matches the strip-prefix verbs (`option`, `fetch`,
    /// `push`) on a single space — the rest is passed through verbatim
    /// to the per-verb argument parser. Pin this contract so a
    /// regression that collapses internal whitespace before the strip
    /// (e.g. `trimmed.split_whitespace().collect()`) is caught here
    /// rather than bouncing off the per-verb parser with a confusing
    /// error.
    #[test]
    fn parse_command_passes_strip_prefix_args_verbatim() {
        // Double space after the verb produces a leading-space arg, NOT
        // a no-op collapse. The downstream parser (e.g. parse_fetch_args)
        // is responsible for rejecting bad arg shapes; parse_command's
        // job ends at the verb match.
        assert_eq!(
            parse_command("fetch  abc def\n"),
            Some(Command::Fetch(" abc def".into())),
        );
        assert_eq!(
            parse_command("push  +ref:ref\n"),
            Some(Command::Push(" +ref:ref".into())),
        );
        // Empty args after the verb are also passed through (rejected
        // by parse_fetch_args / parse_push_args, not here).
        assert_eq!(
            parse_command("fetch \n"),
            Some(Command::Fetch(String::new()))
        );
    }

    // --- append_source_chain ----------------------------------------

    /// Layered wrapper for testing the dedup behaviour of
    /// `append_source_chain`. The inner is a `BoxError` so we can stack
    /// arbitrary depth without writing one struct per level.
    #[derive(Debug, thiserror::Error)]
    #[error("layer: {0}")]
    struct LayerError(#[source] crate::object_store::BoxError);

    #[test]
    fn append_source_chain_skips_levels_already_in_display() {
        // BoxError is a leaf (`io::Error::other`'s Display is just the
        // message). LayerError's Display inlines `{0}` recursively so
        // the top-level `to_string()` already contains every level.
        // append_source_chain must NOT duplicate any of them.
        let inner: crate::object_store::BoxError = Box::new(std::io::Error::other("dns failure"));
        let mid: crate::object_store::BoxError = Box::new(LayerError(inner));
        let top = LayerError(mid);

        let mut msg = top.to_string();
        // `top.to_string()` inlines every level via `{0}`:
        // "layer: layer: dns failure"
        assert_eq!(msg, "layer: layer: dns failure");

        append_source_chain(&mut msg, &top);
        // Walk would land on each source's Display — all already at the
        // tail of `msg` — so dedup must skip every level.
        assert_eq!(
            msg, "layer: layer: dns failure",
            "append_source_chain must not duplicate already-inlined sources",
        );
    }

    #[test]
    fn append_source_chain_appends_when_source_text_is_not_in_display() {
        // A wrapper whose Display does NOT inline its source. The chain
        // walk must surface the inner cause.
        #[derive(Debug, thiserror::Error)]
        #[error("opaque wrapper")]
        struct OpaqueWrapper(#[source] crate::object_store::BoxError);

        let inner: crate::object_store::BoxError = Box::new(std::io::Error::other("dns failure"));
        let top = OpaqueWrapper(inner);

        let mut msg = top.to_string();
        assert_eq!(msg, "opaque wrapper");
        append_source_chain(&mut msg, &top);
        assert_eq!(msg, "opaque wrapper: dns failure");
    }

    #[test]
    fn is_broken_pipe_matches_kinds() {
        let pipe = ProtocolError::Io(std::io::Error::from(ErrorKind::BrokenPipe));
        assert!(pipe.is_broken_pipe());
        let write_zero = ProtocolError::Io(std::io::Error::from(ErrorKind::WriteZero));
        assert!(write_zero.is_broken_pipe());
        let other = ProtocolError::Io(std::io::Error::from(ErrorKind::Other));
        assert!(!other.is_broken_pipe());
        let not_io = ProtocolError::InvalidCommand("bad".into());
        assert!(!not_io.is_broken_pipe());
    }

    // --- BatchState ---------------------------------------------------

    #[test]
    fn batch_state_empty_take_returns_none() {
        let mut batch = BatchState::new();
        assert!(batch.take_pending().is_none());
    }

    #[test]
    fn batch_state_accumulate_and_take_round_trip() {
        let mut batch = BatchState::new();
        batch.accumulate(Mode::Fetch, "a".to_owned());
        batch.accumulate(Mode::Fetch, "b".to_owned());
        let (mode, cmds) = batch.take_pending().expect("non-empty fetch batch");
        assert_eq!(mode, Mode::Fetch);
        assert_eq!(cmds, ["a", "b"]);
        // Mode is reset after drain; a second take returns None.
        assert!(batch.take_pending().is_none());
    }

    #[test]
    fn batch_state_mode_switch_clears_prior_cmds() {
        let mut batch = BatchState::new();
        // Accumulate fetch commands, then switch to push mid-batch.
        batch.accumulate(Mode::Fetch, "fetch-cmd".to_owned());
        batch.accumulate(Mode::Push, "push-cmd".to_owned());
        // Only the push command survives the mode switch.
        let (mode, cmds) = batch.take_pending().expect("non-empty push batch");
        assert_eq!(mode, Mode::Push);
        assert_eq!(cmds, ["push-cmd"]);
        assert!(batch.take_pending().is_none());
    }

    #[test]
    fn batch_state_accumulate_with_no_cmds_after_mode_set_takes_none() {
        // `accumulate` always pushes a command right after setting the
        // mode, so "mode set, accumulator empty" cannot be reached
        // through the methods alone. Build that state directly (this
        // module can see the private fields) so the empty-accumulator
        // path of `take_pending` is actually exercised.
        for mode in [Mode::Fetch, Mode::Push] {
            let mut batch = BatchState {
                mode: Some(mode),
                fetch_cmds: Vec::new(),
                push_cmds: Vec::new(),
            };
            assert!(batch.take_pending().is_none());
            // The `None` path must leave the state unchanged.
            assert_eq!(batch.mode, Some(mode));
        }

        // After a real drain the mode is back to `None`; a spurious
        // second take must return None as well.
        let mut batch = BatchState::new();
        batch.accumulate(Mode::Fetch, "only-cmd".to_owned());
        assert!(batch.take_pending().is_some());
        assert!(batch.take_pending().is_none());
    }
}