1use std::path::{Path, PathBuf};
11use std::str::FromStr;
12use std::sync::Arc;
13
14use thiserror::Error;
15use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite};
16use tracing::{debug, error, warn};
17
18use crate::lfs::agent::{self, Agent, AgentError, ERR_CODE_GENERIC, ERR_CODE_INIT};
19use crate::lfs::oid::LfsOid;
20use crate::lfs::protocol::{CompleteEvent, ErrorPayload, Event, InitEvent, InitResponse};
21use crate::object_store::ObjectStore;
22use crate::protocol::backend;
23use crate::url;
24
25#[derive(Debug, Error)]
31pub enum RunError {
32 #[error("LFS protocol I/O error: {0}")]
34 Io(#[from] std::io::Error),
35 #[error(transparent)]
37 Agent(#[from] AgentError),
38 #[error("malformed LFS event: {0}")]
42 MalformedEvent(#[from] serde_json::Error),
43 #[error("expected init as the first event, got {0}")]
47 InitNotFirst(String),
48 #[error("stdin closed before init")]
50 StdinClosed,
51}
52
53impl RunError {
54 #[must_use]
60 pub fn is_broken_pipe(&self) -> bool {
61 let io_err = match self {
62 Self::Io(e) | Self::Agent(AgentError::Io(e)) => Some(e),
63 _ => None,
64 };
65 io_err.is_some_and(|e| {
66 matches!(
67 e.kind(),
68 std::io::ErrorKind::BrokenPipe | std::io::ErrorKind::WriteZero,
69 )
70 })
71 }
72}
73
74#[derive(Debug, Error)]
79enum InitError {
80 #[error("init.remote is empty")]
83 EmptyRemote,
84 #[error("cannot resolve remote \"{remote}\": {source}")]
87 Resolve {
88 remote: String,
90 #[source]
92 source: Box<dyn std::error::Error + Send + Sync>,
93 },
94}
95
96#[async_trait::async_trait]
100pub trait RemoteResolver: Send + Sync {
101 async fn resolve(
103 &self,
104 remote_name: &str,
105 ) -> Result<(Arc<dyn ObjectStore>, Option<String>), Box<dyn std::error::Error + Send + Sync>>;
106}
107
/// [`RemoteResolver`] that looks the remote up in a git repository.
///
/// Derives `Debug` and `Clone` so the public type can be logged and copied;
/// its only field is a plain `PathBuf`.
#[derive(Debug, Clone)]
pub struct GitRemoteResolver {
    /// Directory from which repository discovery (`gix::discover`) starts.
    pub repo_dir: PathBuf,
}
116
117#[async_trait::async_trait]
118impl RemoteResolver for GitRemoteResolver {
119 async fn resolve(
120 &self,
121 remote_name: &str,
122 ) -> Result<(Arc<dyn ObjectStore>, Option<String>), Box<dyn std::error::Error + Send + Sync>>
123 {
124 let repo = gix::discover(&self.repo_dir)?;
128 let raw = crate::git::remote_url(&repo, remote_name)?;
129 let parsed = url::parse(&raw)?;
130 let prefix = parsed.prefix().map(str::to_owned);
131 let (store, _engine) = backend::build(&parsed).await?;
135 Ok((store, prefix))
136 }
137}
138
139pub async fn run<R, W, Res>(
152 reader: R,
153 mut writer: W,
154 resolver: &Res,
155 tmp_dir: &Path,
156) -> Result<(), RunError>
157where
158 R: AsyncBufRead + Unpin,
159 W: AsyncWrite + Unpin,
160 Res: RemoteResolver + ?Sized,
161{
162 let mut lines = reader.lines();
163
164 let Some(first) = lines.next_line().await? else {
165 return Err(RunError::StdinClosed);
166 };
167 let event = parse_event(&first)?;
168 let init = match event {
169 Event::Init(init) => init,
170 Event::Terminate => {
171 debug!("received terminate before init; exiting");
174 return Ok(());
175 }
176 other => {
177 return Err(RunError::InitNotFirst(format!("{other:?}")));
178 }
179 };
180
181 let agent = match init_agent(&init, resolver, tmp_dir.to_owned()).await {
182 Ok(a) => {
183 write_init_ack(&mut writer, None).await?;
184 a
185 }
186 Err(err) => {
187 error!(error = %err, "init failed");
188 write_init_ack(&mut writer, Some(&err.to_string())).await?;
189 return Ok(());
190 }
191 };
192
193 while let Some(line) = lines.next_line().await? {
194 debug!(line = %line, "lfs event");
195 let event = parse_event(&line)?;
196 match event {
197 Event::Init(_) => {
198 warn!("received second init; ignoring");
199 }
200 Event::Upload(u) => {
201 if let Some(oid) = validate_oid(&u.oid, &mut writer, "upload").await? {
202 agent
203 .upload(&oid, u.size, Path::new(&u.path), &mut writer)
204 .await?;
205 }
206 }
207 Event::Download(d) => {
208 if let Some(oid) = validate_oid(&d.oid, &mut writer, "download").await? {
209 agent.download(&oid, d.size, &mut writer).await?;
210 }
211 }
212 Event::Terminate => {
213 debug!("received terminate; exiting");
214 break;
215 }
216 }
217 }
218 Ok(())
219}
220
221fn parse_event(line: &str) -> Result<Event, RunError> {
222 Ok(serde_json::from_str(line)?)
226}
227
228async fn init_agent<Res>(
229 init: &InitEvent,
230 resolver: &Res,
231 tmp_dir: PathBuf,
232) -> Result<Agent, InitError>
233where
234 Res: RemoteResolver + ?Sized,
235{
236 if init.remote.is_empty() {
237 return Err(InitError::EmptyRemote);
238 }
239 let (store, prefix) =
240 resolver
241 .resolve(&init.remote)
242 .await
243 .map_err(|source| InitError::Resolve {
244 remote: init.remote.clone(),
245 source,
246 })?;
247 Ok(Agent::new(store, prefix, tmp_dir))
248}
249
250async fn validate_oid<W: AsyncWrite + Unpin>(
257 oid_raw: &str,
258 writer: &mut W,
259 op: &'static str,
260) -> Result<Option<LfsOid>, RunError> {
261 match LfsOid::from_str(oid_raw) {
262 Ok(oid) => Ok(Some(oid)),
263 Err(err) => {
264 warn!(oid = %oid_raw, error = %err, op, "rejecting malformed oid");
265 let message = format!("invalid oid `{oid_raw}`: {err}");
266 let evt = CompleteEvent {
267 event: "complete",
268 oid: "",
269 path: None,
270 error: Some(ErrorPayload {
271 code: ERR_CODE_GENERIC,
272 message: &message,
273 }),
274 };
275 agent::write_event(writer, &evt).await?;
276 Ok(None)
277 }
278 }
279}
280
281async fn write_init_ack<W: AsyncWrite + Unpin>(
282 writer: &mut W,
283 error_msg: Option<&str>,
284) -> Result<(), RunError> {
285 let resp = InitResponse {
286 error: error_msg.map(|m| ErrorPayload {
287 code: ERR_CODE_INIT,
288 message: m,
289 }),
290 };
291 Ok(agent::write_event(writer, &resp).await?)
292}
293
294#[cfg(test)]
295mod tests {
296 use super::*;
297 use crate::object_store::mock::MockStore;
298 use bytes::Bytes;
299 use tempfile::TempDir;
300
301 struct StubResolver {
302 store: MockStore,
303 prefix: Option<String>,
304 }
305
306 #[async_trait::async_trait]
307 impl RemoteResolver for StubResolver {
308 async fn resolve(
309 &self,
310 _remote_name: &str,
311 ) -> Result<(Arc<dyn ObjectStore>, Option<String>), Box<dyn std::error::Error + Send + Sync>>
312 {
313 Ok((Arc::new(self.store.clone()), self.prefix.clone()))
314 }
315 }
316
/// Returns a 64-character lowercase hex string used as a well-formed OID.
fn good_oid() -> String {
    // Four repetitions of the 16-character pattern give the full 64 characters.
    "fedcba9876543210".repeat(4)
}
320
321 async fn drive(
322 events: &[String],
323 resolver: &dyn RemoteResolver,
324 tmp_dir: &Path,
325 ) -> (Vec<String>, Result<(), RunError>) {
326 let mut input = events.join("\n");
327 if !events.is_empty() {
328 input.push('\n');
329 }
330 let reader = tokio::io::BufReader::new(std::io::Cursor::new(input.into_bytes()));
331 let mut output: Vec<u8> = Vec::new();
332 let res = run(reader, &mut output, resolver, tmp_dir).await;
333 let lines = String::from_utf8(output)
334 .unwrap()
335 .lines()
336 .map(str::to_owned)
337 .collect();
338 (lines, res)
339 }
340
341 #[tokio::test]
342 async fn full_round_trip_init_upload_download_terminate() {
343 let store = MockStore::new();
344 let oid = good_oid();
345 let body = b"some body";
346 let oid2 = good_oid();
348 store.insert(format!("repo/lfs/{oid2}"), Bytes::from_static(body));
349
350 let resolver = StubResolver {
351 store: store.clone(),
352 prefix: Some("repo".to_owned()),
353 };
354
355 let tmp = TempDir::new().unwrap();
356 let src = tmp.path().join("src");
357 tokio::fs::write(&src, body).await.unwrap();
358
359 let events = vec![
360 r#"{"event":"init","operation":"upload","remote":"origin"}"#.to_owned(),
361 format!(
362 r#"{{"event":"upload","oid":"{oid}","size":{size},"path":"{path}"}}"#,
363 size = body.len(),
364 path = src.to_str().unwrap(),
365 ),
366 format!(
367 r#"{{"event":"download","oid":"{oid2}","size":{size}}}"#,
368 size = body.len(),
369 ),
370 r#"{"event":"terminate"}"#.to_owned(),
371 ];
372 let (lines, res) = drive(&events, &resolver, tmp.path()).await;
373 res.expect("run should exit cleanly");
374
375 assert_eq!(lines[0], "{}", "init ack should be empty object");
377 assert!(lines.iter().any(|l| l.contains("\"event\":\"progress\"")));
378 let completes: Vec<_> = lines
379 .iter()
380 .filter(|l| l.contains("\"event\":\"complete\""))
381 .collect();
382 assert_eq!(completes.len(), 2, "expected two completes: {lines:?}");
383 assert!(store.contains(&format!("repo/lfs/{oid}")));
384 }
385
386 #[tokio::test]
387 async fn init_failure_emits_error_object_and_exits_cleanly() {
388 struct FailingResolver;
389 #[async_trait::async_trait]
390 impl RemoteResolver for FailingResolver {
391 async fn resolve(
392 &self,
393 _remote_name: &str,
394 ) -> Result<
395 (Arc<dyn ObjectStore>, Option<String>),
396 Box<dyn std::error::Error + Send + Sync>,
397 > {
398 Err("no such remote".into())
399 }
400 }
401 let tmp = TempDir::new().unwrap();
402 let events = vec![r#"{"event":"init","remote":"origin"}"#.to_owned()];
403 let (lines, res) = drive(&events, &FailingResolver, tmp.path()).await;
404 res.expect("init failure is non-fatal");
405 assert_eq!(lines.len(), 1);
406 assert!(lines[0].contains("\"error\""));
407 assert!(lines[0].contains(&format!("\"code\":{ERR_CODE_INIT}")));
408 }
409
410 #[tokio::test]
411 async fn first_non_init_event_is_fatal() {
412 let store = MockStore::new();
413 let resolver = StubResolver {
414 store,
415 prefix: Some("repo".into()),
416 };
417 let tmp = TempDir::new().unwrap();
418 let events = vec![r#"{"event":"upload","oid":"abc","size":1,"path":"/tmp/x"}"#.to_owned()];
419 let (_, res) = drive(&events, &resolver, tmp.path()).await;
420 let err = res.expect_err("non-init first event must error");
421 assert!(matches!(err, RunError::InitNotFirst(_)));
422 }
423
/// The `Display` text of `InitNotFirst` must embed the payload verbatim,
/// without adding quotes around it.
#[test]
fn init_not_first_display_does_not_double_quote_payload() {
    let payload = "Upload(UploadEvent { oid: \"abc\" })".to_owned();
    let rendered = RunError::InitNotFirst(payload).to_string();

    let expected_prefix = "expected init as the first event, got Upload(UploadEvent {";
    assert!(
        rendered.starts_with(expected_prefix),
        "InitNotFirst should not wrap the payload in extra quotes: {rendered}"
    );
}
437
438 #[tokio::test]
439 async fn empty_remote_in_init_emits_error_object_and_exits_cleanly() {
440 struct UnreachableResolver;
444 #[async_trait::async_trait]
445 impl RemoteResolver for UnreachableResolver {
446 async fn resolve(
447 &self,
448 _remote_name: &str,
449 ) -> Result<
450 (Arc<dyn ObjectStore>, Option<String>),
451 Box<dyn std::error::Error + Send + Sync>,
452 > {
453 panic!("resolver should not be called when init.remote is empty");
454 }
455 }
456 let tmp = TempDir::new().unwrap();
457 let events = vec![r#"{"event":"init","remote":""}"#.to_owned()];
458 let (lines, res) = drive(&events, &UnreachableResolver, tmp.path()).await;
459 res.expect("empty-remote init failure is non-fatal");
460 assert_eq!(lines.len(), 1);
461 assert!(lines[0].contains("\"error\""));
462 assert!(lines[0].contains(&format!("\"code\":{ERR_CODE_INIT}")));
463 assert!(
464 lines[0].contains("init.remote is empty"),
465 "ack should include the InitError::EmptyRemote message: {}",
466 lines[0]
467 );
468 }
469
470 #[tokio::test]
471 async fn broken_pipe_during_init_ack_is_clean_exit() {
472 use tokio::io::duplex;
477
478 let (writer, reader) = duplex(64);
481 drop(reader); let store = MockStore::new();
484 let resolver = StubResolver {
485 store,
486 prefix: None,
487 };
488 let tmp = TempDir::new().unwrap();
489 let input = r#"{"event":"init","remote":"origin"}"#;
490 let buffered = tokio::io::BufReader::new(std::io::Cursor::new(input.as_bytes().to_vec()));
491
492 let res = run(buffered, writer, &resolver, tmp.path()).await;
493 let err = res.expect_err("write to closed duplex must surface as Err");
494 assert!(
495 err.is_broken_pipe(),
496 "init-ack BrokenPipe must be classified as broken-pipe, got: {err:?}"
497 );
498 }
499
500 #[tokio::test]
501 async fn malformed_json_is_fatal() {
502 let store = MockStore::new();
503 let resolver = StubResolver {
504 store,
505 prefix: None,
506 };
507 let tmp = TempDir::new().unwrap();
508 let events = vec!["not json".to_owned()];
509 let (_, res) = drive(&events, &resolver, tmp.path()).await;
510 let err = res.expect_err("garbage line must error");
511 assert!(matches!(err, RunError::MalformedEvent(_)));
512 }
513
514 #[tokio::test]
515 async fn empty_stdin_returns_stdin_closed() {
516 let store = MockStore::new();
517 let resolver = StubResolver {
518 store,
519 prefix: None,
520 };
521 let tmp = TempDir::new().unwrap();
522 let (_, res) = drive(&[], &resolver, tmp.path()).await;
523 assert!(matches!(res, Err(RunError::StdinClosed)));
524 }
525
526 #[tokio::test]
533 async fn upload_with_invalid_oid_emits_complete_with_empty_oid() {
534 let store = MockStore::new();
535 let resolver = StubResolver {
536 store,
537 prefix: Some("repo".to_owned()),
538 };
539 let tmp = TempDir::new().unwrap();
540 let bad_oid = "not-a-real-oid";
541 let src = tmp.path().join("body");
542 tokio::fs::write(&src, b"x").await.unwrap();
543 let events = vec![
544 r#"{"event":"init","operation":"upload","remote":"origin"}"#.to_owned(),
545 format!(
546 r#"{{"event":"upload","oid":"{bad_oid}","size":1,"path":"{path}"}}"#,
547 path = src.to_str().unwrap(),
548 ),
549 r#"{"event":"terminate"}"#.to_owned(),
550 ];
551 let (lines, res) = drive(&events, &resolver, tmp.path()).await;
552 res.expect("run completes despite bad oid");
553 assert!(
555 lines.len() >= 2,
556 "expected init ack and complete: {lines:?}"
557 );
558 let complete_line = lines
559 .iter()
560 .find(|l| l.contains("\"event\":\"complete\""))
561 .expect("complete event present");
562 assert_eq!(
567 complete_line.as_str(),
568 r#"{"event":"complete","oid":"","error":{"code":2,"message":"invalid oid `not-a-real-oid`: LFS oid must be 64 chars, got 14"}}"#,
569 );
570 }
571
572 #[tokio::test]
576 async fn download_with_invalid_oid_emits_complete_with_empty_oid() {
577 let store = MockStore::new();
578 let resolver = StubResolver {
579 store,
580 prefix: Some("repo".to_owned()),
581 };
582 let tmp = TempDir::new().unwrap();
583 let bad_oid = "DEADBEEF";
584 let events = vec![
585 r#"{"event":"init","operation":"download","remote":"origin"}"#.to_owned(),
586 format!(r#"{{"event":"download","oid":"{bad_oid}","size":1}}"#),
587 r#"{"event":"terminate"}"#.to_owned(),
588 ];
589 let (lines, res) = drive(&events, &resolver, tmp.path()).await;
590 res.expect("run completes despite bad oid");
591 let complete_line = lines
592 .iter()
593 .find(|l| l.contains("\"event\":\"complete\""))
594 .expect("complete event present");
595 assert!(
596 complete_line.contains(r#""oid":"""#),
597 "wire-format oid field must be empty for validation failure: {complete_line}"
598 );
599 assert!(
600 complete_line.contains(&format!("invalid oid `{bad_oid}`")),
601 "raw rejected oid must appear in the error message: {complete_line}"
602 );
603 assert!(
604 complete_line.contains(r#""code":2"#),
605 "error code must be the generic value 2: {complete_line}"
606 );
607 }
608
609 #[tokio::test]
614 async fn upload_with_valid_oid_reaches_agent() {
615 let store = MockStore::new();
616 let resolver = StubResolver {
617 store: store.clone(),
618 prefix: Some("repo".to_owned()),
619 };
620 let tmp = TempDir::new().unwrap();
621 let oid = good_oid();
622 let src = tmp.path().join("body");
623 let body = b"payload";
624 tokio::fs::write(&src, body).await.unwrap();
625 let events = vec![
626 r#"{"event":"init","operation":"upload","remote":"origin"}"#.to_owned(),
627 format!(
628 r#"{{"event":"upload","oid":"{oid}","size":{size},"path":"{path}"}}"#,
629 size = body.len(),
630 path = src.to_str().unwrap(),
631 ),
632 r#"{"event":"terminate"}"#.to_owned(),
633 ];
634 let (lines, res) = drive(&events, &resolver, tmp.path()).await;
635 res.expect("run completes");
636 assert!(store.contains(&format!("repo/lfs/{oid}")));
639 assert!(
641 lines
642 .iter()
643 .any(|l| l.contains(&format!(r#""oid":"{oid}""#))
644 && l.contains("\"event\":\"complete\""))
645 );
646 }
647}