// iroh_bytes/get/progress.rs

1//! Types for get progress state management.
2
3use std::{collections::HashMap, num::NonZeroU64};
4
5use serde::{Deserialize, Serialize};
6use tracing::warn;
7
8use crate::{protocol::RangeSpec, store::BaoBlobSize, Hash};
9
10use super::db::{BlobId, DownloadProgress};
11
/// Identifier carried by progress events; [`TransferState`] maps it back to the
/// blob the event belongs to.
pub type ProgressId = u64;
14
15/// Accumulated progress state of a transfer.
16#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
17pub struct TransferState {
18    /// The root blob of this transfer (may be a hash seq),
19    pub root: BlobState,
20    /// Whether we are connected to a node
21    pub connected: bool,
22    /// Children if the root blob is a hash seq, empty for raw blobs
23    pub children: HashMap<NonZeroU64, BlobState>,
24    /// Child being transferred at the moment.
25    pub current: Option<BlobId>,
26    /// Progress ids for individual blobs.
27    pub progress_id_to_blob: HashMap<ProgressId, BlobId>,
28}
29
30impl TransferState {
31    /// Create a new, empty transfer state.
32    pub fn new(root_hash: Hash) -> Self {
33        Self {
34            root: BlobState::new(root_hash),
35            connected: false,
36            children: Default::default(),
37            current: None,
38            progress_id_to_blob: Default::default(),
39        }
40    }
41}
42
43/// State of a single blob in transfer
44#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
45pub struct BlobState {
46    /// The hash of this blob.
47    pub hash: Hash,
48    /// The size of this blob. Only known if the blob is partially present locally, or after having
49    /// received the size from the remote.
50    pub size: Option<BaoBlobSize>,
51    /// The current state of the blob transfer.
52    pub progress: BlobProgress,
53    /// Ranges already available locally at the time of starting the transfer.
54    pub local_ranges: Option<RangeSpec>,
55    /// Number of children (only applies to hashseqs, None for raw blobs).
56    pub child_count: Option<u64>,
57}
58
59/// Progress state for a single blob
60#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
61pub enum BlobProgress {
62    /// Download is pending
63    #[default]
64    Pending,
65    /// Download is in progress
66    Progressing(u64),
67    /// Download has finished
68    Done,
69}
70
71impl BlobState {
72    /// Create a new [`BlobState`].
73    pub fn new(hash: Hash) -> Self {
74        Self {
75            hash,
76            size: None,
77            local_ranges: None,
78            child_count: None,
79            progress: BlobProgress::default(),
80        }
81    }
82}
83
84impl TransferState {
85    /// Get state of the root blob of this transfer.
86    pub fn root(&self) -> &BlobState {
87        &self.root
88    }
89
90    /// Get a blob state by its [`BlobId`] in this transfer.
91    pub fn get_blob(&self, blob_id: &BlobId) -> Option<&BlobState> {
92        match blob_id {
93            BlobId::Root => Some(&self.root),
94            BlobId::Child(id) => self.children.get(id),
95        }
96    }
97
98    /// Get the blob state currently being transferred.
99    pub fn get_current(&self) -> Option<&BlobState> {
100        self.current.as_ref().and_then(|id| self.get_blob(id))
101    }
102
103    fn get_or_insert_blob(&mut self, blob_id: BlobId, hash: Hash) -> &mut BlobState {
104        match blob_id {
105            BlobId::Root => &mut self.root,
106            BlobId::Child(id) => self
107                .children
108                .entry(id)
109                .or_insert_with(|| BlobState::new(hash)),
110        }
111    }
112    fn get_blob_mut(&mut self, blob_id: &BlobId) -> Option<&mut BlobState> {
113        match blob_id {
114            BlobId::Root => Some(&mut self.root),
115            BlobId::Child(id) => self.children.get_mut(id),
116        }
117    }
118
119    fn get_by_progress_id(&mut self, progress_id: ProgressId) -> Option<&mut BlobState> {
120        let blob_id = *self.progress_id_to_blob.get(&progress_id)?;
121        self.get_blob_mut(&blob_id)
122    }
123
124    /// Update the state with a new [`DownloadProgress`] event for this transfer.
125    pub fn on_progress(&mut self, event: DownloadProgress) {
126        match event {
127            DownloadProgress::InitialState(s) => {
128                *self = s;
129            }
130            DownloadProgress::FoundLocal {
131                child,
132                hash,
133                size,
134                valid_ranges,
135            } => {
136                let blob = self.get_or_insert_blob(child, hash);
137                blob.size = Some(size);
138                blob.local_ranges = Some(valid_ranges);
139            }
140            DownloadProgress::Connected => self.connected = true,
141            DownloadProgress::Found {
142                id: progress_id,
143                child: blob_id,
144                hash,
145                size,
146            } => {
147                let blob = self.get_or_insert_blob(blob_id, hash);
148                blob.size = match blob.size {
149                    // If we don't have a verified size for this blob yet: Use the size as reported
150                    // by the remote.
151                    None | Some(BaoBlobSize::Unverified(_)) => Some(BaoBlobSize::Unverified(size)),
152                    // Otherwise, keep the existing verified size.
153                    value @ Some(BaoBlobSize::Verified(_)) => value,
154                };
155                blob.progress = BlobProgress::Progressing(0);
156                self.progress_id_to_blob.insert(progress_id, blob_id);
157                self.current = Some(blob_id);
158            }
159            DownloadProgress::FoundHashSeq { hash, children } => {
160                if hash == self.root.hash {
161                    self.root.child_count = Some(children);
162                } else {
163                    // I think it is an invariant of the protocol that `FoundHashSeq` is only
164                    // triggered for the root hash.
165                    warn!("Received `FoundHashSeq` event for a hash which is not the download's root hash.")
166                }
167            }
168            DownloadProgress::Progress { id, offset } => {
169                if let Some(blob) = self.get_by_progress_id(id) {
170                    blob.progress = BlobProgress::Progressing(offset);
171                } else {
172                    warn!(%id, "Received `Progress` event for unknown progress id.")
173                }
174            }
175            DownloadProgress::Done { id } => {
176                if let Some(blob) = self.get_by_progress_id(id) {
177                    blob.progress = BlobProgress::Done;
178                    self.progress_id_to_blob.remove(&id);
179                } else {
180                    warn!(%id, "Received `Done` event for unknown progress id.")
181                }
182            }
183            _ => {}
184        }
185    }
186}