1use crate::request::{Request, RequestError};
5use crate::AptUpgradeEvent;
6use as_result::*;
7use async_stream::stream;
8use futures::prelude::*;
9use std::process::ExitStatus;
10use std::{collections::HashSet, io, pin::Pin};
11use tokio::io::{AsyncBufReadExt, BufReader};
12use tokio::process::{Child, ChildStdout, Command};
13
14#[derive(Debug)]
15pub enum UpdateEvent {
16 BadPPA(BadPPA),
17 ExitStatus(io::Result<ExitStatus>),
18}
19
20#[derive(Debug)]
21pub struct BadPPA {
22 pub url: String,
23 pub pocket: String,
24}
25
26pub type UpgradeEvents = Pin<Box<dyn Stream<Item = AptUpgradeEvent> + Send>>;
27
28#[derive(AsMut, Deref, DerefMut)]
29#[as_mut(forward)]
30pub struct AptGet(Command);
31
32impl AptGet {
33 #[allow(clippy::new_without_default)]
34 pub fn new() -> Self {
35 let mut cmd = Command::new("apt-get");
36 cmd.env("LANG", "C");
37 Self(cmd)
38 }
39
40 pub fn allow_downgrades(mut self) -> Self {
41 self.arg("--allow-downgrades");
42 self
43 }
44
45 pub fn autoremove(mut self) -> Self {
46 self.arg("autoremove");
47 self
48 }
49
50 pub fn dpkg_option(mut self, option: &str) -> Self {
51 self.args(["-o", &["Dpkg::Options::=", option].concat()]);
52 self
53 }
54
55 pub fn fix_broken(mut self) -> Self {
56 self.args(["install", "-f"]);
57 self
58 }
59
60 pub fn force(mut self) -> Self {
61 self.arg("-y");
62 self
63 }
64
65 pub fn force_breaks(self) -> Self {
66 self.dpkg_option("--force-breaks")
67 }
68
69 pub fn force_confdef(self) -> Self {
70 self.dpkg_option("--force-confdef")
71 }
72
73 pub fn force_conflicts(self) -> Self {
74 self.dpkg_option("--force-conflicts")
75 }
76
77 pub fn force_confold(self) -> Self {
78 self.dpkg_option("--force-confold")
79 }
80
81 pub fn force_depends(self) -> Self {
82 self.dpkg_option("--force-depends")
83 }
84
85 pub fn force_depends_version(self) -> Self {
86 self.dpkg_option("--force-depends-version")
87 }
88
89 pub fn force_overwrite(self) -> Self {
90 self.dpkg_option("--force-overwrite")
91 }
92
93 pub async fn install<I, S>(mut self, packages: I) -> io::Result<()>
94 where
95 I: IntoIterator<Item = S>,
96 S: AsRef<std::ffi::OsStr>,
97 {
98 self.arg("install");
99 self.args(packages);
100
101 self.status().await
102 }
103
104 pub fn mark_auto(mut self) -> Self {
105 self.arg("--mark-auto");
106 self
107 }
108
109 pub fn noninteractive(mut self) -> Self {
110 self.env("DEBIAN_FRONTEND", "noninteractive");
111 self
112 }
113
114 pub async fn update(mut self) -> io::Result<()> {
115 self.arg("update");
116 self.status().await
117 }
118
119 pub fn simulate(mut self) -> Self {
120 self.arg("-s");
121 self
122 }
123
124 pub async fn upgrade(mut self) -> io::Result<()> {
125 self.arg("full-upgrade");
126 self.status().await
127 }
128
129 pub async fn stream_upgrade(mut self) -> io::Result<(Child, UpgradeEvents)> {
130 self.args(["--show-progress", "full-upgrade"]);
131
132 let (child, stdout) = self.spawn_with_stdout().await?;
133
134 let stream = stream! {
135 let mut stdout = BufReader::new(stdout).lines();
136
137 while let Ok(Some(line)) = stdout.next_line().await {
138 if let Ok(event) = line.parse::<AptUpgradeEvent>() {
139 yield event;
140 }
141 }
142 };
143
144 Ok((child, Box::pin(stream)))
145 }
146
147 pub async fn remove<I, S>(mut self, packages: I) -> io::Result<()>
148 where
149 I: IntoIterator<Item = S>,
150 S: AsRef<std::ffi::OsStr>,
151 {
152 self.arg("remove");
153 self.args(packages);
154
155 self.status().await
156 }
157
158 pub async fn fetch_uris(
159 mut self,
160 command: &[&str],
161 ) -> io::Result<Result<HashSet<Request>, RequestError>> {
162 self.arg("--print-uris");
163 self.args(command);
164
165 let (mut child, stdout) = self.spawn_with_stdout().await?;
166
167 let mut stdout = BufReader::new(stdout).lines();
168
169 let mut packages = HashSet::new();
170
171 while let Ok(Some(line)) = stdout.next_line().await {
172 if !line.starts_with('\'') {
173 continue;
174 }
175
176 let package = match line.parse::<Request>() {
177 Ok(package) => package,
178 Err(why) => return Ok(Err(why)),
179 };
180
181 packages.insert(package);
182 }
183
184 child.wait().await.map_result()?;
185
186 Ok(Ok(packages))
187 }
188
189 pub async fn stream_update(mut self) -> io::Result<Pin<Box<dyn Stream<Item = UpdateEvent> + Send>>> {
190 self.arg("update");
191
192 let (mut child, stdout) = self.spawn_with_stdout().await?;
193
194 let mut stdout = BufReader::new(stdout).lines();
195
196 let stream = stream! {
197 while let Ok(Some(line)) = stdout.next_line().await {
198 if line.starts_with("Err") {
199 let mut fields = line.split_ascii_whitespace();
200 let _ = fields.next();
201 let url = fields.next().unwrap();
202 let pocket = fields.next().unwrap();
203
204 yield UpdateEvent::BadPPA(BadPPA {
205 url: url.into(),
206 pocket: pocket.into(),
207 });
208 }
209 }
210
211 yield UpdateEvent::ExitStatus(child.wait().await);
212 };
213
214 Ok(Box::pin(stream))
215 }
216
217 pub async fn spawn_with_stdout(self) -> io::Result<(Child, ChildStdout)> {
218 crate::utils::spawn_with_stdout(self.0).await
219 }
220
221 pub async fn status(mut self) -> io::Result<()> {
222 self.0.status().await?.into_result()
223 }
224}