pkgcruft_git/
service.rs

1use std::io::Write;
2use std::net::SocketAddr;
3use std::sync::Arc;
4
5use camino::Utf8PathBuf;
6use futures::FutureExt;
7use indexmap::IndexSet;
8use itertools::Itertools;
9use pkgcraft::config::Config as PkgcraftConfig;
10use pkgcraft::restrict::Restrict;
11use pkgcruft::report::ReportLevel;
12use pkgcruft::scan::Scanner;
13use tempfile::{TempDir, tempdir};
14use tokio::net::{TcpListener, UnixListener};
15use tokio::sync::{Semaphore, mpsc, oneshot};
16use tokio::task::JoinHandle;
17use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream, UnixListenerStream};
18use tonic::transport::Server;
19use tonic::{Request, Response, Status};
20
21use crate::proto::pkgcruft_server::{Pkgcruft, PkgcruftServer};
22use crate::proto::{EmptyRequest, PushRequest, PushResponse, StringResponse};
23use crate::uds::verify_socket_path;
24use crate::{Error, git};
25
26enum Listener {
27    Tcp(TcpListener),
28    Unix(UnixListener),
29}
30
31impl Listener {
32    /// Try creating a new listener for the pkgcruft service.
33    async fn try_new<S: AsRef<str>>(socket: S) -> crate::Result<(String, Self)> {
34        let socket = socket.as_ref();
35        let (socket, listener) = match socket.parse::<SocketAddr>() {
36            Err(_) if socket.starts_with('/') => {
37                verify_socket_path(socket)?;
38                let listener = UnixListener::bind(socket).map_err(|e| {
39                    Error::Start(format!("failed binding to socket: {socket}: {e}"))
40                })?;
41                (socket.to_string(), Listener::Unix(listener))
42            }
43            Err(_) => return Err(Error::InvalidValue(format!("invalid socket: {socket}"))),
44            Ok(socket) => {
45                let listener = TcpListener::bind(&socket).await.map_err(|e| {
46                    Error::Start(format!("failed binding to socket: {socket}: {e}"))
47                })?;
48                let addr = listener.local_addr().map_err(|e| {
49                    Error::Start(format!("invalid local address: {socket}: {e}"))
50                })?;
51                (addr.to_string(), Listener::Tcp(listener))
52            }
53        };
54
55        Ok((socket, listener))
56    }
57}
58
59pub struct PkgcruftServiceBuilder {
60    uri: String,
61    socket: Option<String>,
62    jobs: usize,
63    temp: bool,
64}
65
66impl PkgcruftServiceBuilder {
67    /// Create a new service builder.
68    pub fn new<S: ToString>(uri: S) -> Self {
69        Self {
70            uri: uri.to_string(),
71            socket: None,
72            jobs: num_cpus::get(),
73            temp: false,
74        }
75    }
76
77    /// Set the network socket to bind.
78    pub fn socket<S: Into<String>>(mut self, socket: S) -> Self {
79        self.socket = Some(socket.into());
80        self
81    }
82
83    /// Set the number of jobs to run.
84    pub fn jobs(mut self, value: usize) -> Self {
85        self.jobs = value;
86        self
87    }
88
89    /// Use a temporary directory for the git repo.
90    pub fn temp(mut self, value: bool) -> Self {
91        self.temp = value;
92        self
93    }
94
95    /// Create a network listener for the service.
96    async fn create_listener(&self) -> crate::Result<(String, Listener)> {
97        // determine network socket
98        let socket = if let Some(value) = &self.socket {
99            value.to_string()
100        } else {
101            // default to using UNIX domain socket for the executing user
102            let config = PkgcraftConfig::new("pkgcraft", "");
103            config.path().run.join("pkgcruft-gitd.sock").to_string()
104        };
105        Listener::try_new(&socket).await
106    }
107
108    /// Start the service listening on the given Listener.
109    async fn listen(self, listener: Listener) -> crate::Result<Pkgcruftd> {
110        let service = PkgcruftService::try_new(&self.uri, self.temp, self.jobs)?;
111        let server = Server::builder().add_service(PkgcruftServer::new(service));
112
113        let (tx, rx) = oneshot::channel::<()>();
114        match listener {
115            Listener::Unix(listener) => {
116                server
117                    .serve_with_incoming_shutdown(
118                        UnixListenerStream::new(listener),
119                        rx.map(drop),
120                    )
121                    .await
122            }
123            Listener::Tcp(listener) => {
124                server
125                    .serve_with_incoming_shutdown(
126                        TcpListenerStream::new(listener),
127                        rx.map(drop),
128                    )
129                    .await
130            }
131        }
132        .map_err(|e| Error::Service(e.to_string()))?;
133
134        Ok(Pkgcruftd { _tx: tx })
135    }
136
137    /// Start the service.
138    pub async fn start(self) -> crate::Result<Pkgcruftd> {
139        let (socket, listener) = self.create_listener().await?;
140        tracing::info!("service listening at: {socket}");
141        self.listen(listener).await
142    }
143
144    /// Spawn the service in a tokio task.
145    pub async fn spawn(self) -> crate::Result<PkgcruftdTask> {
146        let (socket, listener) = self.create_listener().await?;
147        let _service = tokio::spawn(async move { self.listen(listener).await });
148        Ok(PkgcruftdTask { socket, _service })
149    }
150}
151
152/// Pkgcruft service spawned into a tokio task providing socket access for tests.
153#[derive(Debug)]
154pub struct PkgcruftdTask {
155    pub socket: String,
156    _service: JoinHandle<crate::Result<Pkgcruftd>>,
157}
158
159/// Pkgcruft service wrapper that forces the service to end when dropped.
160#[derive(Debug)]
161pub struct Pkgcruftd {
162    _tx: oneshot::Sender<()>,
163}
164
165struct PkgcruftService {
166    _tempdir: Option<TempDir>,
167    path: Utf8PathBuf,
168    scanning: Arc<Semaphore>,
169    jobs: usize,
170}
171
172impl PkgcruftService {
173    /// Try creating a new service.
174    fn try_new(uri: &str, temp: bool, jobs: usize) -> crate::Result<Self> {
175        let mut _tempdir = None;
176        let path = if temp {
177            // create temporary git repo dir
178            let tempdir = tempdir()
179                .map_err(|e| Error::Start(format!("failed creating temp dir: {e}")))?;
180            let path = Utf8PathBuf::from_path_buf(tempdir.path().to_owned())
181                .map_err(|p| Error::Start(format!("invalid tempdir path: {p:?}")))?;
182            _tempdir = Some(tempdir);
183
184            // clone git repo into temporary dir
185            git::clone(uri, &path)
186                .map_err(|e| Error::Start(format!("failed cloning git repo: {uri}: {e}")))?;
187
188            path
189        } else {
190            uri.into()
191        };
192
193        // verify target path is a valid ebuild repo
194        let mut config = PkgcraftConfig::new("pkgcraft", "");
195        let repo = config
196            .add_repo_path("repo", &path, 0)
197            .map_err(|e| Error::Start(format!("invalid repo: {e}")))?;
198        let repo = repo
199            .into_ebuild()
200            .map_err(|e| Error::Start(format!("invalid ebuild repo: {path}: {e}")))?;
201        config
202            .finalize()
203            .map_err(|e| Error::Start(format!("failed finalizing config: {e}")))?;
204
205        // generate ebuild repo metadata ignoring failures
206        repo.metadata()
207            .cache()
208            .regen(&repo)
209            .progress(true)
210            .run()
211            .ok();
212
213        // TODO: generate or verify db of existing pkgcruft reports
214
215        Ok(Self {
216            _tempdir,
217            path,
218            scanning: Arc::new(Semaphore::new(1)),
219            jobs,
220        })
221    }
222
223    /// Perform a scanning run for a push request.
224    fn handle_push(
225        &self,
226        git_repo: &git2::Repository,
227        push: &PushRequest,
228    ) -> crate::Result<PushResponse> {
229        // write pack file to odb
230        let odb = git_repo.odb()?;
231        let mut pack_writer = odb.packwriter()?;
232        pack_writer
233            .write_all(&push.pack)
234            .map_err(|e| Error::IO(format!("failed writing pack file: {e}")))?;
235        pack_writer
236            .flush()
237            .map_err(|e| Error::IO(format!("failed flushing pack file: {e}")))?;
238        pack_writer.commit()?;
239
240        // determine target commit
241        let ref_name = &push.ref_name;
242        let old_oid: git2::Oid = push.old_ref.parse()?;
243        let new_oid: git2::Oid = push.new_ref.parse()?;
244        let commit = git_repo.find_annotated_commit(new_oid)?;
245
246        // update target reference for unborn or fast-forward merge variants
247        let (analysis, _prefs) = git_repo.merge_analysis(&[&commit])?;
248        if analysis.is_unborn() {
249            let msg = format!("unborn: setting {ref_name}: {new_oid}");
250            git_repo.reference("HEAD", new_oid, false, &msg)?;
251        } else if analysis.is_fast_forward() {
252            // verify HEAD points to the expected commit
253            let head = git_repo.head()?;
254            let head_oid = head.peel_to_commit()?.id();
255            if head_oid != old_oid {
256                return Err(Error::InvalidValue(format!("invalid git repo HEAD: {head_oid}")));
257            }
258
259            // update target reference
260            let msg = format!("fast-forward: setting {ref_name}: {new_oid}");
261            git_repo
262                .find_reference(ref_name)?
263                .set_target(new_oid, &msg)?;
264        } else {
265            return Err(Error::InvalidValue(format!("non-fast-forward merge: {analysis:?}")));
266        }
267
268        // update HEAD for target reference
269        git_repo.set_head(ref_name)?;
270        git_repo.checkout_head(Some(git2::build::CheckoutBuilder::default().force()))?;
271
272        // determine diff
273        let diff = git::diff(git_repo, &push.old_ref, &push.new_ref)?;
274
275        // initialize ebuild repo
276        let mut config = PkgcraftConfig::new("pkgcraft", "");
277        let repo = config.add_repo_path("repo", &self.path, 0)?;
278        let repo = repo
279            .into_ebuild()
280            .map_err(|e| Error::InvalidValue(format!("invalid ebuild repo: {e}")))?;
281        config.finalize()?;
282
283        // determine target Cpns from diff
284        let mut cpns = IndexSet::new();
285        let mut eclass = false;
286        for delta in diff.deltas() {
287            if let Some(path) = delta.new_file().path() {
288                if let Ok(cpn) = repo.cpn_from_path(path) {
289                    cpns.insert(cpn);
290                } else if path.starts_with("eclass") {
291                    eclass = true;
292                }
293            }
294        }
295
296        let mut reports = IndexSet::new();
297
298        // scan individual packages that were changed
299        let mut scanner = Scanner::new()
300            .jobs(self.jobs)
301            .exit([ReportLevel::Critical, ReportLevel::Error]);
302        for cpn in cpns {
303            let reports_iter = scanner.run(&repo, &cpn)?;
304            reports.extend(reports_iter.into_iter().map(|r| r.to_json()));
305        }
306
307        // scan full tree for metadata errors on eclass changes
308        if eclass {
309            scanner = scanner.reports([pkgcruft::check::CheckKind::Metadata]);
310            let reports_iter = scanner.run(&repo, Restrict::True)?;
311            reports.extend(reports_iter.into_iter().map(|r| r.to_json()));
312        }
313
314        Ok(PushResponse {
315            reports: reports.into_iter().sorted().collect(),
316            failed: scanner.failed(),
317        })
318    }
319}
320
321#[tonic::async_trait]
322impl Pkgcruft for PkgcruftService {
323    async fn version(
324        &self,
325        _request: Request<EmptyRequest>,
326    ) -> Result<Response<StringResponse>, Status> {
327        let data = env!("CARGO_PKG_VERSION").to_string();
328        let reply = StringResponse { data };
329        Ok(Response::new(reply))
330    }
331
332    type ScanStream = ReceiverStream<Result<StringResponse, Status>>;
333
334    async fn scan(
335        &self,
336        _request: Request<EmptyRequest>,
337    ) -> Result<Response<Self::ScanStream>, Status> {
338        // TODO: use try_acquire_owned() with custom timeout
339        // acquire exclusive scanning permission
340        let permit = self.scanning.clone().acquire_owned().await.unwrap();
341
342        // TODO: partially reload repo or reset lazy metadata fields
343        let mut config = PkgcraftConfig::new("pkgcraft", "");
344        let repo = config
345            .add_repo_path("repo", &self.path, 0)
346            .map_err(|e| Status::from_error(Box::new(e)))?;
347        let repo = repo
348            .into_ebuild()
349            .map_err(|e| Status::invalid_argument(format!("invalid ebuild repo: {e}")))?;
350        config
351            .finalize()
352            .map_err(|e| Status::from_error(Box::new(e)))?;
353
354        // TODO: process request data into a restrict target
355        let scanner = Scanner::new().jobs(self.jobs);
356        let reports = scanner
357            .run(&repo, repo.path())
358            .map_err(|e| Status::from_error(Box::new(e)))?;
359
360        let (tx, rx) = mpsc::channel(4);
361
362        tokio::spawn(async move {
363            for report in reports {
364                if tx.send(Ok(report.into())).await.is_err() {
365                    break;
366                }
367            }
368
369            // explicitly own until scanning is finished
370            drop(permit);
371            drop(scanner);
372            drop(repo);
373            drop(config);
374        });
375
376        Ok(Response::new(ReceiverStream::new(rx)))
377    }
378
379    async fn push(
380        &self,
381        request: Request<PushRequest>,
382    ) -> Result<Response<PushResponse>, Status> {
383        // TODO: use try_acquire_owned() with custom timeout
384        // acquire exclusive scanning permission
385        let permit = self.scanning.clone().acquire_owned().await.unwrap();
386
387        let push = request.into_inner();
388        let record = indoc::formatdoc! {"
389            scanning push:
390              old ref: {}
391              new ref: {}
392              ref name: {}
393        ", push.old_ref, push.new_ref, push.ref_name};
394        tracing::info!("{record}");
395
396        let git_repo =
397            git2::Repository::open(&self.path).map_err(|e| Status::from_error(Box::new(e)))?;
398
399        // run targeted pkgcruft scanning
400        let result = self.handle_push(&git_repo, &push);
401
402        // reset HEAD on error or failure
403        if result.is_err() || result.as_ref().map(|r| r.failed).unwrap_or_default() {
404            // reset reference and HEAD
405            let old_oid: git2::Oid = push
406                .old_ref
407                .parse()
408                .map_err(|e| Status::from_error(Box::new(e)))?;
409            git_repo
410                .find_reference(&push.ref_name)
411                .map_err(|e| Status::from_error(Box::new(e)))?
412                .set_target(old_oid, "")
413                .map_err(|e| Status::from_error(Box::new(e)))?;
414            git_repo
415                .set_head(&push.ref_name)
416                .map_err(|e| Status::from_error(Box::new(e)))?;
417
418            // asynchronously revert working tree and index
419            tokio::spawn(async move {
420                git_repo
421                    .checkout_head(Some(git2::build::CheckoutBuilder::default().force()))
422                    .ok();
423
424                // explicitly own until repo mangling is finished
425                drop(permit);
426                drop(git_repo);
427            });
428        }
429
430        match result {
431            Ok(reply) => Ok(Response::new(reply)),
432            Err(e) => Err(Status::from_error(Box::new(e))),
433        }
434    }
435}