1use std::io::Write;
2use std::net::SocketAddr;
3use std::sync::Arc;
4
5use camino::{Utf8Path, Utf8PathBuf};
6use indexmap::IndexSet;
7use itertools::Itertools;
8use pkgcraft::config::Config as PkgcraftConfig;
9use pkgcraft::restrict::Restrict;
10use pkgcruft::report::ReportLevel;
11use pkgcruft::scan::Scanner;
12use tempfile::{TempDir, tempdir};
13use tokio::net::{TcpListener, UnixListener};
14use tokio::sync::{Semaphore, mpsc};
15use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream, UnixListenerStream};
16use tonic::transport::Server;
17use tonic::{Request, Response, Status};
18
19use crate::proto::pkgcruft_server::{Pkgcruft, PkgcruftServer};
20use crate::proto::{EmptyRequest, PushRequest, PushResponse, StringResponse};
21use crate::uds::verify_socket_path;
22use crate::{Error, git};
23
24enum Listener {
25 Tcp(TcpListener),
26 Unix(UnixListener),
27}
28
29impl Listener {
30 async fn try_new<S: AsRef<str>>(socket: S) -> crate::Result<Self> {
32 let socket = socket.as_ref();
33 let (socket, listener) = match socket.parse::<SocketAddr>() {
34 Err(_) if socket.starts_with('/') => {
35 verify_socket_path(socket)?;
36 let listener = UnixListener::bind(socket).map_err(|e| {
37 Error::Start(format!("failed binding to socket: {socket}: {e}"))
38 })?;
39 (socket.to_string(), Listener::Unix(listener))
40 }
41 Err(_) => return Err(Error::InvalidValue(format!("invalid socket: {socket}"))),
42 Ok(socket) => {
43 let listener = TcpListener::bind(&socket).await.map_err(|e| {
44 Error::Start(format!("failed binding to socket: {socket}: {e}"))
45 })?;
46 let addr = listener.local_addr().map_err(|e| {
47 Error::Start(format!("invalid local address: {socket}: {e}"))
48 })?;
49 (addr.to_string(), Listener::Tcp(listener))
50 }
51 };
52
53 tracing::info!("service listening at: {socket}");
54 Ok(listener)
55 }
56}
57
58pub struct PkgcruftServiceBuilder {
59 uri: String,
60 socket: Option<String>,
61 jobs: usize,
62 temp: bool,
63}
64
65impl PkgcruftServiceBuilder {
66 pub fn new(uri: &str) -> Self {
68 Self {
69 uri: uri.to_string(),
70 socket: None,
71 jobs: num_cpus::get(),
72 temp: false,
73 }
74 }
75
76 pub fn socket<S: Into<String>>(mut self, socket: S) -> Self {
78 self.socket = Some(socket.into());
79 self
80 }
81
82 pub fn jobs(mut self, value: usize) -> Self {
84 self.jobs = value;
85 self
86 }
87
88 pub fn temp(mut self, value: bool) -> Self {
90 self.temp = value;
91 self
92 }
93
94 pub async fn start(self) -> crate::Result<()> {
96 let socket = if let Some(value) = self.socket {
98 value
99 } else {
100 let config = PkgcraftConfig::new("pkgcraft", "");
102 config.path().run.join("pkgcruft.sock").to_string()
103 };
104
105 let service = PkgcruftService::try_new(self.uri, self.temp, self.jobs)?;
106 let server = Server::builder().add_service(PkgcruftServer::new(service));
107
108 let listener = Listener::try_new(socket).await?;
109 match listener {
110 Listener::Unix(listener) => {
111 server
112 .serve_with_incoming(UnixListenerStream::new(listener))
113 .await
114 }
115 Listener::Tcp(listener) => {
116 server
117 .serve_with_incoming(TcpListenerStream::new(listener))
118 .await
119 }
120 }
121 .map_err(|e| Error::Service(e.to_string()))
122 }
123}
124
125struct PkgcruftService {
126 _tempdir: Option<TempDir>,
127 path: Utf8PathBuf,
128 scanning: Arc<Semaphore>,
129 jobs: usize,
130}
131
132impl PkgcruftService {
133 fn try_new(uri: String, temp: bool, jobs: usize) -> crate::Result<Self> {
135 let mut _tempdir = None;
136 let path = if temp {
137 let tempdir = tempdir()
139 .map_err(|e| Error::Start(format!("failed creating temp dir: {e}")))?;
140 let path = Utf8PathBuf::from_path_buf(tempdir.path().to_owned())
141 .map_err(|p| Error::Start(format!("invalid tempdir path: {p:?}")))?;
142 _tempdir = Some(tempdir);
143
144 git::clone(&uri, &path)
146 .map_err(|e| Error::Start(format!("failed cloning git repo: {uri}: {e}")))?;
147
148 path
149 } else {
150 uri.into()
151 };
152
153 let mut config = PkgcraftConfig::new("pkgcraft", "");
155 let repo = config
156 .add_repo_path("repo", &path, 0)
157 .map_err(|e| Error::Start(format!("invalid repo: {e}")))?;
158 let repo = repo
159 .into_ebuild()
160 .map_err(|e| Error::Start(format!("invalid ebuild repo: {path}: {e}")))?;
161 config
162 .finalize()
163 .map_err(|e| Error::Start(format!("failed finalizing config: {e}")))?;
164
165 repo.metadata()
167 .cache()
168 .regen(&repo)
169 .progress(true)
170 .run()
171 .ok();
172
173 Ok(Self {
176 _tempdir,
177 path,
178 scanning: Arc::new(Semaphore::new(1)),
179 jobs,
180 })
181 }
182
183 fn handle_push(
185 &self,
186 git_repo: &git2::Repository,
187 push: &PushRequest,
188 ) -> crate::Result<PushResponse> {
189 let odb = git_repo.odb()?;
191 let mut pack_writer = odb.packwriter()?;
192 pack_writer
193 .write_all(&push.pack)
194 .map_err(|e| Error::IO(format!("failed writing pack file: {e}")))?;
195 pack_writer
196 .flush()
197 .map_err(|e| Error::IO(format!("failed flushing pack file: {e}")))?;
198 pack_writer.commit()?;
199
200 let ref_name = &push.ref_name;
202 let old_oid: git2::Oid = push.old_ref.parse()?;
203 let new_oid: git2::Oid = push.new_ref.parse()?;
204 let commit = git_repo.find_annotated_commit(new_oid)?;
205
206 let (analysis, _prefs) = git_repo.merge_analysis(&[&commit])?;
208 if analysis.is_unborn() {
209 let msg = format!("unborn: setting {ref_name}: {new_oid}");
210 git_repo.reference("HEAD", new_oid, false, &msg)?;
211 } else if analysis.is_fast_forward() {
212 let head = git_repo.head()?;
214 let head_oid = head.peel_to_commit()?.id();
215 if head_oid != old_oid {
216 return Err(Error::InvalidValue(format!("invalid git repo HEAD: {head_oid}")));
217 }
218
219 let msg = format!("fast-forward: setting {ref_name}: {new_oid}");
221 git_repo
222 .find_reference(ref_name)?
223 .set_target(new_oid, &msg)?;
224 } else {
225 return Err(Error::InvalidValue(format!("non-fast-forward merge: {analysis:?}")));
226 }
227
228 git_repo.set_head(ref_name)?;
230 git_repo.checkout_head(Some(git2::build::CheckoutBuilder::default().force()))?;
231
232 let diff = git::diff(git_repo, &push.old_ref, &push.new_ref)?;
234
235 let mut config = PkgcraftConfig::new("pkgcraft", "");
237 let repo = config.add_repo_path("repo", &self.path, 0)?;
238 let repo = repo
239 .into_ebuild()
240 .map_err(|e| Error::InvalidValue(format!("invalid ebuild repo: {e}")))?;
241 config.finalize()?;
242
243 let mut cpns = IndexSet::new();
245 let mut eclass = false;
246 for delta in diff.deltas() {
247 if let Some(path) = delta.new_file().path().and_then(Utf8Path::from_path) {
248 if let Ok(cpn) = repo.cpn_from_path(path) {
249 cpns.insert(cpn);
250 } else if path.as_str().starts_with("eclass/") {
251 eclass = true;
252 }
253 }
254 }
255
256 let mut reports = IndexSet::new();
257
258 let mut scanner = Scanner::new()
260 .jobs(self.jobs)
261 .exit([ReportLevel::Critical, ReportLevel::Error]);
262 for cpn in cpns {
263 let reports_iter = scanner.run(&repo, &cpn)?;
264 reports.extend(reports_iter.into_iter().map(|r| r.to_json()));
265 }
266
267 if eclass {
269 scanner = scanner.reports([pkgcruft::check::Check::Metadata]);
270 let reports_iter = scanner.run(&repo, Restrict::True)?;
271 reports.extend(reports_iter.into_iter().map(|r| r.to_json()));
272 }
273
274 Ok(PushResponse {
275 reports: reports.into_iter().sorted().collect(),
276 failed: scanner.failed(),
277 })
278 }
279}
280
281#[tonic::async_trait]
282impl Pkgcruft for PkgcruftService {
283 async fn version(
284 &self,
285 _request: Request<EmptyRequest>,
286 ) -> Result<Response<StringResponse>, Status> {
287 let data = env!("CARGO_PKG_VERSION").to_string();
288 let reply = StringResponse { data };
289 Ok(Response::new(reply))
290 }
291
292 type ScanStream = ReceiverStream<Result<StringResponse, Status>>;
293
294 async fn scan(
295 &self,
296 _request: Request<EmptyRequest>,
297 ) -> Result<Response<Self::ScanStream>, Status> {
298 let permit = self.scanning.clone().acquire_owned().await.unwrap();
301
302 let mut config = PkgcraftConfig::new("pkgcraft", "");
304 let repo = config
305 .add_repo_path("repo", &self.path, 0)
306 .map_err(|e| Status::from_error(Box::new(e)))?;
307 let repo = repo
308 .into_ebuild()
309 .map_err(|e| Status::invalid_argument(format!("invalid ebuild repo: {e}")))?;
310 config
311 .finalize()
312 .map_err(|e| Status::from_error(Box::new(e)))?;
313
314 let scanner = Scanner::new().jobs(self.jobs);
316 let reports = scanner
317 .run(&repo, repo.path())
318 .map_err(|e| Status::from_error(Box::new(e)))?;
319
320 let (tx, rx) = mpsc::channel(4);
321
322 tokio::spawn(async move {
323 for report in reports {
324 if tx.send(Ok(report.into())).await.is_err() {
325 break;
326 }
327 }
328
329 drop(permit);
331 drop(scanner);
332 drop(repo);
333 drop(config);
334 });
335
336 Ok(Response::new(ReceiverStream::new(rx)))
337 }
338
339 async fn push(
340 &self,
341 request: Request<PushRequest>,
342 ) -> Result<Response<PushResponse>, Status> {
343 let permit = self.scanning.clone().acquire_owned().await.unwrap();
346
347 let push = request.into_inner();
348 let record = indoc::formatdoc! {"
349 scanning push:
350 old ref: {}
351 new ref: {}
352 ref name: {}
353 ", push.old_ref, push.new_ref, push.ref_name};
354 tracing::info!("{record}");
355
356 let git_repo =
357 git2::Repository::open(&self.path).map_err(|e| Status::from_error(Box::new(e)))?;
358
359 let result = self.handle_push(&git_repo, &push);
361
362 if result.is_err() || result.as_ref().map(|r| r.failed).unwrap_or_default() {
364 let old_oid: git2::Oid = push
366 .old_ref
367 .parse()
368 .map_err(|e| Status::from_error(Box::new(e)))?;
369 git_repo
370 .find_reference(&push.ref_name)
371 .map_err(|e| Status::from_error(Box::new(e)))?
372 .set_target(old_oid, "")
373 .map_err(|e| Status::from_error(Box::new(e)))?;
374 git_repo
375 .set_head(&push.ref_name)
376 .map_err(|e| Status::from_error(Box::new(e)))?;
377
378 tokio::spawn(async move {
380 git_repo
381 .checkout_head(Some(git2::build::CheckoutBuilder::default().force()))
382 .ok();
383
384 drop(permit);
386 drop(git_repo);
387 });
388 }
389
390 match result {
391 Ok(reply) => Ok(Response::new(reply)),
392 Err(e) => Err(Status::from_error(Box::new(e))),
393 }
394 }
395}