gitserver_core/
backend.rs1use std::path::PathBuf;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::task::{Context, Poll};
6
7use tokio::io::AsyncRead;
8use tokio::time::{Duration, Sleep, sleep};
9use tokio_util::io::SyncIoBridge;
10
11use crate::error::Result;
12use crate::pack::UploadPackRequest;
13
/// Overall time budget for one receive-pack call; `GitBackend::receive_pack`
/// passes this value as the watchdog deadline.
pub const RECEIVE_PACK_TIMEOUT: Duration = Duration::from_secs(300);
/// Upper bound on how long a single read of the request body may stay pending
/// without delivering any bytes before it is failed with `TimedOut`.
const RECEIVE_PACK_IDLE_TIMEOUT: Duration = Duration::from_secs(30);
16
/// Reader adapter that fails a read with `TimedOut` when the wrapped reader
/// stays pending for `timeout` without delivering any bytes.
struct TimedAsyncRead<R> {
    /// The wrapped reader that every read is forwarded to.
    inner: R,
    /// Duration used each time the idle timer is armed.
    timeout: Duration,
    /// Idle timer for the current wait. `None` until the first poll, and reset
    /// to `None` whenever a read delivers at least one byte.
    sleep: Option<Pin<Box<Sleep>>>,
    /// Shared flag that is set to `true` when a read times out.
    interrupt: Arc<AtomicBool>,
}
23
24impl<R> TimedAsyncRead<R> {
25 fn new(inner: R, timeout: Duration, interrupt: Arc<AtomicBool>) -> Self {
26 Self {
27 inner,
28 timeout,
29 sleep: None,
30 interrupt,
31 }
32 }
33}
34
35impl<R> AsyncRead for TimedAsyncRead<R>
36where
37 R: AsyncRead + Unpin,
38{
39 fn poll_read(
40 mut self: Pin<&mut Self>,
41 cx: &mut Context<'_>,
42 buf: &mut tokio::io::ReadBuf<'_>,
43 ) -> Poll<std::io::Result<()>> {
44 if self.sleep.is_none() {
45 self.sleep = Some(Box::pin(sleep(self.timeout)));
46 }
47
48 let before = buf.filled().len();
49 match Pin::new(&mut self.inner).poll_read(cx, buf) {
50 Poll::Ready(Ok(())) => {
51 if buf.filled().len() > before {
52 self.sleep = None;
53 }
54 Poll::Ready(Ok(()))
55 }
56 Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
57 Poll::Pending => {
58 if self
59 .sleep
60 .as_mut()
61 .expect("timeout sleep must exist")
62 .as_mut()
63 .poll(cx)
64 .is_ready()
65 {
66 self.interrupt.store(true, Ordering::Relaxed);
67 Poll::Ready(Err(std::io::Error::new(
68 std::io::ErrorKind::TimedOut,
69 "receive-pack read timed out",
70 )))
71 } else {
72 Poll::Pending
73 }
74 }
75 }
76 }
77}
78
/// Git server operations bound to a single repository on disk.
pub struct GitBackend {
    /// Repository path handed to every operation this backend performs.
    repo_path: PathBuf,
}
82
83impl GitBackend {
84 pub fn new(repo_path: PathBuf) -> Self {
85 Self { repo_path }
86 }
87
88 pub fn advertise_refs(&self) -> Result<Vec<u8>> {
89 crate::refs::advertise_refs(&self.repo_path)
90 }
91
92 pub fn advertise_receive_refs(&self) -> Result<Vec<u8>> {
93 crate::receive_pack::advertise_receive_refs(&self.repo_path)
94 }
95
96 pub async fn upload_pack(&self, request: &UploadPackRequest) -> Result<impl AsyncRead + use<>> {
97 crate::pack::generate_pack(&self.repo_path, request)
98 }
99
100 pub async fn receive_pack<R>(&self, request: R) -> Result<Vec<u8>>
101 where
102 R: AsyncRead + Unpin + Send + 'static,
103 {
104 self.receive_pack_with_timeout(request, RECEIVE_PACK_TIMEOUT)
105 .await
106 }
107
108 async fn receive_pack_with_timeout<R>(
109 &self,
110 request: R,
111 timeout_duration: Duration,
112 ) -> Result<Vec<u8>>
113 where
114 R: AsyncRead + Unpin + Send + 'static,
115 {
116 let repo_path = self.repo_path.clone();
117 let interrupt = Arc::new(AtomicBool::new(false));
118 let watchdog_interrupt = interrupt.clone();
119 let watchdog = tokio::spawn(async move {
120 sleep(timeout_duration).await;
121 watchdog_interrupt.store(true, Ordering::Relaxed);
122 });
123
124 let join = tokio::task::spawn_blocking(move || {
125 let request =
126 TimedAsyncRead::new(request, RECEIVE_PACK_IDLE_TIMEOUT, interrupt.clone());
127 let mut request = SyncIoBridge::new(request);
128 crate::receive_pack::receive_pack_with_interrupt(
129 &repo_path,
130 &mut request,
131 interrupt.as_ref(),
132 )
133 })
134 .await
135 .map_err(|e| crate::error::Error::Protocol(format!("receive-pack task panicked: {e}")));
136
137 watchdog.abort();
138 join?
139 }
140}
141
#[cfg(test)]
mod tests {
    use super::*;
    use std::process::Command;
    use tempfile::TempDir;

    /// Builds a `git` invocation with `args`, run from `dir` when one is given.
    fn git(dir: Option<&std::path::Path>, args: &[&str]) -> Command {
        let mut command = Command::new("git");
        command.args(args);
        if let Some(dir) = dir {
            command.current_dir(dir);
        }
        command
    }

    /// Creates a bare repository under `root` whose `main` branch holds one
    /// empty commit, pushed from a throwaway clone. Returns the bare repo path.
    fn create_repo_with_commit(root: &std::path::Path) -> PathBuf {
        let repo_path = root.join("test.git");
        let work_dir = root.join("work");
        std::fs::create_dir(&work_dir).unwrap();

        let bare = repo_path.to_str().unwrap();
        git(None, &["init", "--bare", bare]).output().unwrap();
        git(Some(&repo_path), &["symbolic-ref", "HEAD", "refs/heads/main"])
            .output()
            .unwrap();
        git(None, &["clone", bare, work_dir.to_str().unwrap()])
            .output()
            .unwrap();

        // Fixed identity so the commit does not depend on the host's git config.
        let identity = [
            ("GIT_AUTHOR_NAME", "Test"),
            ("GIT_AUTHOR_EMAIL", "t@t.com"),
            ("GIT_COMMITTER_NAME", "Test"),
            ("GIT_COMMITTER_EMAIL", "t@t.com"),
        ];
        git(Some(&work_dir), &["commit", "--allow-empty", "-m", "init"])
            .envs(identity)
            .output()
            .unwrap();
        git(Some(&work_dir), &["push", "origin", "main"])
            .output()
            .unwrap();

        repo_path
    }

    #[test]
    fn backend_advertise_refs() {
        let tmp = TempDir::new().unwrap();
        let backend = GitBackend::new(create_repo_with_commit(tmp.path()));

        let advertised = backend.advertise_refs().unwrap();

        assert!(String::from_utf8_lossy(&advertised).contains("refs/heads/main"));
    }

    #[tokio::test]
    async fn backend_upload_pack() {
        let tmp = TempDir::new().unwrap();
        let repo_path = create_repo_with_commit(tmp.path());
        let repo = gix::open(&repo_path).unwrap();
        let head = repo.head_id().unwrap();

        let request = UploadPackRequest {
            wants: vec![head.detach()],
            haves: vec![],
            done: true,
            capabilities: Default::default(),
            shallow: Default::default(),
            object_ids: None,
        };
        let backend = GitBackend::new(repo_path);
        let reader = backend.upload_pack(&request).await.unwrap();

        let mut buffered = tokio::io::BufReader::new(reader);
        let mut pack = Vec::new();
        tokio::io::AsyncReadExt::read_to_end(&mut buffered, &mut pack)
            .await
            .unwrap();

        assert!(pack.windows(4).any(|chunk| chunk == b"PACK"));
    }

    #[tokio::test]
    async fn backend_receive_pack_times_out_on_stalled_reader() {
        let tmp = TempDir::new().unwrap();
        let backend = GitBackend::new(create_repo_with_commit(tmp.path()));
        // The write half is kept alive but never written to, so the reader
        // stays pending instead of reaching end-of-stream.
        let (stalled, _keep_open) = tokio::io::duplex(1);

        let outcome = backend
            .receive_pack_with_timeout(stalled, Duration::from_millis(50))
            .await;

        match outcome.unwrap_err() {
            crate::error::Error::Io(inner) => {
                assert_eq!(inner.kind(), std::io::ErrorKind::TimedOut);
            }
            other => panic!("expected timeout io error, got {other}"),
        }
    }
}