pkgcruft_git/
service.rs

1use std::io::Write;
2use std::net::SocketAddr;
3use std::sync::Arc;
4
5use camino::{Utf8Path, Utf8PathBuf};
6use indexmap::IndexSet;
7use itertools::Itertools;
8use pkgcraft::config::Config as PkgcraftConfig;
9use pkgcraft::restrict::Restrict;
10use pkgcruft::report::ReportLevel;
11use pkgcruft::scan::Scanner;
12use tempfile::{TempDir, tempdir};
13use tokio::net::{TcpListener, UnixListener};
14use tokio::sync::{Semaphore, mpsc};
15use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream, UnixListenerStream};
16use tonic::transport::Server;
17use tonic::{Request, Response, Status};
18
19use crate::proto::pkgcruft_server::{Pkgcruft, PkgcruftServer};
20use crate::proto::{EmptyRequest, PushRequest, PushResponse, StringResponse};
21use crate::uds::verify_socket_path;
22use crate::{Error, git};
23
24enum Listener {
25    Tcp(TcpListener),
26    Unix(UnixListener),
27}
28
29impl Listener {
30    /// Try creating a new listener for the pkgcruft service.
31    async fn try_new<S: AsRef<str>>(socket: S) -> crate::Result<Self> {
32        let socket = socket.as_ref();
33        let (socket, listener) = match socket.parse::<SocketAddr>() {
34            Err(_) if socket.starts_with('/') => {
35                verify_socket_path(socket)?;
36                let listener = UnixListener::bind(socket).map_err(|e| {
37                    Error::Start(format!("failed binding to socket: {socket}: {e}"))
38                })?;
39                (socket.to_string(), Listener::Unix(listener))
40            }
41            Err(_) => return Err(Error::InvalidValue(format!("invalid socket: {socket}"))),
42            Ok(socket) => {
43                let listener = TcpListener::bind(&socket).await.map_err(|e| {
44                    Error::Start(format!("failed binding to socket: {socket}: {e}"))
45                })?;
46                let addr = listener.local_addr().map_err(|e| {
47                    Error::Start(format!("invalid local address: {socket}: {e}"))
48                })?;
49                (addr.to_string(), Listener::Tcp(listener))
50            }
51        };
52
53        tracing::info!("service listening at: {socket}");
54        Ok(listener)
55    }
56}
57
/// Builder used to configure and start the pkgcruft service.
pub struct PkgcruftServiceBuilder {
    /// Git repo URI to clone when using a temporary directory, otherwise the repo path.
    uri: String,
    /// Socket to bind; a per-user UNIX domain socket path is used when unset.
    socket: Option<String>,
    /// Number of jobs handed to the scanner.
    jobs: usize,
    /// Clone the repo into a temporary directory when true.
    temp: bool,
}
64
65impl PkgcruftServiceBuilder {
66    /// Create a new service builder.
67    pub fn new(uri: &str) -> Self {
68        Self {
69            uri: uri.to_string(),
70            socket: None,
71            jobs: num_cpus::get(),
72            temp: false,
73        }
74    }
75
76    /// Set the network socket to bind.
77    pub fn socket<S: Into<String>>(mut self, socket: S) -> Self {
78        self.socket = Some(socket.into());
79        self
80    }
81
82    /// Set the number of jobs to run.
83    pub fn jobs(mut self, value: usize) -> Self {
84        self.jobs = value;
85        self
86    }
87
88    /// Use a temporary directory for the git repo.
89    pub fn temp(mut self, value: bool) -> Self {
90        self.temp = value;
91        self
92    }
93
94    /// Start the service, waiting for it to finish.
95    pub async fn start(self) -> crate::Result<()> {
96        // determine network socket
97        let socket = if let Some(value) = self.socket {
98            value
99        } else {
100            // default to using UNIX domain socket for the executing user
101            let config = PkgcraftConfig::new("pkgcraft", "");
102            config.path().run.join("pkgcruft.sock").to_string()
103        };
104
105        let service = PkgcruftService::try_new(self.uri, self.temp, self.jobs)?;
106        let server = Server::builder().add_service(PkgcruftServer::new(service));
107
108        let listener = Listener::try_new(socket).await?;
109        match listener {
110            Listener::Unix(listener) => {
111                server
112                    .serve_with_incoming(UnixListenerStream::new(listener))
113                    .await
114            }
115            Listener::Tcp(listener) => {
116                server
117                    .serve_with_incoming(TcpListenerStream::new(listener))
118                    .await
119            }
120        }
121        .map_err(|e| Error::Service(e.to_string()))
122    }
123}
124
125struct PkgcruftService {
126    _tempdir: Option<TempDir>,
127    path: Utf8PathBuf,
128    scanning: Arc<Semaphore>,
129    jobs: usize,
130}
131
132impl PkgcruftService {
133    /// Try creating a new service.
134    fn try_new(uri: String, temp: bool, jobs: usize) -> crate::Result<Self> {
135        let mut _tempdir = None;
136        let path = if temp {
137            // create temporary git repo dir
138            let tempdir = tempdir()
139                .map_err(|e| Error::Start(format!("failed creating temp dir: {e}")))?;
140            let path = Utf8PathBuf::from_path_buf(tempdir.path().to_owned())
141                .map_err(|p| Error::Start(format!("invalid tempdir path: {p:?}")))?;
142            _tempdir = Some(tempdir);
143
144            // clone git repo into temporary dir
145            git::clone(&uri, &path)
146                .map_err(|e| Error::Start(format!("failed cloning git repo: {uri}: {e}")))?;
147
148            path
149        } else {
150            uri.into()
151        };
152
153        // verify target path is a valid ebuild repo
154        let mut config = PkgcraftConfig::new("pkgcraft", "");
155        let repo = config
156            .add_repo_path("repo", &path, 0)
157            .map_err(|e| Error::Start(format!("invalid repo: {e}")))?;
158        let repo = repo
159            .into_ebuild()
160            .map_err(|e| Error::Start(format!("invalid ebuild repo: {path}: {e}")))?;
161        config
162            .finalize()
163            .map_err(|e| Error::Start(format!("failed finalizing config: {e}")))?;
164
165        // generate ebuild repo metadata ignoring failures
166        repo.metadata()
167            .cache()
168            .regen(&repo)
169            .progress(true)
170            .run()
171            .ok();
172
173        // TODO: generate or verify db of existing pkgcruft reports
174
175        Ok(Self {
176            _tempdir,
177            path,
178            scanning: Arc::new(Semaphore::new(1)),
179            jobs,
180        })
181    }
182
183    /// Perform a scanning run for a push request.
184    fn handle_push(
185        &self,
186        git_repo: &git2::Repository,
187        push: &PushRequest,
188    ) -> crate::Result<PushResponse> {
189        // write pack file to odb
190        let odb = git_repo.odb()?;
191        let mut pack_writer = odb.packwriter()?;
192        pack_writer
193            .write_all(&push.pack)
194            .map_err(|e| Error::IO(format!("failed writing pack file: {e}")))?;
195        pack_writer
196            .flush()
197            .map_err(|e| Error::IO(format!("failed flushing pack file: {e}")))?;
198        pack_writer.commit()?;
199
200        // determine target commit
201        let ref_name = &push.ref_name;
202        let old_oid: git2::Oid = push.old_ref.parse()?;
203        let new_oid: git2::Oid = push.new_ref.parse()?;
204        let commit = git_repo.find_annotated_commit(new_oid)?;
205
206        // update target reference for unborn or fast-forward merge variants
207        let (analysis, _prefs) = git_repo.merge_analysis(&[&commit])?;
208        if analysis.is_unborn() {
209            let msg = format!("unborn: setting {ref_name}: {new_oid}");
210            git_repo.reference("HEAD", new_oid, false, &msg)?;
211        } else if analysis.is_fast_forward() {
212            // verify HEAD points to the expected commit
213            let head = git_repo.head()?;
214            let head_oid = head.peel_to_commit()?.id();
215            if head_oid != old_oid {
216                return Err(Error::InvalidValue(format!("invalid git repo HEAD: {head_oid}")));
217            }
218
219            // update target reference
220            let msg = format!("fast-forward: setting {ref_name}: {new_oid}");
221            git_repo
222                .find_reference(ref_name)?
223                .set_target(new_oid, &msg)?;
224        } else {
225            return Err(Error::InvalidValue(format!("non-fast-forward merge: {analysis:?}")));
226        }
227
228        // update HEAD for target reference
229        git_repo.set_head(ref_name)?;
230        git_repo.checkout_head(Some(git2::build::CheckoutBuilder::default().force()))?;
231
232        // determine diff
233        let diff = git::diff(git_repo, &push.old_ref, &push.new_ref)?;
234
235        // initialize ebuild repo
236        let mut config = PkgcraftConfig::new("pkgcraft", "");
237        let repo = config.add_repo_path("repo", &self.path, 0)?;
238        let repo = repo
239            .into_ebuild()
240            .map_err(|e| Error::InvalidValue(format!("invalid ebuild repo: {e}")))?;
241        config.finalize()?;
242
243        // determine target Cpns from diff
244        let mut cpns = IndexSet::new();
245        let mut eclass = false;
246        for delta in diff.deltas() {
247            if let Some(path) = delta.new_file().path().and_then(Utf8Path::from_path) {
248                if let Ok(cpn) = repo.cpn_from_path(path) {
249                    cpns.insert(cpn);
250                } else if path.as_str().starts_with("eclass/") {
251                    eclass = true;
252                }
253            }
254        }
255
256        let mut reports = IndexSet::new();
257
258        // scan individual packages that were changed
259        let mut scanner = Scanner::new()
260            .jobs(self.jobs)
261            .exit([ReportLevel::Critical, ReportLevel::Error]);
262        for cpn in cpns {
263            let reports_iter = scanner.run(&repo, &cpn)?;
264            reports.extend(reports_iter.into_iter().map(|r| r.to_json()));
265        }
266
267        // scan full tree for metadata errors on eclass changes
268        if eclass {
269            scanner = scanner.reports([pkgcruft::check::Check::Metadata]);
270            let reports_iter = scanner.run(&repo, Restrict::True)?;
271            reports.extend(reports_iter.into_iter().map(|r| r.to_json()));
272        }
273
274        Ok(PushResponse {
275            reports: reports.into_iter().sorted().collect(),
276            failed: scanner.failed(),
277        })
278    }
279}
280
281#[tonic::async_trait]
282impl Pkgcruft for PkgcruftService {
283    async fn version(
284        &self,
285        _request: Request<EmptyRequest>,
286    ) -> Result<Response<StringResponse>, Status> {
287        let data = env!("CARGO_PKG_VERSION").to_string();
288        let reply = StringResponse { data };
289        Ok(Response::new(reply))
290    }
291
292    type ScanStream = ReceiverStream<Result<StringResponse, Status>>;
293
294    async fn scan(
295        &self,
296        _request: Request<EmptyRequest>,
297    ) -> Result<Response<Self::ScanStream>, Status> {
298        // TODO: use try_acquire_owned() with custom timeout
299        // acquire exclusive scanning permission
300        let permit = self.scanning.clone().acquire_owned().await.unwrap();
301
302        // TODO: partially reload repo or reset lazy metadata fields
303        let mut config = PkgcraftConfig::new("pkgcraft", "");
304        let repo = config
305            .add_repo_path("repo", &self.path, 0)
306            .map_err(|e| Status::from_error(Box::new(e)))?;
307        let repo = repo
308            .into_ebuild()
309            .map_err(|e| Status::invalid_argument(format!("invalid ebuild repo: {e}")))?;
310        config
311            .finalize()
312            .map_err(|e| Status::from_error(Box::new(e)))?;
313
314        // TODO: process request data into a restrict target
315        let scanner = Scanner::new().jobs(self.jobs);
316        let reports = scanner
317            .run(&repo, repo.path())
318            .map_err(|e| Status::from_error(Box::new(e)))?;
319
320        let (tx, rx) = mpsc::channel(4);
321
322        tokio::spawn(async move {
323            for report in reports {
324                if tx.send(Ok(report.into())).await.is_err() {
325                    break;
326                }
327            }
328
329            // explicitly own until scanning is finished
330            drop(permit);
331            drop(scanner);
332            drop(repo);
333            drop(config);
334        });
335
336        Ok(Response::new(ReceiverStream::new(rx)))
337    }
338
339    async fn push(
340        &self,
341        request: Request<PushRequest>,
342    ) -> Result<Response<PushResponse>, Status> {
343        // TODO: use try_acquire_owned() with custom timeout
344        // acquire exclusive scanning permission
345        let permit = self.scanning.clone().acquire_owned().await.unwrap();
346
347        let push = request.into_inner();
348        let record = indoc::formatdoc! {"
349            scanning push:
350              old ref: {}
351              new ref: {}
352              ref name: {}
353        ", push.old_ref, push.new_ref, push.ref_name};
354        tracing::info!("{record}");
355
356        let git_repo =
357            git2::Repository::open(&self.path).map_err(|e| Status::from_error(Box::new(e)))?;
358
359        // run targeted pkgcruft scanning
360        let result = self.handle_push(&git_repo, &push);
361
362        // reset HEAD on error or failure
363        if result.is_err() || result.as_ref().map(|r| r.failed).unwrap_or_default() {
364            // reset reference and HEAD
365            let old_oid: git2::Oid = push
366                .old_ref
367                .parse()
368                .map_err(|e| Status::from_error(Box::new(e)))?;
369            git_repo
370                .find_reference(&push.ref_name)
371                .map_err(|e| Status::from_error(Box::new(e)))?
372                .set_target(old_oid, "")
373                .map_err(|e| Status::from_error(Box::new(e)))?;
374            git_repo
375                .set_head(&push.ref_name)
376                .map_err(|e| Status::from_error(Box::new(e)))?;
377
378            // asynchronously revert working tree and index
379            tokio::spawn(async move {
380                git_repo
381                    .checkout_head(Some(git2::build::CheckoutBuilder::default().force()))
382                    .ok();
383
384                // explicitly own until repo mangling is finished
385                drop(permit);
386                drop(git_repo);
387            });
388        }
389
390        match result {
391            Ok(reply) => Ok(Response::new(reply)),
392            Err(e) => Err(Status::from_error(Box::new(e))),
393        }
394    }
395}