apt_cmd/
apt_get.rs

1// Copyright 2021-2022 System76 <info@system76.com>
2// SPDX-License-Identifier: MPL-2.0
3
4use crate::request::{Request, RequestError};
5use crate::AptUpgradeEvent;
6use as_result::*;
7use async_stream::stream;
8use futures::prelude::*;
9use std::process::ExitStatus;
10use std::{collections::HashSet, io, pin::Pin};
11use tokio::io::{AsyncBufReadExt, BufReader};
12use tokio::process::{Child, ChildStdout, Command};
13
14#[derive(Debug)]
15pub enum UpdateEvent {
16    BadPPA(BadPPA),
17    ExitStatus(io::Result<ExitStatus>),
18}
19
20#[derive(Debug)]
21pub struct BadPPA {
22    pub url: String,
23    pub pocket: String,
24}
25
26pub type UpgradeEvents = Pin<Box<dyn Stream<Item = AptUpgradeEvent> + Send>>;
27
28#[derive(AsMut, Deref, DerefMut)]
29#[as_mut(forward)]
30pub struct AptGet(Command);
31
32impl AptGet {
33    #[allow(clippy::new_without_default)]
34    pub fn new() -> Self {
35        let mut cmd = Command::new("apt-get");
36        cmd.env("LANG", "C");
37        Self(cmd)
38    }
39
40    pub fn allow_downgrades(mut self) -> Self {
41        self.arg("--allow-downgrades");
42        self
43    }
44
45    pub fn autoremove(mut self) -> Self {
46        self.arg("autoremove");
47        self
48    }
49
50    pub fn dpkg_option(mut self, option: &str) -> Self {
51        self.args(["-o", &["Dpkg::Options::=", option].concat()]);
52        self
53    }
54
55    pub fn fix_broken(mut self) -> Self {
56        self.args(["install", "-f"]);
57        self
58    }
59
60    pub fn force(mut self) -> Self {
61        self.arg("-y");
62        self
63    }
64
65    pub fn force_breaks(self) -> Self {
66        self.dpkg_option("--force-breaks")
67    }
68
69    pub fn force_confdef(self) -> Self {
70        self.dpkg_option("--force-confdef")
71    }
72
73    pub fn force_conflicts(self) -> Self {
74        self.dpkg_option("--force-conflicts")
75    }
76
77    pub fn force_confold(self) -> Self {
78        self.dpkg_option("--force-confold")
79    }
80
81    pub fn force_depends(self) -> Self {
82        self.dpkg_option("--force-depends")
83    }
84
85    pub fn force_depends_version(self) -> Self {
86        self.dpkg_option("--force-depends-version")
87    }
88
89    pub fn force_overwrite(self) -> Self {
90        self.dpkg_option("--force-overwrite")
91    }
92
93    pub async fn install<I, S>(mut self, packages: I) -> io::Result<()>
94    where
95        I: IntoIterator<Item = S>,
96        S: AsRef<std::ffi::OsStr>,
97    {
98        self.arg("install");
99        self.args(packages);
100
101        self.status().await
102    }
103
104    pub fn mark_auto(mut self) -> Self {
105        self.arg("--mark-auto");
106        self
107    }
108
109    pub fn noninteractive(mut self) -> Self {
110        self.env("DEBIAN_FRONTEND", "noninteractive");
111        self
112    }
113
114    pub async fn update(mut self) -> io::Result<()> {
115        self.arg("update");
116        self.status().await
117    }
118
119    pub fn simulate(mut self) -> Self {
120        self.arg("-s");
121        self
122    }
123
124    pub async fn upgrade(mut self) -> io::Result<()> {
125        self.arg("full-upgrade");
126        self.status().await
127    }
128
129    pub async fn stream_upgrade(mut self) -> io::Result<(Child, UpgradeEvents)> {
130        self.args(["--show-progress", "full-upgrade"]);
131
132        let (child, stdout) = self.spawn_with_stdout().await?;
133
134        let stream = stream! {
135            let mut stdout = BufReader::new(stdout).lines();
136
137            while let Ok(Some(line)) = stdout.next_line().await {
138                if let Ok(event) = line.parse::<AptUpgradeEvent>() {
139                    yield event;
140                }
141            }
142        };
143
144        Ok((child, Box::pin(stream)))
145    }
146
147    pub async fn remove<I, S>(mut self, packages: I) -> io::Result<()>
148    where
149        I: IntoIterator<Item = S>,
150        S: AsRef<std::ffi::OsStr>,
151    {
152        self.arg("remove");
153        self.args(packages);
154
155        self.status().await
156    }
157
158    pub async fn fetch_uris(
159        mut self,
160        command: &[&str],
161    ) -> io::Result<Result<HashSet<Request>, RequestError>> {
162        self.arg("--print-uris");
163        self.args(command);
164
165        let (mut child, stdout) = self.spawn_with_stdout().await?;
166
167        let mut stdout = BufReader::new(stdout).lines();
168
169        let mut packages = HashSet::new();
170
171        while let Ok(Some(line)) = stdout.next_line().await {
172            if !line.starts_with('\'') {
173                continue;
174            }
175
176            let package = match line.parse::<Request>() {
177                Ok(package) => package,
178                Err(why) => return Ok(Err(why)),
179            };
180
181            packages.insert(package);
182        }
183
184        child.wait().await.map_result()?;
185
186        Ok(Ok(packages))
187    }
188
189    pub async fn stream_update(mut self) -> io::Result<Pin<Box<dyn Stream<Item = UpdateEvent> + Send>>> {
190        self.arg("update");
191
192        let (mut child, stdout) = self.spawn_with_stdout().await?;
193
194        let mut stdout = BufReader::new(stdout).lines();
195
196        let stream = stream! {
197            while let Ok(Some(line)) = stdout.next_line().await {
198                if line.starts_with("Err") {
199                    let mut fields = line.split_ascii_whitespace();
200                    let _ = fields.next();
201                    let url = fields.next().unwrap();
202                    let pocket = fields.next().unwrap();
203
204                    yield UpdateEvent::BadPPA(BadPPA {
205                        url: url.into(),
206                        pocket: pocket.into(),
207                    });
208                }
209            }
210
211            yield UpdateEvent::ExitStatus(child.wait().await);
212        };
213
214        Ok(Box::pin(stream))
215    }
216
217    pub async fn spawn_with_stdout(self) -> io::Result<(Child, ChildStdout)> {
218        crate::utils::spawn_with_stdout(self.0).await
219    }
220
221    pub async fn status(mut self) -> io::Result<()> {
222        self.0.status().await?.into_result()
223    }
224}