iroh_blobs/get/
progress.rs

1//! Types for get progress state management.
2
3use std::{collections::HashMap, num::NonZeroU64};
4
5use serde::{Deserialize, Serialize};
6use tracing::warn;
7
8use super::db::{BlobId, DownloadProgress};
9use crate::{protocol::RangeSpec, store::BaoBlobSize, Hash};
10
/// The identifier for progress events.
///
/// [`TransferState`] uses it as the key of `progress_id_to_blob` to map a progress event back
/// to the blob the event refers to.
pub type ProgressId = u64;
13
14/// Accumulated progress state of a transfer.
15#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
16pub struct TransferState {
17    /// The root blob of this transfer (may be a hash seq),
18    pub root: BlobState,
19    /// Whether we are connected to a node
20    pub connected: bool,
21    /// Children if the root blob is a hash seq, empty for raw blobs
22    pub children: HashMap<NonZeroU64, BlobState>,
23    /// Child being transferred at the moment.
24    pub current: Option<BlobId>,
25    /// Progress ids for individual blobs.
26    pub progress_id_to_blob: HashMap<ProgressId, BlobId>,
27}
28
29impl TransferState {
30    /// Create a new, empty transfer state.
31    pub fn new(root_hash: Hash) -> Self {
32        Self {
33            root: BlobState::new(root_hash),
34            connected: false,
35            children: Default::default(),
36            current: None,
37            progress_id_to_blob: Default::default(),
38        }
39    }
40}
41
42/// State of a single blob in transfer
43#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
44pub struct BlobState {
45    /// The hash of this blob.
46    pub hash: Hash,
47    /// The size of this blob. Only known if the blob is partially present locally, or after having
48    /// received the size from the remote.
49    pub size: Option<BaoBlobSize>,
50    /// The current state of the blob transfer.
51    pub progress: BlobProgress,
52    /// Ranges already available locally at the time of starting the transfer.
53    pub local_ranges: Option<RangeSpec>,
54    /// Number of children (only applies to hashseqs, None for raw blobs).
55    pub child_count: Option<u64>,
56}
57
58/// Progress state for a single blob
59#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
60pub enum BlobProgress {
61    /// Download is pending
62    #[default]
63    Pending,
64    /// Download is in progress
65    Progressing(u64),
66    /// Download has finished
67    Done,
68}
69
70impl BlobState {
71    /// Create a new [`BlobState`].
72    pub fn new(hash: Hash) -> Self {
73        Self {
74            hash,
75            size: None,
76            local_ranges: None,
77            child_count: None,
78            progress: BlobProgress::default(),
79        }
80    }
81}
82
83impl TransferState {
84    /// Get state of the root blob of this transfer.
85    pub fn root(&self) -> &BlobState {
86        &self.root
87    }
88
89    /// Get a blob state by its [`BlobId`] in this transfer.
90    pub fn get_blob(&self, blob_id: &BlobId) -> Option<&BlobState> {
91        match blob_id {
92            BlobId::Root => Some(&self.root),
93            BlobId::Child(id) => self.children.get(id),
94        }
95    }
96
97    /// Get the blob state currently being transferred.
98    pub fn get_current(&self) -> Option<&BlobState> {
99        self.current.as_ref().and_then(|id| self.get_blob(id))
100    }
101
102    fn get_or_insert_blob(&mut self, blob_id: BlobId, hash: Hash) -> &mut BlobState {
103        match blob_id {
104            BlobId::Root => &mut self.root,
105            BlobId::Child(id) => self
106                .children
107                .entry(id)
108                .or_insert_with(|| BlobState::new(hash)),
109        }
110    }
111    fn get_blob_mut(&mut self, blob_id: &BlobId) -> Option<&mut BlobState> {
112        match blob_id {
113            BlobId::Root => Some(&mut self.root),
114            BlobId::Child(id) => self.children.get_mut(id),
115        }
116    }
117
118    fn get_by_progress_id(&mut self, progress_id: ProgressId) -> Option<&mut BlobState> {
119        let blob_id = *self.progress_id_to_blob.get(&progress_id)?;
120        self.get_blob_mut(&blob_id)
121    }
122
123    /// Update the state with a new [`DownloadProgress`] event for this transfer.
124    pub fn on_progress(&mut self, event: DownloadProgress) {
125        match event {
126            DownloadProgress::InitialState(s) => {
127                *self = s;
128            }
129            DownloadProgress::FoundLocal {
130                child,
131                hash,
132                size,
133                valid_ranges,
134            } => {
135                let blob = self.get_or_insert_blob(child, hash);
136                blob.size = Some(size);
137                blob.local_ranges = Some(valid_ranges);
138            }
139            DownloadProgress::Connected => self.connected = true,
140            DownloadProgress::Found {
141                id: progress_id,
142                child: blob_id,
143                hash,
144                size,
145            } => {
146                let blob = self.get_or_insert_blob(blob_id, hash);
147                blob.size = match blob.size {
148                    // If we don't have a verified size for this blob yet: Use the size as reported
149                    // by the remote.
150                    None | Some(BaoBlobSize::Unverified(_)) => Some(BaoBlobSize::Unverified(size)),
151                    // Otherwise, keep the existing verified size.
152                    value @ Some(BaoBlobSize::Verified(_)) => value,
153                };
154                blob.progress = BlobProgress::Progressing(0);
155                self.progress_id_to_blob.insert(progress_id, blob_id);
156                self.current = Some(blob_id);
157            }
158            DownloadProgress::FoundHashSeq { hash, children } => {
159                if hash == self.root.hash {
160                    self.root.child_count = Some(children);
161                } else {
162                    // I think it is an invariant of the protocol that `FoundHashSeq` is only
163                    // triggered for the root hash.
164                    warn!("Received `FoundHashSeq` event for a hash which is not the download's root hash.")
165                }
166            }
167            DownloadProgress::Progress { id, offset } => {
168                if let Some(blob) = self.get_by_progress_id(id) {
169                    blob.progress = BlobProgress::Progressing(offset);
170                } else {
171                    warn!(%id, "Received `Progress` event for unknown progress id.")
172                }
173            }
174            DownloadProgress::Done { id } => {
175                if let Some(blob) = self.get_by_progress_id(id) {
176                    blob.progress = BlobProgress::Done;
177                    self.progress_id_to_blob.remove(&id);
178                } else {
179                    warn!(%id, "Received `Done` event for unknown progress id.")
180                }
181            }
182            DownloadProgress::AllDone(_) | DownloadProgress::Abort(_) => {}
183        }
184    }
185}