kratactl/
pull.rs

1use std::{
2    collections::{hash_map::Entry, HashMap},
3    time::Duration,
4};
5
6use anyhow::{anyhow, Result};
7use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
8use krata::v1::control::{
9    image_progress_indication::Indication, ImageProgressIndication, ImageProgressLayerPhase,
10    ImageProgressPhase, PullImageReply,
11};
12use tokio_stream::StreamExt;
13use tonic::Streaming;
14
15const SPINNER_STRINGS: &[&str] = &[
16    "[=                   ]",
17    "[ =                  ]",
18    "[  =                 ]",
19    "[   =                ]",
20    "[    =               ]",
21    "[     =              ]",
22    "[      =             ]",
23    "[       =            ]",
24    "[        =           ]",
25    "[         =          ]",
26    "[          =         ]",
27    "[           =        ]",
28    "[            =       ]",
29    "[             =      ]",
30    "[              =     ]",
31    "[               =    ]",
32    "[                =   ]",
33    "[                 =  ]",
34    "[                  = ]",
35    "[                   =]",
36    "[====================]",
37];
38
39fn progress_bar_for_indication(indication: &ImageProgressIndication) -> Option<ProgressBar> {
40    match indication.indication.as_ref() {
41        Some(Indication::Hidden(_)) | None => None,
42        Some(Indication::Bar(indic)) => {
43            let bar = ProgressBar::new(indic.total);
44            bar.enable_steady_tick(Duration::from_millis(100));
45            Some(bar)
46        }
47        Some(Indication::Spinner(_)) => {
48            let bar = ProgressBar::new_spinner();
49            bar.enable_steady_tick(Duration::from_millis(100));
50            Some(bar)
51        }
52        Some(Indication::Completed(indic)) => {
53            let bar = ProgressBar::new_spinner();
54            bar.enable_steady_tick(Duration::from_millis(100));
55            if !indic.message.is_empty() {
56                bar.finish_with_message(indic.message.clone());
57            } else {
58                bar.finish()
59            }
60            Some(bar)
61        }
62    }
63}
64
65fn configure_for_indication(
66    bar: &mut ProgressBar,
67    multi_progress: &mut MultiProgress,
68    indication: &ImageProgressIndication,
69    top_phase: Option<ImageProgressPhase>,
70    layer_phase: Option<ImageProgressLayerPhase>,
71    layer_id: Option<&str>,
72) {
73    let prefix = if let Some(phase) = top_phase {
74        match phase {
75            ImageProgressPhase::Unknown => "unknown",
76            ImageProgressPhase::Started => "started",
77            ImageProgressPhase::Resolving => "resolving",
78            ImageProgressPhase::Resolved => "resolved",
79            ImageProgressPhase::ConfigDownload => "downloading",
80            ImageProgressPhase::LayerDownload => "downloading",
81            ImageProgressPhase::Assemble => "assembling",
82            ImageProgressPhase::Pack => "packing",
83            ImageProgressPhase::Complete => "complete",
84        }
85    } else if let Some(phase) = layer_phase {
86        match phase {
87            ImageProgressLayerPhase::Unknown => "unknown",
88            ImageProgressLayerPhase::Waiting => "waiting",
89            ImageProgressLayerPhase::Downloading => "downloading",
90            ImageProgressLayerPhase::Downloaded => "downloaded",
91            ImageProgressLayerPhase::Extracting => "extracting",
92            ImageProgressLayerPhase::Extracted => "extracted",
93        }
94    } else {
95        ""
96    };
97    let prefix = prefix.to_string();
98
99    let id = if let Some(layer_id) = layer_id {
100        let hash = if let Some((_, hash)) = layer_id.split_once(':') {
101            hash
102        } else {
103            "unknown"
104        };
105        let small_hash = if hash.len() > 10 { &hash[0..10] } else { hash };
106        Some(format!("{:width$}", small_hash, width = 10))
107    } else {
108        None
109    };
110
111    let prefix = if let Some(id) = id {
112        format!("{} {:width$}", id, prefix, width = 11)
113    } else {
114        format!("           {:width$}", prefix, width = 11)
115    };
116
117    match indication.indication.as_ref() {
118        Some(Indication::Hidden(_)) | None => {
119            multi_progress.remove(bar);
120            return;
121        }
122        Some(Indication::Bar(indic)) => {
123            if indic.is_bytes {
124                bar.set_style(ProgressStyle::with_template("{prefix} [{bar:20}] {msg} {binary_bytes}/{binary_total_bytes} ({binary_bytes_per_sec}) eta: {eta}").unwrap().progress_chars("=>-"));
125            } else {
126                bar.set_style(
127                    ProgressStyle::with_template(
128                        "{prefix} [{bar:20} {msg} {human_pos}/{human_len} ({per_sec}/sec)",
129                    )
130                    .unwrap()
131                    .progress_chars("=>-"),
132                );
133            }
134            bar.set_message(indic.message.clone());
135            bar.set_position(indic.current);
136            bar.set_length(indic.total);
137        }
138        Some(Indication::Spinner(indic)) => {
139            bar.set_style(
140                ProgressStyle::with_template("{prefix} {spinner}  {msg}")
141                    .unwrap()
142                    .tick_strings(SPINNER_STRINGS),
143            );
144            bar.set_message(indic.message.clone());
145        }
146        Some(Indication::Completed(indic)) => {
147            if bar.is_finished() {
148                return;
149            }
150            bar.disable_steady_tick();
151            bar.set_message(indic.message.clone());
152            if indic.total != 0 {
153                bar.set_position(indic.total);
154                bar.set_length(indic.total);
155            }
156            if bar.style().get_tick_str(0).contains('=') {
157                bar.set_style(
158                    ProgressStyle::with_template("{prefix} {spinner}  {msg}")
159                        .unwrap()
160                        .tick_strings(SPINNER_STRINGS),
161                );
162                bar.finish_with_message(indic.message.clone());
163            } else if indic.is_bytes {
164                bar.set_style(
165                    ProgressStyle::with_template("{prefix} [{bar:20}] {msg} {binary_total_bytes}")
166                        .unwrap()
167                        .progress_chars("=>-"),
168                );
169            } else {
170                bar.set_style(
171                    ProgressStyle::with_template("{prefix} [{bar:20}] {msg}")
172                        .unwrap()
173                        .progress_chars("=>-"),
174                );
175            }
176            bar.tick();
177            bar.enable_steady_tick(Duration::from_millis(100));
178        }
179    };
180
181    bar.set_prefix(prefix);
182    bar.tick();
183}
184
185pub async fn pull_interactive_progress(
186    mut stream: Streaming<PullImageReply>,
187) -> Result<PullImageReply> {
188    let mut multi_progress = MultiProgress::new();
189    multi_progress.set_move_cursor(false);
190    let mut progresses = HashMap::new();
191
192    while let Some(reply) = stream.next().await {
193        let reply = match reply {
194            Ok(reply) => reply,
195            Err(error) => {
196                multi_progress.clear()?;
197                return Err(error.into());
198            }
199        };
200
201        if reply.progress.is_none() && !reply.digest.is_empty() {
202            multi_progress.clear()?;
203            return Ok(reply);
204        }
205
206        let Some(oci) = reply.progress else {
207            continue;
208        };
209
210        for layer in &oci.layers {
211            let Some(ref indication) = layer.indication else {
212                continue;
213            };
214
215            let bar = match progresses.entry(layer.id.clone()) {
216                Entry::Occupied(entry) => Some(entry.into_mut()),
217
218                Entry::Vacant(entry) => {
219                    if let Some(bar) = progress_bar_for_indication(indication) {
220                        multi_progress.add(bar.clone());
221                        Some(entry.insert(bar))
222                    } else {
223                        None
224                    }
225                }
226            };
227
228            if let Some(bar) = bar {
229                configure_for_indication(
230                    bar,
231                    &mut multi_progress,
232                    indication,
233                    None,
234                    Some(layer.phase()),
235                    Some(&layer.id),
236                );
237            }
238        }
239
240        if let Some(ref indication) = oci.indication {
241            let bar = match progresses.entry("root".to_string()) {
242                Entry::Occupied(entry) => Some(entry.into_mut()),
243
244                Entry::Vacant(entry) => {
245                    if let Some(bar) = progress_bar_for_indication(indication) {
246                        multi_progress.add(bar.clone());
247                        Some(entry.insert(bar))
248                    } else {
249                        None
250                    }
251                }
252            };
253
254            if let Some(bar) = bar {
255                configure_for_indication(
256                    bar,
257                    &mut multi_progress,
258                    indication,
259                    Some(oci.phase()),
260                    None,
261                    None,
262                );
263            }
264        }
265    }
266    multi_progress.clear()?;
267    Err(anyhow!("never received final reply for image pull"))
268}