1use std::{
2 collections::{hash_map::Entry, HashMap},
3 time::Duration,
4};
5
6use anyhow::{anyhow, Result};
7use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
8use krata::v1::control::{
9 image_progress_indication::Indication, ImageProgressIndication, ImageProgressLayerPhase,
10 ImageProgressPhase, PullImageReply,
11};
12use tokio_stream::StreamExt;
13use tonic::Streaming;
14
15const SPINNER_STRINGS: &[&str] = &[
16 "[= ]",
17 "[ = ]",
18 "[ = ]",
19 "[ = ]",
20 "[ = ]",
21 "[ = ]",
22 "[ = ]",
23 "[ = ]",
24 "[ = ]",
25 "[ = ]",
26 "[ = ]",
27 "[ = ]",
28 "[ = ]",
29 "[ = ]",
30 "[ = ]",
31 "[ = ]",
32 "[ = ]",
33 "[ = ]",
34 "[ = ]",
35 "[ =]",
36 "[====================]",
37];
38
39fn progress_bar_for_indication(indication: &ImageProgressIndication) -> Option<ProgressBar> {
40 match indication.indication.as_ref() {
41 Some(Indication::Hidden(_)) | None => None,
42 Some(Indication::Bar(indic)) => {
43 let bar = ProgressBar::new(indic.total);
44 bar.enable_steady_tick(Duration::from_millis(100));
45 Some(bar)
46 }
47 Some(Indication::Spinner(_)) => {
48 let bar = ProgressBar::new_spinner();
49 bar.enable_steady_tick(Duration::from_millis(100));
50 Some(bar)
51 }
52 Some(Indication::Completed(indic)) => {
53 let bar = ProgressBar::new_spinner();
54 bar.enable_steady_tick(Duration::from_millis(100));
55 if !indic.message.is_empty() {
56 bar.finish_with_message(indic.message.clone());
57 } else {
58 bar.finish()
59 }
60 Some(bar)
61 }
62 }
63}
64
65fn configure_for_indication(
66 bar: &mut ProgressBar,
67 multi_progress: &mut MultiProgress,
68 indication: &ImageProgressIndication,
69 top_phase: Option<ImageProgressPhase>,
70 layer_phase: Option<ImageProgressLayerPhase>,
71 layer_id: Option<&str>,
72) {
73 let prefix = if let Some(phase) = top_phase {
74 match phase {
75 ImageProgressPhase::Unknown => "unknown",
76 ImageProgressPhase::Started => "started",
77 ImageProgressPhase::Resolving => "resolving",
78 ImageProgressPhase::Resolved => "resolved",
79 ImageProgressPhase::ConfigDownload => "downloading",
80 ImageProgressPhase::LayerDownload => "downloading",
81 ImageProgressPhase::Assemble => "assembling",
82 ImageProgressPhase::Pack => "packing",
83 ImageProgressPhase::Complete => "complete",
84 }
85 } else if let Some(phase) = layer_phase {
86 match phase {
87 ImageProgressLayerPhase::Unknown => "unknown",
88 ImageProgressLayerPhase::Waiting => "waiting",
89 ImageProgressLayerPhase::Downloading => "downloading",
90 ImageProgressLayerPhase::Downloaded => "downloaded",
91 ImageProgressLayerPhase::Extracting => "extracting",
92 ImageProgressLayerPhase::Extracted => "extracted",
93 }
94 } else {
95 ""
96 };
97 let prefix = prefix.to_string();
98
99 let id = if let Some(layer_id) = layer_id {
100 let hash = if let Some((_, hash)) = layer_id.split_once(':') {
101 hash
102 } else {
103 "unknown"
104 };
105 let small_hash = if hash.len() > 10 { &hash[0..10] } else { hash };
106 Some(format!("{:width$}", small_hash, width = 10))
107 } else {
108 None
109 };
110
111 let prefix = if let Some(id) = id {
112 format!("{} {:width$}", id, prefix, width = 11)
113 } else {
114 format!(" {:width$}", prefix, width = 11)
115 };
116
117 match indication.indication.as_ref() {
118 Some(Indication::Hidden(_)) | None => {
119 multi_progress.remove(bar);
120 return;
121 }
122 Some(Indication::Bar(indic)) => {
123 if indic.is_bytes {
124 bar.set_style(ProgressStyle::with_template("{prefix} [{bar:20}] {msg} {binary_bytes}/{binary_total_bytes} ({binary_bytes_per_sec}) eta: {eta}").unwrap().progress_chars("=>-"));
125 } else {
126 bar.set_style(
127 ProgressStyle::with_template(
128 "{prefix} [{bar:20} {msg} {human_pos}/{human_len} ({per_sec}/sec)",
129 )
130 .unwrap()
131 .progress_chars("=>-"),
132 );
133 }
134 bar.set_message(indic.message.clone());
135 bar.set_position(indic.current);
136 bar.set_length(indic.total);
137 }
138 Some(Indication::Spinner(indic)) => {
139 bar.set_style(
140 ProgressStyle::with_template("{prefix} {spinner} {msg}")
141 .unwrap()
142 .tick_strings(SPINNER_STRINGS),
143 );
144 bar.set_message(indic.message.clone());
145 }
146 Some(Indication::Completed(indic)) => {
147 if bar.is_finished() {
148 return;
149 }
150 bar.disable_steady_tick();
151 bar.set_message(indic.message.clone());
152 if indic.total != 0 {
153 bar.set_position(indic.total);
154 bar.set_length(indic.total);
155 }
156 if bar.style().get_tick_str(0).contains('=') {
157 bar.set_style(
158 ProgressStyle::with_template("{prefix} {spinner} {msg}")
159 .unwrap()
160 .tick_strings(SPINNER_STRINGS),
161 );
162 bar.finish_with_message(indic.message.clone());
163 } else if indic.is_bytes {
164 bar.set_style(
165 ProgressStyle::with_template("{prefix} [{bar:20}] {msg} {binary_total_bytes}")
166 .unwrap()
167 .progress_chars("=>-"),
168 );
169 } else {
170 bar.set_style(
171 ProgressStyle::with_template("{prefix} [{bar:20}] {msg}")
172 .unwrap()
173 .progress_chars("=>-"),
174 );
175 }
176 bar.tick();
177 bar.enable_steady_tick(Duration::from_millis(100));
178 }
179 };
180
181 bar.set_prefix(prefix);
182 bar.tick();
183}
184
185pub async fn pull_interactive_progress(
186 mut stream: Streaming<PullImageReply>,
187) -> Result<PullImageReply> {
188 let mut multi_progress = MultiProgress::new();
189 multi_progress.set_move_cursor(false);
190 let mut progresses = HashMap::new();
191
192 while let Some(reply) = stream.next().await {
193 let reply = match reply {
194 Ok(reply) => reply,
195 Err(error) => {
196 multi_progress.clear()?;
197 return Err(error.into());
198 }
199 };
200
201 if reply.progress.is_none() && !reply.digest.is_empty() {
202 multi_progress.clear()?;
203 return Ok(reply);
204 }
205
206 let Some(oci) = reply.progress else {
207 continue;
208 };
209
210 for layer in &oci.layers {
211 let Some(ref indication) = layer.indication else {
212 continue;
213 };
214
215 let bar = match progresses.entry(layer.id.clone()) {
216 Entry::Occupied(entry) => Some(entry.into_mut()),
217
218 Entry::Vacant(entry) => {
219 if let Some(bar) = progress_bar_for_indication(indication) {
220 multi_progress.add(bar.clone());
221 Some(entry.insert(bar))
222 } else {
223 None
224 }
225 }
226 };
227
228 if let Some(bar) = bar {
229 configure_for_indication(
230 bar,
231 &mut multi_progress,
232 indication,
233 None,
234 Some(layer.phase()),
235 Some(&layer.id),
236 );
237 }
238 }
239
240 if let Some(ref indication) = oci.indication {
241 let bar = match progresses.entry("root".to_string()) {
242 Entry::Occupied(entry) => Some(entry.into_mut()),
243
244 Entry::Vacant(entry) => {
245 if let Some(bar) = progress_bar_for_indication(indication) {
246 multi_progress.add(bar.clone());
247 Some(entry.insert(bar))
248 } else {
249 None
250 }
251 }
252 };
253
254 if let Some(bar) = bar {
255 configure_for_indication(
256 bar,
257 &mut multi_progress,
258 indication,
259 Some(oci.phase()),
260 None,
261 None,
262 );
263 }
264 }
265 }
266 multi_progress.clear()?;
267 Err(anyhow!("never received final reply for image pull"))
268}