1use std::io::Write;
2use std::net::SocketAddr;
3use std::sync::Arc;
4
5use camino::Utf8PathBuf;
6use futures::FutureExt;
7use indexmap::IndexSet;
8use itertools::Itertools;
9use pkgcraft::config::Config as PkgcraftConfig;
10use pkgcraft::restrict::Restrict;
11use pkgcruft::report::ReportLevel;
12use pkgcruft::scan::Scanner;
13use tempfile::{TempDir, tempdir};
14use tokio::net::{TcpListener, UnixListener};
15use tokio::sync::{Semaphore, mpsc, oneshot};
16use tokio::task::JoinHandle;
17use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream, UnixListenerStream};
18use tonic::transport::Server;
19use tonic::{Request, Response, Status};
20
21use crate::proto::pkgcruft_server::{Pkgcruft, PkgcruftServer};
22use crate::proto::{EmptyRequest, PushRequest, PushResponse, StringResponse};
23use crate::uds::verify_socket_path;
24use crate::{Error, git};
25
26enum Listener {
27 Tcp(TcpListener),
28 Unix(UnixListener),
29}
30
31impl Listener {
32 async fn try_new<S: AsRef<str>>(socket: S) -> crate::Result<(String, Self)> {
34 let socket = socket.as_ref();
35 let (socket, listener) = match socket.parse::<SocketAddr>() {
36 Err(_) if socket.starts_with('/') => {
37 verify_socket_path(socket)?;
38 let listener = UnixListener::bind(socket).map_err(|e| {
39 Error::Start(format!("failed binding to socket: {socket}: {e}"))
40 })?;
41 (socket.to_string(), Listener::Unix(listener))
42 }
43 Err(_) => return Err(Error::InvalidValue(format!("invalid socket: {socket}"))),
44 Ok(socket) => {
45 let listener = TcpListener::bind(&socket).await.map_err(|e| {
46 Error::Start(format!("failed binding to socket: {socket}: {e}"))
47 })?;
48 let addr = listener.local_addr().map_err(|e| {
49 Error::Start(format!("invalid local address: {socket}: {e}"))
50 })?;
51 (addr.to_string(), Listener::Tcp(listener))
52 }
53 };
54
55 Ok((socket, listener))
56 }
57}
58
59pub struct PkgcruftServiceBuilder {
60 uri: String,
61 socket: Option<String>,
62 jobs: usize,
63 temp: bool,
64}
65
66impl PkgcruftServiceBuilder {
67 pub fn new<S: ToString>(uri: S) -> Self {
69 Self {
70 uri: uri.to_string(),
71 socket: None,
72 jobs: num_cpus::get(),
73 temp: false,
74 }
75 }
76
77 pub fn socket<S: Into<String>>(mut self, socket: S) -> Self {
79 self.socket = Some(socket.into());
80 self
81 }
82
83 pub fn jobs(mut self, value: usize) -> Self {
85 self.jobs = value;
86 self
87 }
88
89 pub fn temp(mut self, value: bool) -> Self {
91 self.temp = value;
92 self
93 }
94
95 async fn create_listener(&self) -> crate::Result<(String, Listener)> {
97 let socket = if let Some(value) = &self.socket {
99 value.to_string()
100 } else {
101 let config = PkgcraftConfig::new("pkgcraft", "");
103 config.path().run.join("pkgcruft-gitd.sock").to_string()
104 };
105 Listener::try_new(&socket).await
106 }
107
108 async fn listen(self, listener: Listener) -> crate::Result<Pkgcruftd> {
110 let service = PkgcruftService::try_new(&self.uri, self.temp, self.jobs)?;
111 let server = Server::builder().add_service(PkgcruftServer::new(service));
112
113 let (tx, rx) = oneshot::channel::<()>();
114 match listener {
115 Listener::Unix(listener) => {
116 server
117 .serve_with_incoming_shutdown(
118 UnixListenerStream::new(listener),
119 rx.map(drop),
120 )
121 .await
122 }
123 Listener::Tcp(listener) => {
124 server
125 .serve_with_incoming_shutdown(
126 TcpListenerStream::new(listener),
127 rx.map(drop),
128 )
129 .await
130 }
131 }
132 .map_err(|e| Error::Service(e.to_string()))?;
133
134 Ok(Pkgcruftd { _tx: tx })
135 }
136
137 pub async fn start(self) -> crate::Result<Pkgcruftd> {
139 let (socket, listener) = self.create_listener().await?;
140 tracing::info!("service listening at: {socket}");
141 self.listen(listener).await
142 }
143
144 pub async fn spawn(self) -> crate::Result<PkgcruftdTask> {
146 let (socket, listener) = self.create_listener().await?;
147 let _service = tokio::spawn(async move { self.listen(listener).await });
148 Ok(PkgcruftdTask { socket, _service })
149 }
150}
151
152#[derive(Debug)]
154pub struct PkgcruftdTask {
155 pub socket: String,
156 _service: JoinHandle<crate::Result<Pkgcruftd>>,
157}
158
159#[derive(Debug)]
161pub struct Pkgcruftd {
162 _tx: oneshot::Sender<()>,
163}
164
165struct PkgcruftService {
166 _tempdir: Option<TempDir>,
167 path: Utf8PathBuf,
168 scanning: Arc<Semaphore>,
169 jobs: usize,
170}
171
172impl PkgcruftService {
173 fn try_new(uri: &str, temp: bool, jobs: usize) -> crate::Result<Self> {
175 let mut _tempdir = None;
176 let path = if temp {
177 let tempdir = tempdir()
179 .map_err(|e| Error::Start(format!("failed creating temp dir: {e}")))?;
180 let path = Utf8PathBuf::from_path_buf(tempdir.path().to_owned())
181 .map_err(|p| Error::Start(format!("invalid tempdir path: {p:?}")))?;
182 _tempdir = Some(tempdir);
183
184 git::clone(uri, &path)
186 .map_err(|e| Error::Start(format!("failed cloning git repo: {uri}: {e}")))?;
187
188 path
189 } else {
190 uri.into()
191 };
192
193 let mut config = PkgcraftConfig::new("pkgcraft", "");
195 let repo = config
196 .add_repo_path("repo", &path, 0)
197 .map_err(|e| Error::Start(format!("invalid repo: {e}")))?;
198 let repo = repo
199 .into_ebuild()
200 .map_err(|e| Error::Start(format!("invalid ebuild repo: {path}: {e}")))?;
201 config
202 .finalize()
203 .map_err(|e| Error::Start(format!("failed finalizing config: {e}")))?;
204
205 repo.metadata()
207 .cache()
208 .regen(&repo)
209 .progress(true)
210 .run()
211 .ok();
212
213 Ok(Self {
216 _tempdir,
217 path,
218 scanning: Arc::new(Semaphore::new(1)),
219 jobs,
220 })
221 }
222
223 fn handle_push(
225 &self,
226 git_repo: &git2::Repository,
227 push: &PushRequest,
228 ) -> crate::Result<PushResponse> {
229 let odb = git_repo.odb()?;
231 let mut pack_writer = odb.packwriter()?;
232 pack_writer
233 .write_all(&push.pack)
234 .map_err(|e| Error::IO(format!("failed writing pack file: {e}")))?;
235 pack_writer
236 .flush()
237 .map_err(|e| Error::IO(format!("failed flushing pack file: {e}")))?;
238 pack_writer.commit()?;
239
240 let ref_name = &push.ref_name;
242 let old_oid: git2::Oid = push.old_ref.parse()?;
243 let new_oid: git2::Oid = push.new_ref.parse()?;
244 let commit = git_repo.find_annotated_commit(new_oid)?;
245
246 let (analysis, _prefs) = git_repo.merge_analysis(&[&commit])?;
248 if analysis.is_unborn() {
249 let msg = format!("unborn: setting {ref_name}: {new_oid}");
250 git_repo.reference("HEAD", new_oid, false, &msg)?;
251 } else if analysis.is_fast_forward() {
252 let head = git_repo.head()?;
254 let head_oid = head.peel_to_commit()?.id();
255 if head_oid != old_oid {
256 return Err(Error::InvalidValue(format!("invalid git repo HEAD: {head_oid}")));
257 }
258
259 let msg = format!("fast-forward: setting {ref_name}: {new_oid}");
261 git_repo
262 .find_reference(ref_name)?
263 .set_target(new_oid, &msg)?;
264 } else {
265 return Err(Error::InvalidValue(format!("non-fast-forward merge: {analysis:?}")));
266 }
267
268 git_repo.set_head(ref_name)?;
270 git_repo.checkout_head(Some(git2::build::CheckoutBuilder::default().force()))?;
271
272 let diff = git::diff(git_repo, &push.old_ref, &push.new_ref)?;
274
275 let mut config = PkgcraftConfig::new("pkgcraft", "");
277 let repo = config.add_repo_path("repo", &self.path, 0)?;
278 let repo = repo
279 .into_ebuild()
280 .map_err(|e| Error::InvalidValue(format!("invalid ebuild repo: {e}")))?;
281 config.finalize()?;
282
283 let mut cpns = IndexSet::new();
285 let mut eclass = false;
286 for delta in diff.deltas() {
287 if let Some(path) = delta.new_file().path() {
288 if let Ok(cpn) = repo.cpn_from_path(path) {
289 cpns.insert(cpn);
290 } else if path.starts_with("eclass") {
291 eclass = true;
292 }
293 }
294 }
295
296 let mut reports = IndexSet::new();
297
298 let mut scanner = Scanner::new()
300 .jobs(self.jobs)
301 .exit([ReportLevel::Critical, ReportLevel::Error]);
302 for cpn in cpns {
303 let reports_iter = scanner.run(&repo, &cpn)?;
304 reports.extend(reports_iter.into_iter().map(|r| r.to_json()));
305 }
306
307 if eclass {
309 scanner = scanner.reports([pkgcruft::check::CheckKind::Metadata]);
310 let reports_iter = scanner.run(&repo, Restrict::True)?;
311 reports.extend(reports_iter.into_iter().map(|r| r.to_json()));
312 }
313
314 Ok(PushResponse {
315 reports: reports.into_iter().sorted().collect(),
316 failed: scanner.failed(),
317 })
318 }
319}
320
321#[tonic::async_trait]
322impl Pkgcruft for PkgcruftService {
323 async fn version(
324 &self,
325 _request: Request<EmptyRequest>,
326 ) -> Result<Response<StringResponse>, Status> {
327 let data = env!("CARGO_PKG_VERSION").to_string();
328 let reply = StringResponse { data };
329 Ok(Response::new(reply))
330 }
331
332 type ScanStream = ReceiverStream<Result<StringResponse, Status>>;
333
334 async fn scan(
335 &self,
336 _request: Request<EmptyRequest>,
337 ) -> Result<Response<Self::ScanStream>, Status> {
338 let permit = self.scanning.clone().acquire_owned().await.unwrap();
341
342 let mut config = PkgcraftConfig::new("pkgcraft", "");
344 let repo = config
345 .add_repo_path("repo", &self.path, 0)
346 .map_err(|e| Status::from_error(Box::new(e)))?;
347 let repo = repo
348 .into_ebuild()
349 .map_err(|e| Status::invalid_argument(format!("invalid ebuild repo: {e}")))?;
350 config
351 .finalize()
352 .map_err(|e| Status::from_error(Box::new(e)))?;
353
354 let scanner = Scanner::new().jobs(self.jobs);
356 let reports = scanner
357 .run(&repo, repo.path())
358 .map_err(|e| Status::from_error(Box::new(e)))?;
359
360 let (tx, rx) = mpsc::channel(4);
361
362 tokio::spawn(async move {
363 for report in reports {
364 if tx.send(Ok(report.into())).await.is_err() {
365 break;
366 }
367 }
368
369 drop(permit);
371 drop(scanner);
372 drop(repo);
373 drop(config);
374 });
375
376 Ok(Response::new(ReceiverStream::new(rx)))
377 }
378
379 async fn push(
380 &self,
381 request: Request<PushRequest>,
382 ) -> Result<Response<PushResponse>, Status> {
383 let permit = self.scanning.clone().acquire_owned().await.unwrap();
386
387 let push = request.into_inner();
388 let record = indoc::formatdoc! {"
389 scanning push:
390 old ref: {}
391 new ref: {}
392 ref name: {}
393 ", push.old_ref, push.new_ref, push.ref_name};
394 tracing::info!("{record}");
395
396 let git_repo =
397 git2::Repository::open(&self.path).map_err(|e| Status::from_error(Box::new(e)))?;
398
399 let result = self.handle_push(&git_repo, &push);
401
402 if result.is_err() || result.as_ref().map(|r| r.failed).unwrap_or_default() {
404 let old_oid: git2::Oid = push
406 .old_ref
407 .parse()
408 .map_err(|e| Status::from_error(Box::new(e)))?;
409 git_repo
410 .find_reference(&push.ref_name)
411 .map_err(|e| Status::from_error(Box::new(e)))?
412 .set_target(old_oid, "")
413 .map_err(|e| Status::from_error(Box::new(e)))?;
414 git_repo
415 .set_head(&push.ref_name)
416 .map_err(|e| Status::from_error(Box::new(e)))?;
417
418 tokio::spawn(async move {
420 git_repo
421 .checkout_head(Some(git2::build::CheckoutBuilder::default().force()))
422 .ok();
423
424 drop(permit);
426 drop(git_repo);
427 });
428 }
429
430 match result {
431 Ok(reply) => Ok(Response::new(reply)),
432 Err(e) => Err(Status::from_error(Box::new(e))),
433 }
434 }
435}