Skip to main content

gitserver_core/
backend.rs

1use std::path::PathBuf;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::task::{Context, Poll};
6
7use tokio::io::AsyncRead;
8use tokio::time::{Duration, Sleep, sleep};
9use tokio_util::io::SyncIoBridge;
10
11use crate::error::Result;
12use crate::pack::UploadPackRequest;
13
14pub const RECEIVE_PACK_TIMEOUT: Duration = Duration::from_secs(300);
15const RECEIVE_PACK_IDLE_TIMEOUT: Duration = Duration::from_secs(30);
16
17struct TimedAsyncRead<R> {
18    inner: R,
19    timeout: Duration,
20    sleep: Option<Pin<Box<Sleep>>>,
21    interrupt: Arc<AtomicBool>,
22}
23
24impl<R> TimedAsyncRead<R> {
25    fn new(inner: R, timeout: Duration, interrupt: Arc<AtomicBool>) -> Self {
26        Self {
27            inner,
28            timeout,
29            sleep: None,
30            interrupt,
31        }
32    }
33}
34
35impl<R> AsyncRead for TimedAsyncRead<R>
36where
37    R: AsyncRead + Unpin,
38{
39    fn poll_read(
40        mut self: Pin<&mut Self>,
41        cx: &mut Context<'_>,
42        buf: &mut tokio::io::ReadBuf<'_>,
43    ) -> Poll<std::io::Result<()>> {
44        if self.sleep.is_none() {
45            self.sleep = Some(Box::pin(sleep(self.timeout)));
46        }
47
48        let before = buf.filled().len();
49        match Pin::new(&mut self.inner).poll_read(cx, buf) {
50            Poll::Ready(Ok(())) => {
51                if buf.filled().len() > before {
52                    self.sleep = None;
53                }
54                Poll::Ready(Ok(()))
55            }
56            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
57            Poll::Pending => {
58                if self
59                    .sleep
60                    .as_mut()
61                    .expect("timeout sleep must exist")
62                    .as_mut()
63                    .poll(cx)
64                    .is_ready()
65                {
66                    self.interrupt.store(true, Ordering::Relaxed);
67                    Poll::Ready(Err(std::io::Error::new(
68                        std::io::ErrorKind::TimedOut,
69                        "receive-pack read timed out",
70                    )))
71                } else {
72                    Poll::Pending
73                }
74            }
75        }
76    }
77}
78
79pub struct GitBackend {
80    repo_path: PathBuf,
81}
82
83impl GitBackend {
84    pub fn new(repo_path: PathBuf) -> Self {
85        Self { repo_path }
86    }
87
88    pub fn advertise_refs(&self) -> Result<Vec<u8>> {
89        crate::refs::advertise_refs(&self.repo_path)
90    }
91
92    pub fn advertise_receive_refs(&self) -> Result<Vec<u8>> {
93        crate::receive_pack::advertise_receive_refs(&self.repo_path)
94    }
95
96    pub async fn upload_pack(&self, request: &UploadPackRequest) -> Result<impl AsyncRead + use<>> {
97        crate::pack::generate_pack(&self.repo_path, request)
98    }
99
100    pub async fn receive_pack<R>(&self, request: R) -> Result<Vec<u8>>
101    where
102        R: AsyncRead + Unpin + Send + 'static,
103    {
104        self.receive_pack_with_timeout(request, RECEIVE_PACK_TIMEOUT)
105            .await
106    }
107
108    async fn receive_pack_with_timeout<R>(
109        &self,
110        request: R,
111        timeout_duration: Duration,
112    ) -> Result<Vec<u8>>
113    where
114        R: AsyncRead + Unpin + Send + 'static,
115    {
116        let repo_path = self.repo_path.clone();
117        let interrupt = Arc::new(AtomicBool::new(false));
118        let watchdog_interrupt = interrupt.clone();
119        let watchdog = tokio::spawn(async move {
120            sleep(timeout_duration).await;
121            watchdog_interrupt.store(true, Ordering::Relaxed);
122        });
123
124        let join = tokio::task::spawn_blocking(move || {
125            let request =
126                TimedAsyncRead::new(request, RECEIVE_PACK_IDLE_TIMEOUT, interrupt.clone());
127            let mut request = SyncIoBridge::new(request);
128            crate::receive_pack::receive_pack_with_interrupt(
129                &repo_path,
130                &mut request,
131                interrupt.as_ref(),
132            )
133        })
134        .await
135        .map_err(|e| crate::error::Error::Protocol(format!("receive-pack task panicked: {e}")));
136
137        watchdog.abort();
138        join?
139    }
140}
141
142#[cfg(test)]
143mod tests {
144    use super::*;
145    use std::process::Command;
146    use tempfile::TempDir;
147
148    fn create_repo_with_commit(root: &std::path::Path) -> PathBuf {
149        let repo_path = root.join("test.git");
150        let work_dir = root.join("work");
151        std::fs::create_dir(&work_dir).unwrap();
152        Command::new("git")
153            .args(["init", "--bare", repo_path.to_str().unwrap()])
154            .output()
155            .unwrap();
156        Command::new("git")
157            .args(["symbolic-ref", "HEAD", "refs/heads/main"])
158            .current_dir(&repo_path)
159            .output()
160            .unwrap();
161        Command::new("git")
162            .args([
163                "clone",
164                repo_path.to_str().unwrap(),
165                work_dir.to_str().unwrap(),
166            ])
167            .output()
168            .unwrap();
169        Command::new("git")
170            .current_dir(&work_dir)
171            .args(["commit", "--allow-empty", "-m", "init"])
172            .env("GIT_AUTHOR_NAME", "Test")
173            .env("GIT_AUTHOR_EMAIL", "t@t.com")
174            .env("GIT_COMMITTER_NAME", "Test")
175            .env("GIT_COMMITTER_EMAIL", "t@t.com")
176            .output()
177            .unwrap();
178        Command::new("git")
179            .current_dir(&work_dir)
180            .args(["push", "origin", "main"])
181            .output()
182            .unwrap();
183        repo_path
184    }
185
186    #[test]
187    fn backend_advertise_refs() {
188        let root = TempDir::new().unwrap();
189        let repo_path = create_repo_with_commit(root.path());
190        let backend = GitBackend::new(repo_path);
191        let output = backend.advertise_refs().unwrap();
192        let output_str = String::from_utf8_lossy(&output);
193        assert!(output_str.contains("refs/heads/main"));
194    }
195
196    #[tokio::test]
197    async fn backend_upload_pack() {
198        let root = TempDir::new().unwrap();
199        let repo_path = create_repo_with_commit(root.path());
200        let repo = gix::open(&repo_path).unwrap();
201        let head = repo.head_id().unwrap();
202
203        let backend = GitBackend::new(repo_path);
204        let request = UploadPackRequest {
205            wants: vec![head.detach()],
206            haves: vec![],
207            done: true,
208            capabilities: Default::default(),
209            shallow: Default::default(),
210            object_ids: None,
211        };
212        let reader = backend.upload_pack(&request).await.unwrap();
213        let mut buf = Vec::new();
214        tokio::io::AsyncReadExt::read_to_end(&mut tokio::io::BufReader::new(reader), &mut buf)
215            .await
216            .unwrap();
217        assert!(buf.windows(4).any(|w| w == b"PACK"));
218    }
219
220    #[tokio::test]
221    async fn backend_receive_pack_times_out_on_stalled_reader() {
222        let root = TempDir::new().unwrap();
223        let repo_path = create_repo_with_commit(root.path());
224        let backend = GitBackend::new(repo_path);
225        let (reader, _writer) = tokio::io::duplex(1);
226
227        let err = backend
228            .receive_pack_with_timeout(reader, Duration::from_millis(50))
229            .await
230            .unwrap_err();
231
232        match err {
233            crate::error::Error::Io(inner) => {
234                assert_eq!(inner.kind(), std::io::ErrorKind::TimedOut);
235            }
236            other => panic!("expected timeout io error, got {other}"),
237        }
238    }
239}