Skip to main content

git_remote_object_store/lfs/
run.rs

1//! REPL driver for the LFS custom-transfer protocol.
2//!
3//! Generic over reader and writer so tests can drive it through
4//! `tokio::io::duplex`; the bin entrypoint wires real stdin/stdout.
5//!
6//! Stdout is the wire protocol — see `.claude/rules/protocol-stdout.md`.
7//! Diagnostic output goes through `tracing` (configured to write to
8//! stderr or a debug log file by the bin entrypoint).
9
10use std::path::{Path, PathBuf};
11use std::str::FromStr;
12use std::sync::Arc;
13
14use thiserror::Error;
15use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite};
16use tracing::{debug, error, warn};
17
18use crate::lfs::agent::{self, Agent, AgentError, ERR_CODE_GENERIC, ERR_CODE_INIT};
19use crate::lfs::oid::LfsOid;
20use crate::lfs::protocol::{CompleteEvent, ErrorPayload, Event, InitEvent, InitResponse};
21use crate::object_store::ObjectStore;
22use crate::protocol::backend;
23use crate::url;
24
25/// Errors surfaced by [`run`] that are *fatal* to the agent process.
26///
27/// Backend / object-store errors that occur after init are not in
28/// here — they are folded into per-event `complete` payloads by the
29/// [`Agent`].
30#[derive(Debug, Error)]
31pub enum RunError {
32    /// Underlying transport (stdin/stdout) failed.
33    #[error("LFS protocol I/O error: {0}")]
34    Io(#[from] std::io::Error),
35    /// Agent dispatch error (transport or serialization).
36    #[error(transparent)]
37    Agent(#[from] AgentError),
38    /// An incoming line was not valid LFS JSON, or an outgoing event
39    /// could not be serialized. Either is fatal — the protocol cannot
40    /// continue past a parse mismatch.
41    #[error("malformed LFS event: {0}")]
42    MalformedEvent(#[from] serde_json::Error),
43    /// First event was not `init`. The LFS spec requires it. The
44    /// payload is the `Debug` rendering of the offending event,
45    /// captured at construction time.
46    #[error("expected init as the first event, got {0}")]
47    InitNotFirst(String),
48    /// Stdin closed before any event was read.
49    #[error("stdin closed before init")]
50    StdinClosed,
51}
52
53impl RunError {
54    /// `true` if this error is a `BrokenPipe` / `WriteZero` from
55    /// stdout closing — the bin-side REPL turns those into a clean
56    /// exit. Walks both the direct `Io` variant and the nested
57    /// `Agent(AgentError::Io)` variant produced by writes that flow
58    /// through the agent's [`write_event`][crate::lfs::agent::write_event].
59    #[must_use]
60    pub fn is_broken_pipe(&self) -> bool {
61        let io_err = match self {
62            Self::Io(e) | Self::Agent(AgentError::Io(e)) => Some(e),
63            _ => None,
64        };
65        io_err.is_some_and(|e| {
66            matches!(
67                e.kind(),
68                std::io::ErrorKind::BrokenPipe | std::io::ErrorKind::WriteZero,
69            )
70        })
71    }
72}
73
74/// Init-time failures that the bin-side REPL converts into an LFS
75/// `init` error response and a clean exit. Distinct from [`RunError`]
76/// because none of these are fatal to the agent — they're reported on
77/// the wire as `{"error":{...}}`, then the loop returns `Ok(())`.
78#[derive(Debug, Error)]
79enum InitError {
80    /// `init.remote` was the empty string. Upstream's helper accepts
81    /// it and then explodes later; we reject up front.
82    #[error("init.remote is empty")]
83    EmptyRemote,
84    /// `git remote get-url` / URL parsing / backend construction
85    /// failed for the named remote.
86    #[error("cannot resolve remote \"{remote}\": {source}")]
87    Resolve {
88        /// Remote name from the init event.
89        remote: String,
90        /// Underlying resolver failure.
91        #[source]
92        source: Box<dyn std::error::Error + Send + Sync>,
93    },
94}
95
96/// How to resolve a remote name to an [`ObjectStore`]. Production
97/// uses a `gix`-based resolver; tests inject a closure that returns
98/// a `MockStore` (the in-memory test backend gated on `test-util`).
99#[async_trait::async_trait]
100pub trait RemoteResolver: Send + Sync {
101    /// Resolve `remote_name` → `(object store, optional bucket prefix)`.
102    async fn resolve(
103        &self,
104        remote_name: &str,
105    ) -> Result<(Arc<dyn ObjectStore>, Option<String>), Box<dyn std::error::Error + Send + Sync>>;
106}
107
/// The real-world [`RemoteResolver`]: discovers the local repository with
/// `gix`, looks up the remote's URL, parses it, and constructs the
/// object-store backend that URL selects.
pub struct GitRemoteResolver {
    /// Directory handed to `gix::discover` to locate the repository;
    /// intended to be the process's working directory at startup.
    pub repo_dir: PathBuf,
}
116
117#[async_trait::async_trait]
118impl RemoteResolver for GitRemoteResolver {
119    async fn resolve(
120        &self,
121        remote_name: &str,
122    ) -> Result<(Arc<dyn ObjectStore>, Option<String>), Box<dyn std::error::Error + Send + Sync>>
123    {
124        // `?` against `Box<dyn Error + Send + Sync>` uses the blanket
125        // `From<E: Error + Send + Sync + 'static> for Box<...>`, so no
126        // explicit cast is needed at each call site.
127        let repo = gix::discover(&self.repo_dir)?;
128        let raw = crate::git::remote_url(&repo, remote_name)?;
129        let parsed = url::parse(&raw)?;
130        let prefix = parsed.prefix().map(str::to_owned);
131        // LFS is engine-independent (objects live at `<prefix>/lfs/<oid>`
132        // regardless of the bundle/packchain choice); discard the
133        // resolved engine.
134        let (store, _engine) = backend::build(&parsed).await?;
135        Ok((store, prefix))
136    }
137}
138
139/// Drive the LFS REPL until stdin closes or `terminate` arrives.
140///
141/// `tmp_dir` is the destination directory for downloads
142/// (`<git-dir>/lfs/tmp`).
143///
144/// # Errors
145///
146/// Returns [`RunError::StdinClosed`] if stdin closes before the first event,
147/// [`RunError::MalformedEvent`] for unparseable JSON, or
148/// [`RunError::InitNotFirst`] if the first event is not `init`.
149/// Transport or serialisation errors from upload/download operations surface
150/// as [`RunError::Io`] or [`RunError::Agent`].
151pub async fn run<R, W, Res>(
152    reader: R,
153    mut writer: W,
154    resolver: &Res,
155    tmp_dir: &Path,
156) -> Result<(), RunError>
157where
158    R: AsyncBufRead + Unpin,
159    W: AsyncWrite + Unpin,
160    Res: RemoteResolver + ?Sized,
161{
162    let mut lines = reader.lines();
163
164    let Some(first) = lines.next_line().await? else {
165        return Err(RunError::StdinClosed);
166    };
167    let event = parse_event(&first)?;
168    let init = match event {
169        Event::Init(init) => init,
170        Event::Terminate => {
171            // Spec doesn't require ack on terminate; mirror upstream's
172            // silent exit.
173            debug!("received terminate before init; exiting");
174            return Ok(());
175        }
176        other => {
177            return Err(RunError::InitNotFirst(format!("{other:?}")));
178        }
179    };
180
181    let agent = match init_agent(&init, resolver, tmp_dir.to_owned()).await {
182        Ok(a) => {
183            write_init_ack(&mut writer, None).await?;
184            a
185        }
186        Err(err) => {
187            error!(error = %err, "init failed");
188            write_init_ack(&mut writer, Some(&err.to_string())).await?;
189            return Ok(());
190        }
191    };
192
193    while let Some(line) = lines.next_line().await? {
194        debug!(line = %line, "lfs event");
195        let event = parse_event(&line)?;
196        match event {
197            Event::Init(_) => {
198                warn!("received second init; ignoring");
199            }
200            Event::Upload(u) => {
201                if let Some(oid) = validate_oid(&u.oid, &mut writer, "upload").await? {
202                    agent
203                        .upload(&oid, u.size, Path::new(&u.path), &mut writer)
204                        .await?;
205                }
206            }
207            Event::Download(d) => {
208                if let Some(oid) = validate_oid(&d.oid, &mut writer, "download").await? {
209                    agent.download(&oid, d.size, &mut writer).await?;
210                }
211            }
212            Event::Terminate => {
213                debug!("received terminate; exiting");
214                break;
215            }
216        }
217    }
218    Ok(())
219}
220
221fn parse_event(line: &str) -> Result<Event, RunError> {
222    // Malformed JSON is fatal — git-lfs never sends garbage on the
223    // wire. The `?` operator at call sites turns this into
224    // `RunError::MalformedEvent` via the `#[from]` impl.
225    Ok(serde_json::from_str(line)?)
226}
227
228async fn init_agent<Res>(
229    init: &InitEvent,
230    resolver: &Res,
231    tmp_dir: PathBuf,
232) -> Result<Agent, InitError>
233where
234    Res: RemoteResolver + ?Sized,
235{
236    if init.remote.is_empty() {
237        return Err(InitError::EmptyRemote);
238    }
239    let (store, prefix) =
240        resolver
241            .resolve(&init.remote)
242            .await
243            .map_err(|source| InitError::Resolve {
244                remote: init.remote.clone(),
245                source,
246            })?;
247    Ok(Agent::new(store, prefix, tmp_dir))
248}
249
250/// Validate `oid_raw` at the run-loop boundary. Returns `Some(oid)`
251/// on success (the caller dispatches into the agent), or `None`
252/// after emitting a `complete` event with an empty `oid` field and
253/// the raw rejected value folded into the `error.message`. The
254/// `op` label flows into the warn-log so an operator can correlate
255/// the rejection with the source event line.
256async fn validate_oid<W: AsyncWrite + Unpin>(
257    oid_raw: &str,
258    writer: &mut W,
259    op: &'static str,
260) -> Result<Option<LfsOid>, RunError> {
261    match LfsOid::from_str(oid_raw) {
262        Ok(oid) => Ok(Some(oid)),
263        Err(err) => {
264            warn!(oid = %oid_raw, error = %err, op, "rejecting malformed oid");
265            let message = format!("invalid oid `{oid_raw}`: {err}");
266            let evt = CompleteEvent {
267                event: "complete",
268                oid: "",
269                path: None,
270                error: Some(ErrorPayload {
271                    code: ERR_CODE_GENERIC,
272                    message: &message,
273                }),
274            };
275            agent::write_event(writer, &evt).await?;
276            Ok(None)
277        }
278    }
279}
280
281async fn write_init_ack<W: AsyncWrite + Unpin>(
282    writer: &mut W,
283    error_msg: Option<&str>,
284) -> Result<(), RunError> {
285    let resp = InitResponse {
286        error: error_msg.map(|m| ErrorPayload {
287            code: ERR_CODE_INIT,
288            message: m,
289        }),
290    };
291    Ok(agent::write_event(writer, &resp).await?)
292}
293
#[cfg(test)]
mod tests {
    use super::*;
    use crate::object_store::mock::MockStore;
    use bytes::Bytes;
    use tempfile::TempDir;

    /// Resolver that ignores the remote name and always hands back a clone
    /// of the configured in-memory store and prefix.
    struct StubResolver {
        store: MockStore,
        prefix: Option<String>,
    }

    #[async_trait::async_trait]
    impl RemoteResolver for StubResolver {
        async fn resolve(
            &self,
            _remote_name: &str,
        ) -> Result<(Arc<dyn ObjectStore>, Option<String>), Box<dyn std::error::Error + Send + Sync>>
        {
            Ok((Arc::new(self.store.clone()), self.prefix.clone()))
        }
    }

    /// A well-formed (64 lowercase hex chars) oid.
    fn good_oid() -> String {
        "fedcba9876543210fedcba9876543210fedcba9876543210fedcba9876543210".to_owned()
    }

    /// A second well-formed oid, guaranteed distinct from [`good_oid`], for
    /// tests that need two independent objects.
    fn other_oid() -> String {
        "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef".to_owned()
    }

    /// Feed `events` (one per line) to [`run`] and return the lines it
    /// wrote together with its result.
    async fn drive(
        events: &[String],
        resolver: &dyn RemoteResolver,
        tmp_dir: &Path,
    ) -> (Vec<String>, Result<(), RunError>) {
        let mut input = events.join("\n");
        if !events.is_empty() {
            input.push('\n');
        }
        let reader = tokio::io::BufReader::new(std::io::Cursor::new(input.into_bytes()));
        let mut output: Vec<u8> = Vec::new();
        let res = run(reader, &mut output, resolver, tmp_dir).await;
        let lines = String::from_utf8(output)
            .unwrap()
            .lines()
            .map(str::to_owned)
            .collect();
        (lines, res)
    }

    #[tokio::test]
    async fn full_round_trip_init_upload_download_terminate() {
        let store = MockStore::new();
        let oid = good_oid();
        let body = b"some body";
        // Pre-seed a *different* oid for the download. Using the same oid
        // as the upload would make the final `contains` check pass even if
        // the upload never wrote anything.
        let oid2 = other_oid();
        assert_ne!(oid, oid2, "upload and download must use distinct oids");
        store.insert(format!("repo/lfs/{oid2}"), Bytes::from_static(body));
        assert!(
            !store.contains(&format!("repo/lfs/{oid}")),
            "upload target must not exist before the run"
        );

        let resolver = StubResolver {
            store: store.clone(),
            prefix: Some("repo".to_owned()),
        };

        let tmp = TempDir::new().unwrap();
        let src = tmp.path().join("src");
        tokio::fs::write(&src, body).await.unwrap();

        let events = vec![
            r#"{"event":"init","operation":"upload","remote":"origin"}"#.to_owned(),
            format!(
                r#"{{"event":"upload","oid":"{oid}","size":{size},"path":"{path}"}}"#,
                size = body.len(),
                path = src.to_str().unwrap(),
            ),
            format!(
                r#"{{"event":"download","oid":"{oid2}","size":{size}}}"#,
                size = body.len(),
            ),
            r#"{"event":"terminate"}"#.to_owned(),
        ];
        let (lines, res) = drive(&events, &resolver, tmp.path()).await;
        res.expect("run should exit cleanly");

        // Expected: init ack, progress+complete (upload), progress+complete (download).
        assert_eq!(lines[0], "{}", "init ack should be empty object");
        assert!(lines.iter().any(|l| l.contains("\"event\":\"progress\"")));
        let completes: Vec<_> = lines
            .iter()
            .filter(|l| l.contains("\"event\":\"complete\""))
            .collect();
        assert_eq!(completes.len(), 2, "expected two completes: {lines:?}");
        // Only the upload can have created this key.
        assert!(store.contains(&format!("repo/lfs/{oid}")));
    }

    #[tokio::test]
    async fn init_failure_emits_error_object_and_exits_cleanly() {
        struct FailingResolver;
        #[async_trait::async_trait]
        impl RemoteResolver for FailingResolver {
            async fn resolve(
                &self,
                _remote_name: &str,
            ) -> Result<
                (Arc<dyn ObjectStore>, Option<String>),
                Box<dyn std::error::Error + Send + Sync>,
            > {
                Err("no such remote".into())
            }
        }
        let tmp = TempDir::new().unwrap();
        let events = vec![r#"{"event":"init","remote":"origin"}"#.to_owned()];
        let (lines, res) = drive(&events, &FailingResolver, tmp.path()).await;
        res.expect("init failure is non-fatal");
        assert_eq!(lines.len(), 1);
        assert!(lines[0].contains("\"error\""));
        assert!(lines[0].contains(&format!("\"code\":{ERR_CODE_INIT}")));
    }

    #[tokio::test]
    async fn first_non_init_event_is_fatal() {
        let store = MockStore::new();
        let resolver = StubResolver {
            store,
            prefix: Some("repo".into()),
        };
        let tmp = TempDir::new().unwrap();
        let events = vec![r#"{"event":"upload","oid":"abc","size":1,"path":"/tmp/x"}"#.to_owned()];
        let (_, res) = drive(&events, &resolver, tmp.path()).await;
        let err = res.expect_err("non-init first event must error");
        assert!(matches!(err, RunError::InitNotFirst(_)));
    }

    #[test]
    fn init_not_first_display_does_not_double_quote_payload() {
        // Regression guard: the variant carries a payload that has
        // already been `Debug`-rendered by the caller, so the error
        // message must use `{0}` (Display) over the wrapped String,
        // not `{0:?}` which would double-quote the Debug form.
        let err = RunError::InitNotFirst("Upload(UploadEvent { oid: \"abc\" })".to_owned());
        let rendered = err.to_string();
        assert!(
            rendered.starts_with("expected init as the first event, got Upload(UploadEvent {"),
            "InitNotFirst should not wrap the payload in extra quotes: {rendered}"
        );
    }

    #[tokio::test]
    async fn empty_remote_in_init_emits_error_object_and_exits_cleanly() {
        // Regression guard for InitError::EmptyRemote — upstream's
        // helper accepts the empty string and explodes later; we
        // reject up front and emit the structured error response.
        struct UnreachableResolver;
        #[async_trait::async_trait]
        impl RemoteResolver for UnreachableResolver {
            async fn resolve(
                &self,
                _remote_name: &str,
            ) -> Result<
                (Arc<dyn ObjectStore>, Option<String>),
                Box<dyn std::error::Error + Send + Sync>,
            > {
                panic!("resolver should not be called when init.remote is empty");
            }
        }
        let tmp = TempDir::new().unwrap();
        let events = vec![r#"{"event":"init","remote":""}"#.to_owned()];
        let (lines, res) = drive(&events, &UnreachableResolver, tmp.path()).await;
        res.expect("empty-remote init failure is non-fatal");
        assert_eq!(lines.len(), 1);
        assert!(lines[0].contains("\"error\""));
        assert!(lines[0].contains(&format!("\"code\":{ERR_CODE_INIT}")));
        assert!(
            lines[0].contains("init.remote is empty"),
            "ack should include the InitError::EmptyRemote message: {}",
            lines[0]
        );
    }

    #[tokio::test]
    async fn broken_pipe_during_init_ack_is_clean_exit() {
        // Regression guard: if stdout closes mid-init-ack, the bin
        // turns the resulting error into a clean exit. RunError
        // must classify it as `is_broken_pipe()` so the bin's
        // `Err(other) if other.is_broken_pipe()` arm fires.
        use tokio::io::duplex;

        // A writer that returns BrokenPipe immediately. A `duplex`
        // pair where the read half is dropped achieves this.
        let (writer, reader) = duplex(64);
        drop(reader); // force BrokenPipe on the next write

        let store = MockStore::new();
        let resolver = StubResolver {
            store,
            prefix: None,
        };
        let tmp = TempDir::new().unwrap();
        let input = r#"{"event":"init","remote":"origin"}"#;
        let buffered = tokio::io::BufReader::new(std::io::Cursor::new(input.as_bytes().to_vec()));

        let res = run(buffered, writer, &resolver, tmp.path()).await;
        let err = res.expect_err("write to closed duplex must surface as Err");
        assert!(
            err.is_broken_pipe(),
            "init-ack BrokenPipe must be classified as broken-pipe, got: {err:?}"
        );
    }

    #[tokio::test]
    async fn malformed_json_is_fatal() {
        let store = MockStore::new();
        let resolver = StubResolver {
            store,
            prefix: None,
        };
        let tmp = TempDir::new().unwrap();
        let events = vec!["not json".to_owned()];
        let (_, res) = drive(&events, &resolver, tmp.path()).await;
        let err = res.expect_err("garbage line must error");
        assert!(matches!(err, RunError::MalformedEvent(_)));
    }

    #[tokio::test]
    async fn empty_stdin_returns_stdin_closed() {
        let store = MockStore::new();
        let resolver = StubResolver {
            store,
            prefix: None,
        };
        let tmp = TempDir::new().unwrap();
        let (_, res) = drive(&[], &resolver, tmp.path()).await;
        assert!(matches!(res, Err(RunError::StdinClosed)));
    }

    /// F-009: oid validation moved to the run-loop boundary. A
    /// malformed oid on an `upload` event must surface as a
    /// `complete` event with an empty `oid` field and the raw
    /// rejected value folded into the `error.message` so the
    /// operator can correlate the failure with the source event
    /// line.
    #[tokio::test]
    async fn upload_with_invalid_oid_emits_complete_with_empty_oid() {
        let store = MockStore::new();
        let resolver = StubResolver {
            store,
            prefix: Some("repo".to_owned()),
        };
        let tmp = TempDir::new().unwrap();
        let bad_oid = "not-a-real-oid";
        let src = tmp.path().join("body");
        tokio::fs::write(&src, b"x").await.unwrap();
        let events = vec![
            r#"{"event":"init","operation":"upload","remote":"origin"}"#.to_owned(),
            format!(
                r#"{{"event":"upload","oid":"{bad_oid}","size":1,"path":"{path}"}}"#,
                path = src.to_str().unwrap(),
            ),
            r#"{"event":"terminate"}"#.to_owned(),
        ];
        let (lines, res) = drive(&events, &resolver, tmp.path()).await;
        res.expect("run completes despite bad oid");
        // init ack + complete (error) for the upload.
        assert!(
            lines.len() >= 2,
            "expected init ack and complete: {lines:?}"
        );
        let complete_line = lines
            .iter()
            .find(|l| l.contains("\"event\":\"complete\""))
            .expect("complete event present");
        // Byte-exact assertion on the wire format. Per the LFS
        // spec we surface an empty oid field (we never had a
        // validated id) and place the raw rejected string in the
        // error message.
        assert_eq!(
            complete_line.as_str(),
            r#"{"event":"complete","oid":"","error":{"code":2,"message":"invalid oid `not-a-real-oid`: LFS oid must be 64 chars, got 14"}}"#,
        );
    }

    /// F-009: the same shape for `download`. Validation lives in the
    /// run loop, the agent is never reached, and the wire-format
    /// failure event matches the upload case.
    #[tokio::test]
    async fn download_with_invalid_oid_emits_complete_with_empty_oid() {
        let store = MockStore::new();
        let resolver = StubResolver {
            store,
            prefix: Some("repo".to_owned()),
        };
        let tmp = TempDir::new().unwrap();
        let bad_oid = "DEADBEEF";
        let events = vec![
            r#"{"event":"init","operation":"download","remote":"origin"}"#.to_owned(),
            format!(r#"{{"event":"download","oid":"{bad_oid}","size":1}}"#),
            r#"{"event":"terminate"}"#.to_owned(),
        ];
        let (lines, res) = drive(&events, &resolver, tmp.path()).await;
        res.expect("run completes despite bad oid");
        let complete_line = lines
            .iter()
            .find(|l| l.contains("\"event\":\"complete\""))
            .expect("complete event present");
        assert!(
            complete_line.contains(r#""oid":"""#),
            "wire-format oid field must be empty for validation failure: {complete_line}"
        );
        assert!(
            complete_line.contains(&format!("invalid oid `{bad_oid}`")),
            "raw rejected oid must appear in the error message: {complete_line}"
        );
        assert!(
            complete_line.contains(r#""code":2"#),
            "error code must be the generic value 2: {complete_line}"
        );
    }

    /// F-009 sanity check: a valid oid passes the boundary check
    /// and the agent is reached. This is covered by the existing
    /// `full_round_trip_init_upload_download_terminate` test; this
    /// shorter sibling pins the contract more directly.
    #[tokio::test]
    async fn upload_with_valid_oid_reaches_agent() {
        let store = MockStore::new();
        let resolver = StubResolver {
            store: store.clone(),
            prefix: Some("repo".to_owned()),
        };
        let tmp = TempDir::new().unwrap();
        let oid = good_oid();
        let src = tmp.path().join("body");
        let body = b"payload";
        tokio::fs::write(&src, body).await.unwrap();
        let events = vec![
            r#"{"event":"init","operation":"upload","remote":"origin"}"#.to_owned(),
            format!(
                r#"{{"event":"upload","oid":"{oid}","size":{size},"path":"{path}"}}"#,
                size = body.len(),
                path = src.to_str().unwrap(),
            ),
            r#"{"event":"terminate"}"#.to_owned(),
        ];
        let (lines, res) = drive(&events, &resolver, tmp.path()).await;
        res.expect("run completes");
        // The bucket actually received the body — proof the agent
        // was reached after boundary validation.
        assert!(store.contains(&format!("repo/lfs/{oid}")));
        // Wire-side complete has the validated oid (not empty).
        assert!(
            lines
                .iter()
                .any(|l| l.contains(&format!(r#""oid":"{oid}""#))
                    && l.contains("\"event\":\"complete\""))
        );
    }
}